Dagster Pipes details and customization

Dagster Pipes is a toolkit for integrating Dagster with an arbitrary external compute environment. While many users will be well-served by the simplified interface offered by Pipes client objects (e.g. PipesSubprocessClient, PipesDatabricksClient), others will need a greater level of control over Pipes. This is particularly the case for users seeking to connect large existing codebases to Dagster.

This guide covers the lower-level Pipes APIs and how you can compose them into a custom solution for your data platform.


Overview and terms

[Diagram: detailed overview of a Dagster Pipes session]
  • External environment: An environment external to Dagster, for example: Databricks, Kubernetes, Docker.
  • Orchestration process: A process running Dagster code to materialize an asset or execute an op.
  • External process: A process running in an external environment, from which log output and Dagster events can be reported back to the orchestration process. The orchestration process must launch the external process.
  • Bootstrap payload: A small bundle of key/value pairs that is written by the orchestration process to some globally accessible key-value store in the external process. Typically the bootstrap payload will be written in environment variables, but another mechanism may be used for external environments that do not support setting environment variables.
  • Context payload: A JSON object containing information derived from the execution context (OpExecutionContext or AssetExecutionContext) in the orchestration process. This includes in-scope asset keys, partition keys, etc. The context payload is written by the orchestration process to some location accessible to the external process. The external process obtains the location of the context payload (e.g. an object URL on Amazon S3) from the bootstrap payload and reads the context payload.
  • Messages: JSON objects written by the external process for consumption by the orchestration process. Messages can report asset materializations and check results as well as trigger orchestration-side logging.
  • Logs: Log files generated by the external process, including but not limited to logged stdout/stderr streams.
  • Params loader: An entity in the external process that reads the bootstrap payload from some globally accessible key-value store. The default params loader reads the bootstrap payload from environment variables.
  • Context injector: An entity in the orchestration process that writes the context payload to an externally accessible location and yields a set of parameters encoding this location for inclusion in the bootstrap payload.
  • Context loader: An entity in the external process that loads the context payload from the location specified in the bootstrap payload.
  • Message reader: An entity in the orchestration process that reads messages (and optionally log files) from an externally accessible location and yields a set of parameters encoding this location in the bootstrap payload.
  • Message writer: An entity in the external process that writes messages to the location specified in the bootstrap payload.
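
With the default environment-variable mechanism, the bootstrap payload reduces to two environment variables whose names dagster-pipes exposes as constants. As a minimal sketch (using only the constants; the payload encoding is an internal detail handled by the params loader), an external process can check whether it was launched by a Pipes session:

### EXTERNAL PROCESS

import os

from dagster_pipes import (
    DAGSTER_PIPES_CONTEXT_ENV_VAR,
    DAGSTER_PIPES_MESSAGES_ENV_VAR,
)

# Presence of these variables indicates the process was launched from a Pipes
# session; their values encode the context and messages bootstrap parameters.
for var in (DAGSTER_PIPES_CONTEXT_ENV_VAR, DAGSTER_PIPES_MESSAGES_ENV_VAR):
    print(f"{var} is {'set' if var in os.environ else 'not set'}")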

Pipes session

A Pipes session spans three phases:

  1. The creation of communications channels between the orchestration and external process.
  2. The launching and terminating of the external process.
  3. The reading of all messages reported by the external process and the closing of communications channels.

There are separate APIs for interacting with a Pipes session in the orchestration and external processes. The orchestration process API is defined in dagster. The external process API is defined by a Pipes integration library that is loaded by user code in the external process. This library knows how to interpret the bootstrap payload and spin up a context loader and message writer.

At present the only official Dagster Pipes integration library is Python’s dagster-pipes, available on PyPI. The library has no dependencies and fits in a single file, so it may also be trivially vendored.

Session lifecycle (orchestration process)

Pipes sessions are represented in the orchestration process by the PipesSession class. A session is started with the open_pipes_session context manager, which yields a PipesSession. open_pipes_session should be called inside an asset or op compute function, wherever an OpExecutionContext or AssetExecutionContext is available:

### ORCHESTRATION PROCESS

from typing import Iterator

# `third_party_api` is a fictional package representing a third-party library (or user code)
# providing APIs for launching and polling a process in some external environment.
from third_party_api import (
    is_external_process_done,
    launch_external_process,
)

from dagster import (
    AssetExecutionContext,
    PipesExecutionResult,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)


@asset
def some_pipes_asset(context: AssetExecutionContext) -> Iterator[PipesExecutionResult]:
    with open_pipes_session(
        context=context,
        extras={"foo": "bar"},
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # Get the bootstrap payload encoded as a Dict[str, str], suitable for passing as
        # environment variables.
        env_vars = pipes_session.get_bootstrap_env_vars()

        # `launch_external_process` is responsible for including the passed `env_vars` in the
        # launched external process.
        external_process = launch_external_process(env_vars)

        # Continually poll the external process and stream any incrementally received messages back
        # to Dagster
        while not is_external_process_done(external_process):
            yield from pipes_session.get_results()

    # Yield any remaining results received from the external process.
    yield from pipes_session.get_results()

Above we see that open_pipes_session takes four parameters:

  • context: An execution context (OpExecutionContext or AssetExecutionContext) that will be used to derive the context payload.
  • extras: A bundle of key-value pairs in the form of a JSON-serializable dictionary. This is slotted into the context payload. Users can pass arbitrary data here that they want to expose to the external process.
  • context_injector: A context injector responsible for writing the serialized context payload to some location and expressing that location as bootstrap parameters for consumption by the external process. Above we used the built-in (and default) PipesTempFileContextInjector, which writes the serialized context payload to an automatically created local temp file and exposes the path to that file as a bootstrap parameter.
  • message_reader: A message reader responsible for reading streaming messages and log files written to some location, and expressing that location as bootstrap parameters for consumption by the external process. Above we used the built-in (and default) PipesTempFileMessageReader, which tails an automatically created local temp file and exposes the path to that file as a bootstrap parameter.
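
For concreteness, here is one way the fictional launch_external_process and is_external_process_done used above could be implemented when the "external environment" is just a local subprocess. This is a hedged sketch, not part of Dagster's API; external_script.py is a hypothetical script that calls open_dagster_pipes (see the next section):

### ORCHESTRATION PROCESS

import os
import subprocess
from typing import Mapping


def launch_external_process(env_vars: Mapping[str, str]) -> subprocess.Popen:
    # Merge the Pipes bootstrap variables into the inherited environment so the
    # external process can discover the context payload and message channel.
    return subprocess.Popen(
        ["python", "external_script.py"],
        env={**os.environ, **env_vars},
    )


def is_external_process_done(process: subprocess.Popen) -> bool:
    # `poll` returns None while the process is still running.
    return process.poll() is not None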

Python context manager invocations have three parts:

  1. An opening routine (__enter__, executed at the start of a with block).
  2. A body (user code nested in a with block).
  3. A closing routine (__exit__, executed at the end of a with block).

For open_pipes_session, these three parts perform the following tasks:

  • Opening routine: Writes the context payload and spins up the message reader (which usually involves starting a thread to continually read messages). These steps may involve the creation of resources, such as a temporary file (locally or on some remote system) for the context payload or a temporary directory to which messages will be written.
  • Body: User code should handle launching, polling, and termination of the external process here. While the external process is executing, any intermediate results that have been received can be reported to Dagster with yield from pipes_session.get_results(). A defensive termination pattern is sketched after this list.
  • Closing routine: Ensures that all messages written by the external process have been read into the orchestration process and cleans up any resources used by the context injector and message reader.
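
Since the body owns the external process lifecycle, it is worth guarding the polling loop so the external process does not outlive a failed orchestration-side run. A sketch using the fictional third_party_api, where terminate_external_process is a hypothetical counterpart to launch_external_process:

### ORCHESTRATION PROCESS

# Inside the `with open_pipes_session(...) as pipes_session:` body:
try:
    while not is_external_process_done(external_process):
        yield from pipes_session.get_results()
except BaseException:
    # `terminate_external_process` is hypothetical; use whatever cancellation
    # API the external environment provides.
    terminate_external_process(external_process)
    raise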

Session lifecycle (external process)

As noted above, the only existing Pipes integration library is currently Python's dagster-pipes, so the example below uses Python and dagster-pipes. In the future we will release dagster-pipes equivalents for selected other languages, and the concepts illustrated here should map straightforwardly to those integration libraries.

A Pipes session is represented in the external process by a PipesContext object. A session created by the launching orchestration process can be connected to with open_dagster_pipes from dagster-pipes:

### EXTERNAL PROCESS

from dagster_pipes import (
    PipesDefaultContextLoader,
    PipesDefaultMessageWriter,
    PipesEnvVarParamsLoader,
    open_dagster_pipes,
)

# `user_code` is a fictional package providing pre-existing business logic for assets.
from user_code import get_data_version, get_metric

with open_dagster_pipes(
    params_loader=PipesEnvVarParamsLoader(),
    context_loader=PipesDefaultContextLoader(),
    message_writer=PipesDefaultMessageWriter(),
) as pipes:
    # Equivalent of calling `context.log.info` on the orchestration side.
    # Streams log message back to orchestration process.
    pipes.log.info(f"materializing asset {pipes.asset_key}")

    # ... business logic

    # Creates a `MaterializeResult` on the orchestration side. Notice no value for the asset is
    # included. Pipes only supports reporting that a materialization occurred and associated
    # metadata.
    pipes.report_asset_materialization(
        metadata={"some_metric": {"raw_value": get_metric(), "type": "text"}},
        data_version=get_data_version(),
    )

Above we see that open_dagster_pipes takes three parameters:

  • params_loader: A params loader responsible for loading the bootstrap payload injected into the external process at launch. The standard approach is to inject the bootstrap payload into predetermined environment variables that the PipesEnvVarParamsLoader knows how to read. However, a different bootstrap parameter loader can be substituted in environments where it is not possible to modify environment variables.
  • context_loader: A context loader responsible for loading the context payload from a location specified in the bootstrap payload. Above we use PipesDefaultContextLoader, which will look for a path key in the bootstrap params for a file path to target. The PipesTempFileContextInjector used earlier on the orchestration side writes this path key, but the PipesDefaultContextLoader does not otherwise depend on a specific context injector.
  • message_writer: A message writer responsible for writing streaming messages to a location specified in the bootstrap payload. Above we use PipesDefaultMessageWriter, which will look for a path key in the bootstrap params for a file path to target. The PipesTempFileMessageReader used earlier on the orchestration side writes this path key, but the PipesDefaultMessageWriter does not otherwise depend on a specific message reader.
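
Note that the extras dictionary passed to open_pipes_session on the orchestration side ({"foo": "bar"} in the earlier example) travels inside the context payload, so the external process can read it back from the yielded PipesContext. A minimal sketch, assuming the defaults suffice for the params loader, context loader, and message writer:

### EXTERNAL PROCESS

from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
    # Read back the value passed via `extras={"foo": "bar"}` on the
    # orchestration side.
    foo_value = pipes.get_extra("foo")
    pipes.log.info(f"extras included foo={foo_value}")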

As with the orchestration-side open_pipes_session, open_dagster_pipes is a context manager. Its three parts perform the following functions:

  • Opening routine: Reads the bootstrap payload from the environment and then the context payload. Spins up the message writer, which may involve starting a thread to periodically write buffered messages.
  • Body: Business logic goes here, and can use the yielded PipesContext (in the pipes variable above) to read context information or write messages.
  • Closing routine: Ensures that any messages submitted by business logic have been written before the process exits. This is necessary because some message writers buffer messages between writes.

Customization

Users may implement custom params loaders, context loader/injector pairs, and message reader/writer pairs. Any of the above may be necessary if you’d like to use Dagster Pipes in an environment for which Dagster does not currently ship a compatible context loader/injector or message reader/writer.

Custom params loader

Params loaders must inherit from PipesParamsLoader. Here is an example that loads parameters from an object called METADATA imported from a fictional package called cloud_service. It is assumed that "cloud service" represents some compute platform, that the cloud_service package is available in the environment, and that the API for launching processes in "cloud service" allows you to set arbitrary key-value pairs in a payload that is exposed as cloud_service.METADATA.

### EXTERNAL PROCESS

from cloud_service import METADATA
from dagster_pipes import (
    DAGSTER_PIPES_CONTEXT_ENV_VAR,
    DAGSTER_PIPES_MESSAGES_ENV_VAR,
    PipesParams,
    PipesParamsLoader,
)


class MyCustomParamsLoader(PipesParamsLoader):
    def is_dagster_pipes_process(self) -> bool:
        return DAGSTER_PIPES_CONTEXT_ENV_VAR in METADATA

    def load_context_params(self) -> PipesParams:
        return METADATA[DAGSTER_PIPES_CONTEXT_ENV_VAR]

    def load_messages_params(self) -> PipesParams:
        return METADATA[DAGSTER_PIPES_MESSAGES_ENV_VAR]
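
To put the custom params loader to use, pass it explicitly to open_dagster_pipes in place of the default PipesEnvVarParamsLoader:

### EXTERNAL PROCESS

from dagster_pipes import open_dagster_pipes

with open_dagster_pipes(params_loader=MyCustomParamsLoader()) as pipes:
    # Bootstrap params are now sourced from `cloud_service.METADATA` instead of
    # environment variables.
    pipes.log.info("connected to Dagster via cloud_service metadata")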

Custom context injector/loader

Context injectors must inherit from dagster.PipesContextInjector and context loaders from dagster_pipes.PipesContextLoader.

In general, if you are implementing a custom variant of one, you will want to implement a matching variant of the other. Below is a simple example that uses a fictional cloud_service key/value store to store the context payload. First, the context injector:

### ORCHESTRATION PROCESS

import json
import random
import string
from contextlib import contextmanager
from typing import Iterator

import cloud_service
from dagster_pipes import PipesContextData, PipesParams

from dagster import PipesContextInjector


class MyCustomCloudServiceContextInjector(PipesContextInjector):
    # Note that `PipesContextData` corresponds to what this document calls the
    # "context payload": a JSON-serializable dictionary with context info.
    @contextmanager
    def inject_context(self, context_data: "PipesContextData") -> Iterator[PipesParams]:
        key = "".join(random.choices(string.ascii_letters, k=30))
        cloud_service.write(key, json.dumps(context_data))
        yield {"key": key}

    def no_messages_debug_text(self) -> str:
        return (
            "Attempted to inject context using a `cloud_service`. Expected"
            " `MyCustomCloudServiceContextLoader` to be explicitly passed to `open_dagster_pipes`"
            " in the external process."
        )

And the context loader:

### EXTERNAL PROCESS

import json
from contextlib import contextmanager
from typing import Iterator

import cloud_service
from dagster_pipes import PipesContextData, PipesContextLoader, PipesParams


class MyCustomCloudServiceContextLoader(PipesContextLoader):
    @contextmanager
    def load_context(self, params: PipesParams) -> Iterator[PipesContextData]:
        # params were yielded by the above context injector and sourced from the bootstrap payload
        key = params["key"]
        data = cloud_service.read(key)
        yield json.loads(data)
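
Wiring the pair together is then a matter of passing each half to the appropriate session entry point: the injector to open_pipes_session on the orchestration side, the loader to open_dagster_pipes in the external process. A minimal sketch, with launching and polling elided and the message reader/writer left at their defaults:

### ORCHESTRATION PROCESS

from dagster import AssetExecutionContext, asset, open_pipes_session


@asset
def my_cloud_service_asset(context: AssetExecutionContext):
    with open_pipes_session(
        context=context,
        context_injector=MyCustomCloudServiceContextInjector(),
    ) as pipes_session:
        ...  # launch and poll the external process as shown earlier
    yield from pipes_session.get_results()

### EXTERNAL PROCESS

from dagster_pipes import open_dagster_pipes

with open_dagster_pipes(
    context_loader=MyCustomCloudServiceContextLoader(),
) as pipes:
    ...  # business logic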

Custom message reader/writer

The message reader/writer is responsible for handling log files written by the external process as well as messages. However, the APIs for customizing log file handling are still in flux, so they are not covered in this guide. We will update this guide with instructions for customizing log handling once those APIs stabilize.

Message readers must inherit from dagster.PipesMessageReader and message writers from dagster_pipes.PipesMessageWriter.

In general, if you are implementing a custom variant of one, you will want to implement a matching variant of the other. Furthermore, message writers internally create a PipesMessageWriterChannel subcomponent for which you will likely also need to implement a custom variant; see below for details.

Below is a simple example that uses a fictional cloud_service key/value store as a storage layer for message chunks. This example is a little more sophisticated than the context injector/loader example because it inherits from PipesBlobStoreMessageReader and PipesBlobStoreMessageWriter instead of the plain abstract base classes. The blob store reader and writer provide infrastructure for chunking messages: messages are buffered on the writer and uploaded in chunks at a fixed interval (defaulting to 10 seconds), and the reader likewise attempts to download message chunks at a fixed interval (defaulting to 10 seconds). This avoids a blob store read/write for every message, which could get expensive.

First, the message reader:

### ORCHESTRATION PROCESS

import random
import string
from contextlib import contextmanager
from typing import Iterator, Optional

import cloud_service
from dagster_pipes import PipesParams

from dagster import PipesBlobStoreMessageReader


class MyCustomCloudServiceMessageReader(PipesBlobStoreMessageReader):
    @contextmanager
    def get_params(self) -> Iterator[PipesParams]:
        # Generate a random key prefix under which message chunks will be
        # written on the cloud service.
        key_prefix = "".join(random.choices(string.ascii_letters, k=30))
        yield {"key_prefix": key_prefix}

    def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
        # `params` were yielded by `get_params` above and arrive via the bootstrap payload.
        message_key = f"{params['key_prefix']}/{index}.json"
        # `cloud_service.read` is assumed to return None if the chunk has not been written yet.
        raw_message = cloud_service.read(message_key)
        return raw_message

    def no_messages_debug_text(self) -> str:
        return (
            "Attempted to read messages from a `cloud_service`. Expected"
            " MyCustomCloudServiceMessageWriter to be explicitly passed to `open_dagster_pipes` in"
            " the external process."
        )

And the message writer:

### EXTERNAL PROCESS

from typing import IO

import cloud_service
from dagster_pipes import (
    PipesBlobStoreMessageWriter,
    PipesBlobStoreMessageWriterChannel,
    PipesParams,
)


class MyCustomCloudServiceMessageWriter(PipesBlobStoreMessageWriter):
    def make_channel(
        self, params: PipesParams
    ) -> "MyCustomCloudServiceMessageWriterChannel":
        # params were yielded by the above message reader and sourced from the bootstrap payload
        key_prefix = params["key_prefix"]
        return MyCustomCloudServiceMessageWriterChannel(key_prefix=key_prefix)


class MyCustomCloudServiceMessageWriterChannel(PipesBlobStoreMessageWriterChannel):
    def __init__(self, key_prefix: str):
        super().__init__()
        self.key_prefix = key_prefix

    # This will be called periodically to upload any buffered messages.
    def upload_messages_chunk(self, payload: IO, index: int) -> None:
        key = f"{self.key_prefix}/{index}.json"
        # `payload` is a file-like object containing a chunk of newline-delimited
        # JSON messages; upload it verbatim so the reader can parse it line by line.
        cloud_service.write(key, payload.read())
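
The reader and writer are wired in the same way as the context injector/loader pair above; the chunk upload/download interval mentioned earlier can typically be tuned via the blob store base classes' constructors (an assumption worth verifying against the API docs). A minimal sketch:

### ORCHESTRATION PROCESS

# Within an asset or op body, as in the earlier examples:
with open_pipes_session(
    context=context,
    message_reader=MyCustomCloudServiceMessageReader(),
) as pipes_session:
    ...  # launch and poll the external process

### EXTERNAL PROCESS

with open_dagster_pipes(
    message_writer=MyCustomCloudServiceMessageWriter(),
) as pipes:
    ...  # business logic; messages are buffered and uploaded in chunks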