
Pipes

Abstractions for the orchestration side of the Dagster Pipes protocol.

class dagster.PipesClient(*args, **kwargs)

Experimental. This API may break in future versions, even between dot releases.

Pipes client base class.

Pipes clients for specific external environments should subclass this.

abstract run(*, context, extras=None, **kwargs)

Synchronously execute an external process with the pipes protocol. Derived clients must accept the context and extras arguments, but may also add arbitrary arguments appropriate for their own implementation.

Parameters:
  • context (OpExecutionContext) – The context from the executing op/asset.

  • extras (Optional[PipesExtras]) – Arbitrary data to pass to the external environment.

Returns:

Wrapper containing results reported by the external process.

Return type:

PipesClientCompletedInvocation
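The contract above (keyword-only `context` and `extras`, plus client-specific extras via `**kwargs`) can be illustrated with a small standard-library sketch. `ToyPipesClient` and `EchoCommandClient` are hypothetical stand-ins, not Dagster classes, and the `command` argument is an invented example of a client-specific parameter; a real client would launch a process and return a PipesClientCompletedInvocation.

```python
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional, Sequence


class ToyPipesClient(ABC):
    """Hypothetical stand-in for dagster.PipesClient (not Dagster's class)."""

    @abstractmethod
    def run(self, *, context: Any, extras: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> Any:
        """Launch an external process and return what it reported."""


class EchoCommandClient(ToyPipesClient):
    """Hypothetical client that adds its own keyword-only `command` argument."""

    def run(self, *, context: Any, extras: Optional[Mapping[str, Any]] = None,
            command: Sequence[str] = (), **kwargs: Any) -> dict:
        # A real client would launch `command` and read pipes messages back;
        # this toy version only records what it was asked to do.
        return {"context": context, "extras": dict(extras or {}), "command": list(command)}


client = EchoCommandClient()
result = client.run(context="ctx", extras={"foo": "bar"}, command=["python", "script.py"])
print(result)  # {'context': 'ctx', 'extras': {'foo': 'bar'}, 'command': ['python', 'script.py']}
```

Keeping `context` and `extras` keyword-only in every subclass means callers can swap one client for another without reordering arguments.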

dagster.PipesSubprocessClient

alias of _PipesSubprocess[_PipesSubprocess]

class dagster.PipesContextInjector(*args, **kwargs)

Experimental. This API may break in future versions, even between dot releases.

class dagster.PipesMessageReader(*args, **kwargs)

Experimental. This API may break in future versions, even between dot releases.

class dagster.PipesMessageHandler(context, message_reader)

Experimental. This API may break in future versions, even between dot releases.

Class to process PipesMessage objects received from a pipes process.

Parameters:
  • context (OpExecutionContext) – The context for the executing op/asset.

  • message_reader (PipesMessageReader) – The message reader used to read messages from the external process.
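As a rough model of what "processing PipesMessage objects" involves, the sketch below decodes JSON messages and dispatches each one on its method name. `ToyMessageHandler` is hypothetical, and the message shape (a `method` string plus a `params` object) and the method names used are assumptions for illustration, not a specification of Dagster's wire format.

```python
import json
from typing import Any, Dict, List


class ToyMessageHandler:
    """Hypothetical stand-in for a pipes message handler (not Dagster's class)."""

    def __init__(self) -> None:
        self.opened = False
        self.closed = False
        self.materializations: List[Dict[str, Any]] = []
        self.custom_messages: List[Any] = []

    def handle_message(self, raw: str) -> None:
        message = json.loads(raw)
        # Dispatch on the "method" field to a matching _handle_<method> function.
        handler = getattr(self, "_handle_" + message["method"], None)
        if handler is None:
            raise ValueError(f"unknown pipes method: {message['method']!r}")
        handler(message.get("params") or {})

    def _handle_opened(self, params: Dict[str, Any]) -> None:
        self.opened = True

    def _handle_closed(self, params: Dict[str, Any]) -> None:
        self.closed = True

    def _handle_report_asset_materialization(self, params: Dict[str, Any]) -> None:
        self.materializations.append(params)

    def _handle_report_custom_message(self, params: Dict[str, Any]) -> None:
        self.custom_messages.append(params.get("payload"))


handler = ToyMessageHandler()
for line in [
    '{"method": "opened", "params": {}}',
    '{"method": "report_asset_materialization", "params": {"asset_key": "orders", "metadata": {"rows": 10}}}',
    '{"method": "report_custom_message", "params": {"payload": {"status": "ok"}}}',
    '{"method": "closed", "params": null}',
]:
    handler.handle_message(line)
```

Raising on an unknown method keeps protocol mismatches loud instead of silently dropping messages.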

class dagster.PipesSession(context_data, message_handler, context_injector_params, message_reader_params, context)

Experimental. This API may break in future versions, even between dot releases.

Object representing a pipes session.

A pipes session is defined by a pair of PipesContextInjector and PipesMessageReader objects. At the opening of the session, the context injector writes context data to an externally accessible location, and the message reader starts monitoring an externally accessible location. These locations are encoded in parameters stored on a PipesSession object.

During the session, an external process should be started and the parameters injected into its environment. The typical way to do this is to call PipesSession.get_bootstrap_env_vars() and pass the result as environment variables.

During execution, results (e.g. asset materializations) are reported by the external process and buffered on the PipesSession object. The buffer can periodically be cleared and yielded to Dagster machinery by calling yield from PipesSession.get_results().

When the external process exits, the session can be closed. Closing consists of handling any unprocessed messages written by the external process and cleaning up any resources used for context injection and message reading.

Parameters:
  • context_data (PipesContextData) – The context for the executing op/asset.

  • message_handler (PipesMessageHandler) – The message handler to use for processing messages.

  • context_injector_params (PipesParams) – Parameters yielded by the context injector, indicating the location from which the external process should load context data.

  • message_reader_params (PipesParams) – Parameters yielded by the message reader, indicating the location to which the external process should write messages.
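The buffering lifecycle described above (results accumulate during execution, are drained with get_results(), and any unread messages are handled on close) can be modeled in a few lines. `ToySession` and `run_asset` are hypothetical; the sketch only mirrors the buffer-and-drain behavior the text describes, not Dagster's threading or I/O.

```python
from typing import Any, Iterator, List


class ToySession:
    """Hypothetical model of a pipes session's result buffer (not Dagster's PipesSession)."""

    def __init__(self) -> None:
        self._incoming: List[Any] = []  # written by the external process, not yet processed
        self._buffer: List[Any] = []    # processed results waiting to be yielded
        self.closed = False

    def external_write(self, message: Any) -> None:
        # Stand-in for the external process writing a message.
        self._incoming.append(message)

    def process_incoming(self) -> None:
        # Stand-in for the message reader turning messages into buffered results.
        self._buffer.extend(self._incoming)
        self._incoming.clear()

    def get_results(self) -> List[Any]:
        # Hand back everything buffered so far and clear the buffer.
        results, self._buffer = self._buffer, []
        return results

    def close(self) -> None:
        # Process anything still unread before releasing resources.
        self.process_incoming()
        self.closed = True


def run_asset(session: ToySession) -> Iterator[Any]:
    session.external_write("materialization-1")
    session.process_incoming()
    yield from session.get_results()  # drained while the process is running
    session.external_write("materialization-2")  # written just before exit
    session.close()
    yield from session.get_results()  # drained after the session closes


session = ToySession()
results = list(run_asset(session))
print(results)  # ['materialization-1', 'materialization-2']
```

The second drain after close matters: without it, anything the process wrote just before exiting would never reach the caller.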

get_bootstrap_env_vars()

Encode context injector and message reader params as environment variables.

Passing environment variables is the typical way to expose the pipes I/O parameters to a pipes process.

Returns:

Environment variables to pass to the external process. The values are serialized as JSON, compressed with gzip, and then base64-encoded.

Return type:

Mapping[str, str]
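The encoding described above can be sketched with the standard library. This follows the description literally (JSON, then gzip, then base64) and is an illustration rather than Dagster's own encoder, so its output is not guaranteed to be byte-identical. The variable names `DAGSTER_PIPES_CONTEXT` and `DAGSTER_PIPES_MESSAGES` and the `path` params are assumptions chosen for the example.

```python
import base64
import gzip
import json
import os
from typing import Any, Mapping


def encode_param(value: Mapping[str, Any]) -> str:
    """Serialize to JSON, gzip-compress, then base64-encode."""
    raw = json.dumps(value).encode("utf-8")
    return base64.b64encode(gzip.compress(raw)).decode("ascii")


def decode_param(encoded: str) -> Any:
    """Reverse the steps: base64-decode, gunzip, parse JSON."""
    return json.loads(gzip.decompress(base64.b64decode(encoded)).decode("utf-8"))


# Assumed variable names and params, for illustration only.
env_vars = {
    "DAGSTER_PIPES_CONTEXT": encode_param({"path": "/tmp/context.json"}),
    "DAGSTER_PIPES_MESSAGES": encode_param({"path": "/tmp/messages.jsonl"}),
}
# Merge with the parent environment so the child still sees PATH, HOME, etc.
child_env = {**os.environ, **env_vars}
print(decode_param(child_env["DAGSTER_PIPES_CONTEXT"]))  # {'path': '/tmp/context.json'}
```

Base64 keeps the compressed bytes safe to store in an environment variable, which must be a text string. Merging with `os.environ` is a design choice: passing only the bootstrap variables as `env` replaces the child's entire environment.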

get_bootstrap_params()

Get the params necessary to bootstrap a launched pipes process. These parameters are typically passed as environment variables; see get_bootstrap_env_vars. It is the context injector’s responsibility to decide how to pass these parameters to the external environment.

Returns:

Parameters to pass to the external process and their corresponding values that must be passed by the context injector.

Return type:

Mapping[str, str]

get_custom_messages()

Get the sequence of deserialized JSON data that was reported from the external process using report_custom_message.

Return type:

Sequence[Any]

get_reported_results()

Get only the PipesExecutionResult objects that were explicitly reported by the external process.

Returns:

Results reported by the external process.

Return type:

Sequence[PipesExecutionResult]

get_results(*, implicit_materializations=True)

Get the PipesExecutionResult objects reported from the external process.

Parameters:

implicit_materializations (bool) – Create MaterializeResults for expected assets even if nothing is reported from the external process.

Returns:

Results reported by the external process.

Return type:

Sequence[PipesExecutionResult]
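One simplified reading of the `implicit_materializations` flag is sketched below: explicitly reported results are returned as-is, and each expected asset that was never reported gets an empty result. `ToyMaterializeResult` and this `get_results` function are hypothetical stand-ins, and the "fill in only the unreported assets" rule is an assumption about the flag's behavior, not a description of Dagster's implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Sequence


@dataclass
class ToyMaterializeResult:
    """Hypothetical stand-in for dagster.MaterializeResult."""
    asset_key: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def get_results(reported: Sequence[ToyMaterializeResult], expected_assets: Sequence[str],
                implicit_materializations: bool = True) -> List[ToyMaterializeResult]:
    results = list(reported)
    if implicit_materializations:
        reported_keys = {r.asset_key for r in reported}
        # Add an empty result for each expected asset the process never reported.
        results.extend(ToyMaterializeResult(key) for key in expected_assets if key not in reported_keys)
    return results


reported = [ToyMaterializeResult("orders", {"rows": 10})]
results = get_results(reported, expected_assets=["orders", "customers"])
print([r.asset_key for r in results])  # ['orders', 'customers']
```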

class dagster.PipesBlobStoreMessageReader(interval=10, log_readers=None)

Experimental. This API may break in future versions, even between dot releases.

Message reader that reads a sequence of message chunks written by an external process into a blob store such as S3, Azure blob storage, or GCS.

The reader maintains a counter, starting at 1, that is synchronized with a message writer in some pipes process. The reader starts a thread that periodically attempts to read a chunk indexed by the counter at some location expected to be written by the pipes process. The chunk should be a file with each line corresponding to a JSON-encoded pipes message. When a chunk is successfully read, the messages are processed and the counter is incremented. The PipesBlobStoreMessageWriter on the other end is expected to similarly increment a counter (starting from 1) on successful write, keeping counters on the read and write end in sync.

If log_readers is passed, the message reader will start the passed log readers when the "opened" message is received from the external process.

Parameters:
  • interval (float) – Interval, in seconds, between attempts to download a chunk.

  • log_readers (Optional[Sequence[PipesLogReader]]) – A set of readers for logs.
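The counter protocol described above can be modeled with a local directory standing in for the blob store. `ToyChunkReader`, `write_chunk`, and the `<counter>.json` naming scheme are hypothetical; the real reader polls from a background thread every `interval` seconds, while this sketch calls `try_read_next` by hand.

```python
import json
import os
import tempfile
from typing import Any, Callable, List


class ToyChunkReader:
    """Hypothetical model of counter-synchronized chunk reading."""

    def __init__(self, directory: str, handle: Callable[[Any], None]) -> None:
        self.directory = directory
        self.handle = handle
        self.counter = 1  # starts at 1, matching the writer's counter

    def try_read_next(self) -> bool:
        path = os.path.join(self.directory, f"{self.counter}.json")
        if not os.path.exists(path):
            return False  # chunk not written yet; a real reader retries after `interval`
        with open(path) as f:
            for line in f:  # one JSON-encoded message per line
                if line.strip():
                    self.handle(json.loads(line))
        self.counter += 1  # advance only after a successful read
        return True


def write_chunk(directory: str, index: int, messages: List[Any]) -> None:
    """Hypothetical writer side: one JSON message per line."""
    with open(os.path.join(directory, f"{index}.json"), "w") as f:
        f.write("\n".join(json.dumps(m) for m in messages) + "\n")


received: List[Any] = []
with tempfile.TemporaryDirectory() as d:
    reader = ToyChunkReader(d, received.append)
    first_attempt = reader.try_read_next()   # False: chunk 1 does not exist yet
    write_chunk(d, 1, [{"method": "opened"}, {"method": "log"}])
    second_attempt = reader.try_read_next()  # True: chunk 1 read, counter -> 2
```

The sketch relies on each chunk appearing whole; object stores such as S3 make an uploaded object visible all at once, whereas a plain local file could in principle be read while still being written.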

class dagster.PipesEnvContextInjector(*args, **kwargs)

Experimental. This API may break in future versions, even between dot releases.

Context injector that passes context data to the external process by setting it directly in the external process's environment.

class dagster.PipesFileContextInjector(path)

Experimental. This API may break in future versions, even between dot releases.

Context injector that injects context data into the external process by writing it to a specified file.

Parameters:

path (str) – The path of a file to which to write context data. The file will be deleted on close of the pipes session.
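The write-then-delete behavior described for `path` can be sketched as a context manager. `inject_context_to_file` is hypothetical, and the yielded `{"path": ...}` params and the `asset_keys` field in the context data are illustrative assumptions.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Mapping


@contextmanager
def inject_context_to_file(path: str, context_data: Mapping[str, Any]) -> Iterator[Dict[str, str]]:
    """Hypothetical model: write context to `path`, yield params, delete the file on close."""
    with open(path, "w") as f:
        json.dump(context_data, f)
    try:
        # These params tell the external process where to load context from.
        yield {"path": path}
    finally:
        if os.path.exists(path):
            os.remove(path)


with tempfile.TemporaryDirectory() as d:
    ctx_path = os.path.join(d, "context.json")
    with inject_context_to_file(ctx_path, {"asset_keys": ["my_asset"]}) as params:
        # The external process would read the file named in params["path"].
        with open(params["path"]) as f:
            loaded = json.load(f)
    still_exists = os.path.exists(ctx_path)

print(loaded, still_exists)  # {'asset_keys': ['my_asset']} False
```

Doing the deletion in `finally` means the file is removed even if launching or running the external process raises.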

class dagster.PipesFileMessageReader(path)

Experimental. This API may break in future versions, even between dot releases.

Message reader that reads messages by tailing a specified file.

Parameters:

path (str) – The path of the file to which messages will be written. The file will be deleted on close of the pipes session.
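"Tailing" a file, as described above, means remembering how far it has been read and only consuming complete lines on each pass. `ToyFileTailer` is a hypothetical sketch of that idea, not Dagster's reader; it tracks a byte offset and leaves a half-written final line for the next pass.

```python
import json
import os
import tempfile
from typing import Any, List


class ToyFileTailer:
    """Hypothetical model of a reader that tails a JSON-lines messages file."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.offset = 0  # byte position just past the last fully processed line

    def read_new_messages(self) -> List[Any]:
        messages = []
        with open(self.path, "rb") as f:
            f.seek(self.offset)
            for line in f:
                if not line.endswith(b"\n"):
                    break  # partial line: the writer has not finished it yet
                self.offset += len(line)
                if line.strip():
                    messages.append(json.loads(line))
        return messages


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "messages.jsonl")
    tailer = ToyFileTailer(path)
    with open(path, "w") as out:
        out.write(json.dumps({"method": "opened"}) + "\n")
        out.write('{"method": "lo')  # simulate a half-written line
    first = tailer.read_new_messages()
    with open(path, "a") as out:
        out.write('g"}\n')  # the writer finishes the line
    second = tailer.read_new_messages()

print(first, second)  # [{'method': 'opened'}] [{'method': 'log'}]
```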

class dagster.PipesTempFileContextInjector(*args, **kwargs)

Experimental. This API may break in future versions, even between dot releases.

Context injector that injects context data into the external process by writing it to an automatically-generated temporary file.

class dagster.PipesTempFileMessageReader(*args, **kwargs)

Experimental. This API may break in future versions, even between dot releases.

Message reader that reads messages by tailing an automatically-generated temporary file.

dagster.open_pipes_session(context, context_injector, message_reader, extras=None)

Experimental. This API may break in future versions, even between dot releases.

Context manager that opens and closes a pipes session.

This context manager should be used to wrap the launch of an external process that uses the pipes protocol to report results back to Dagster. The yielded PipesSession should be used to (a) obtain the environment variables that need to be provided to the external process and (b) access results streamed back from the external process.

This method is an alternative to PipesClient subclasses for users who want more control over how pipes processes are launched. When using open_pipes_session, it is the user’s responsibility to inject the message reader and context injector parameters available on the yielded PipesSession and pass them to the appropriate API when launching the external process. Typically these parameters should be set as environment variables.

Parameters:
  • context (OpExecutionContext) – The context for the current op/asset execution.

  • context_injector (PipesContextInjector) – The context injector to use to inject context into the external process.

  • message_reader (PipesMessageReader) – The message reader to use to read messages from the external process.

  • extras (Optional[PipesExtras]) – Optional extras to pass to the external process via the injected context.

Yields:

PipesSession – Interface for interacting with the external process.

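import subprocess
import time

from dagster import (
    OpExecutionContext,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)

extras = {"foo": "bar"}

@asset
def ext_asset(context: OpExecutionContext):
    with open_pipes_session(
        context=context,
        extras=extras,
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # Launch the external process with the bootstrap params in its environment.
        process = subprocess.Popen(
            ["/bin/python", "/path/to/script.py"],
            env={**pipes_session.get_bootstrap_env_vars()},
        )
        # Stream buffered results to Dagster while the process is running.
        while process.poll() is None:
            yield from pipes_session.get_results()
            time.sleep(1)

    # The session is now closed; yield anything reported just before exit.
    yield from pipes_session.get_results()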
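The mechanics this example relies on (write context somewhere the child can find it, tell the child where via environment variables, read back what it writes) can be reproduced with the standard library alone. This is a sketch of the idea, not Dagster's implementation: the `TOY_CONTEXT_PATH` and `TOY_MESSAGES_PATH` variables, the inline child script, and the message shape are all hypothetical.

```python
import json
import os
import subprocess
import sys
import tempfile

# Hypothetical child script: loads context from one file, appends a message to another.
CHILD = r'''
import json, os
with open(os.environ["TOY_CONTEXT_PATH"]) as f:
    context = json.load(f)
with open(os.environ["TOY_MESSAGES_PATH"], "a") as out:
    out.write(json.dumps({"method": "report_custom_message",
                          "params": {"payload": context["extras"]["foo"]}}) + "\n")
'''

with tempfile.TemporaryDirectory() as d:
    context_path = os.path.join(d, "context.json")
    messages_path = os.path.join(d, "messages.jsonl")
    # "Inject" context by writing it to a file the child can locate.
    with open(context_path, "w") as f:
        json.dump({"extras": {"foo": "bar"}}, f)
    env = {**os.environ, "TOY_CONTEXT_PATH": context_path, "TOY_MESSAGES_PATH": messages_path}
    process = subprocess.Popen([sys.executable, "-c", CHILD], env=env)
    process.wait()
    # "Read" messages after the child exits.
    with open(messages_path) as f:
        messages = [json.loads(line) for line in f if line.strip()]

print(messages)  # [{'method': 'report_custom_message', 'params': {'payload': 'bar'}}]
```

Unlike open_pipes_session, this sketch reads messages only once the child has exited; the polling loop in the example above exists so results can be surfaced while the process is still running.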