Heads up! This guide focuses on using an out-of-the-box Kubernetes resource. For further customization, use the open_pipes_session approach instead.
In this guide, we’ll show you how to use Dagster Pipes with Dagster’s Kubernetes integration to launch Kubernetes pods and execute external code.
Pipes allows your code to interact with Dagster outside of a full Dagster environment. Instead, the environment only needs to contain dagster-pipes, a single-file Python package with no dependencies that can be installed from PyPI or easily vendored. dagster-pipes handles streaming stdout/stderr and Dagster events back to the orchestration process.
Step 1: Define the external Kubernetes code container
First, you'll write a Python script that uses dagster-pipes and runs in a container on Kubernetes:
# my_python_script.py
from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
    # Stream log message back to Dagster
    pipes.log.info(f"Using some_parameter value: {pipes.get_extra('some_parameter')}")

    # ... your code that computes and persists the asset

    pipes.report_asset_materialization(
        metadata={"some_metric": {"raw_value": 2, "type": "int"}},
        data_version="alpha",
    )
We're using the default context loader (PipesDefaultContextLoader) and message writer (PipesDefaultMessageWriter) in this example. These objects establish communication between the orchestration and external process. On the orchestration end, these match a corresponding PipesContextInjector and PipesMessageReader, which are instantiated inside the PipesK8sClient.
Next, you'll package the script into a container image using a Dockerfile. For example:
FROM python:3.10-slim
RUN pip install dagster-pipes
COPY my_python_script.py .
ENTRYPOINT [ "python", "my_python_script.py" ]
Then, build the image:
docker build -t pipes-example:v1 .
Note: Depending on your Kubernetes setup, you may need to push the container image to a registry or otherwise make it available to the cluster. For example, if you're using kind:
kind load docker-image pipes-example:v1
Step 2: Create the Dagster asset
In this step, you’ll create a Dagster asset that, when materialized, opens a Dagster Pipes session and spins up a Kubernetes pod to run the container image built in the previous step.
The asset definition does the following:
- Takes an AssetExecutionContext as its context argument. This object provides access to system APIs such as resources, config, and logging.
- Specifies a resource for the asset to use, PipesK8sClient, a pre-built Dagster resource that lets you quickly get Pipes working with Kubernetes.
- Passes context, image, and extras arguments to the run method of PipesK8sClient, which uses the Kubernetes API to launch a pod that runs the specified image.
- Returns a MaterializeResult object representing the result of execution. This is obtained by calling get_materialize_result on the PipesClientCompletedInvocation object that run returns once execution in Kubernetes has completed.
Heads up! Depending on your Kubernetes setup, there may be a few additional things you need to do:
- If the default behavior doesn't target the correct cluster, pass the load_incluster_config, kubeconfig_file, and kube_context arguments to PipesK8sClient.
- If you need to alter the default pod spec, pass arguments such as base_pod_spec to PipesK8sClient.run.
Step 3: Launch the Kubernetes container from the Dagster UI
In this step, you’ll run the Kubernetes container you defined in Step 1 from the Dagster UI.
In a new command line session, run the following to start the UI:
dagster dev -f dagster_k8s_pipes.py
Navigate to localhost:3000, where you should see the UI.
Click Materialize near the top right corner of the page, then click View on the Launched Run popup. Wait for the run to complete; the event log should show the log message and the asset materialization streamed back from the Kubernetes pod.