Mastering Advanced Pipeline Design: Conditional Execution and Loops in Kubeflow

Kubeflow Pipelines provide a powerful platform for building, deploying, and managing machine learning workflows. To create more complex and dynamic pipelines, you may need to use conditional execution and loops. In this tutorial, we will guide you through the process of implementing conditional execution and loops in Kubeflow Pipelines using Python.
Step 1: Define a Conditional Execution Function
To demonstrate conditional execution in Kubeflow Pipelines, we will create a simple pipeline that processes input data depending on a condition. First, let’s define a Python function for the conditional execution:
from typing import NamedTuple
from kfp.components import create_component_from_func
def process_data_conditional(input_data: str, condition: str) -> NamedTuple("Outputs", [("output_data", str)]):
import json
from collections import namedtuple
if condition == "uppercase":
output_data = input_data.upper()
elif condition == "lowercase":
output_data = input_data.lower()
else:
output_data = input_data
Outputs = namedtuple("Outputs", ["output_data"])
return Outputs(output_data)
process_data_conditional_component = create_component_from_func(process_data_conditional, output_component_file="process_data_conditional_component.yaml")
This function takes an input string and a condition as arguments. Depending on the condition, the input data will be converted to uppercase, lowercase, or remain unchanged.
Step 2: Implement the Pipeline with Conditional Execution
Now, let’s create a pipeline that uses the process_data_conditional
function:
import kfp
from kfp import dsl
@dsl.pipeline(
name="Conditional Execution Pipeline",
description="A pipeline that demonstrates conditional execution."
)
def conditional_pipeline(input_data: str = "Hello, Kubeflow!", condition: str = "uppercase"):
process_data = process_data_conditional_component(input_data, condition)
if __name__ == "__main__":
kfp.compiler.Compiler().compile(conditional_pipeline, "conditional_pipeline.yaml")
In this pipeline, the process_data_conditional
function is called with the input data and condition provided as arguments.
Step 3: Upload and Run the Pipeline with Different Conditions
- Access the Kubeflow Pipelines dashboard by navigating to the URL provided during the setup process.
- Click on the “Pipelines” tab in the left-hand sidebar.
- Click the “Upload pipeline” button in the upper right corner.
- In the “Upload pipeline” dialog, click “Browse” and select the
conditional_pipeline.yaml
file generated in the previous step. - Click “Upload” to upload the pipeline to the Kubeflow platform.
- Once the pipeline is uploaded, click on its name to open the pipeline details page.
- Click the “Create run” button to start a new run of the pipeline.
- On the “Create run” page, you can give your run a name and choose a pipeline version. Set the “input_data” and “condition” arguments to test different conditions (e.g., “uppercase”, “lowercase”, or “unchanged”).
- Click “Start” to begin the pipeline run.
Step 4: Add a Loop to the Pipeline
To demonstrate how to add loops in Kubeflow Pipelines, we will modify our pipeline to process a list of input data and conditions. First, let’s update the conditional_pipeline
function:
import kfp
from kfp import dsl
@dsl.pipeline(
name="Conditional Execution with Loop Pipeline",
description="A pipeline that demonstrates conditional execution and looping."
)
def conditional_loop_pipeline(input_data_list: str, condition_list: str):
input_data_list = json.loads(input_data_list)
condition_list = json.loads(condition_list)
with dsl.ParallelFor(input_data_list) as item:
for condition in condition_list:
process_data = process_data_conditional_component(item, condition)
if __name__ == "__main__":
kfp.compiler.Compiler().compile(conditional_loop_pipeline, "conditional_loop_pipeline.yaml")
In this updated pipeline, we use the dsl.ParallelFor
construct to loop over the input data list. For each item in the input data list, we loop over the condition list and call the process_data_conditional_component
with the item and condition as arguments.
Step 5: Upload and Run the Pipeline with a List of Input Data and Conditions
- Access the Kubeflow Pipelines dashboard by navigating to the URL provided during the setup process.
- Click on the “Pipelines” tab in the left-hand sidebar.
- Click the “Upload pipeline” button in the upper right corner.
- In the “Upload pipeline” dialog, click “Browse” and select the
conditional_loop_pipeline.yaml
file generated in the previous step. - Click “Upload” to upload the pipeline to the Kubeflow platform.
- Once the pipeline is uploaded, click on its name to open the pipeline details page.
- Click the “Create run” button to start a new run of the pipeline.
- On the “Create run” page, you can give your run a name and choose a pipeline version. Set the “input_data_list” and “condition_list” arguments to JSON-encoded lists of input data and conditions (e.g., ‘[“Hello, Kubeflow!”, “Machine Learning”]’ and ‘[“uppercase”, “lowercase”]’).
- Click “Start” to begin the pipeline run.
In this tutorial, we covered how to implement conditional execution and loops in Kubeflow Pipelines using Python. With these advanced pipeline design techniques, you can create more complex and dynamic machine learning workflows, enabling greater flexibility and control over your ML experiments. As you continue to work with Kubeflow Pipelines, you can explore other advanced features to further enhance your machine learning workflows.
Lyron Foster is a Hawai’i based African American Author, Musician, Actor, Blogger, Philanthropist and Multinational Serial Tech Entrepreneur.