Dagster Pipes is a toolkit for building integrations between Dagster and external execution environments. This reference outlines the APIs included with the dagster library, which should be used in the orchestration environment.
For a detailed look at the Pipes process, including how to customize it, refer to the Dagster Pipes details and customization guide.
Looking to write code in an external process? Refer to the API reference for the separately-installed dagster-pipes library.
Object representing a pipes session.
A pipes session is defined by a pair of PipesContextInjector and PipesMessageReader objects. When the session opens, the context injector writes context data to an externally accessible location, and the message reader starts monitoring an externally accessible location. These locations are encoded in parameters stored on a PipesSession object.
During the session, an external process should be started and the parameters injected into its environment. The typical way to do this is to call PipesSession.get_bootstrap_env_vars() and pass the result as environment variables.
During execution, results (e.g. asset materializations) are reported by the external process and buffered on the PipesSession object. The buffer can periodically be cleared and yielded to Dagster machinery by calling yield from PipesSession.get_results().
When the external process exits, the session can be closed. Closing consists of handling any unprocessed messages written by the external process and cleaning up any resources used for context injection and message reading.
context_data (PipesContextData) – The context for the executing op/asset.
message_handler (PipesMessageHandler) – The message handler to use for processing messages.
context_injector_params (PipesParams) – Parameters yielded by the context injector, indicating the location from which the external process should load context data.
message_reader_params (PipesParams) – Parameters yielded by the message reader, indicating the location to which the external process should write messages.
created_at (datetime) – The time at which the session was created. Useful as a cutoff for reading logs.
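The session lifecycle described above can be illustrated without Dagster at all. The following is a toy, stdlib-only sketch, not Dagster's implementation. The "context injector" is a temp file and the "message reader" location is another temp file, and both locations reach the child through environment variables. The variable names PIPES_CONTEXT_PATH and PIPES_MESSAGES_PATH, the child script, and the simplified message shape are all hypothetical.

```python
import json
import os
import subprocess
import sys
import tempfile

# Hypothetical child program standing in for the external process. It loads its
# context from PIPES_CONTEXT_PATH and appends one JSON message per line to
# PIPES_MESSAGES_PATH.
CHILD_SCRIPT = """
import json, os
with open(os.environ["PIPES_CONTEXT_PATH"]) as f:
    context = json.load(f)
with open(os.environ["PIPES_MESSAGES_PATH"], "a") as f:
    msg = {"method": "report_asset_materialization",
           "params": {"asset_key": context["asset_key"], "metadata": {"rows": 3}}}
    f.write(json.dumps(msg) + "\\n")
"""


def run_toy_session(context_data: dict) -> list:
    """Open a toy session, run the child to completion, and return its messages."""
    with tempfile.TemporaryDirectory() as tmp:
        context_path = os.path.join(tmp, "context.json")
        messages_path = os.path.join(tmp, "messages.jsonl")
        # Open: write context data to an externally accessible location.
        with open(context_path, "w") as f:
            json.dump(context_data, f)
        open(messages_path, "w").close()  # location the child will write messages to
        # Launch: pass both locations to the child via its environment.
        env = {
            **os.environ,
            "PIPES_CONTEXT_PATH": context_path,
            "PIPES_MESSAGES_PATH": messages_path,
        }
        subprocess.run([sys.executable, "-c", CHILD_SCRIPT], env=env, check=True)
        # Close: process messages the child wrote; leaving the with-block cleans up.
        with open(messages_path) as f:
            return [json.loads(line) for line in f if line.strip()]
```

Calling run_toy_session({"asset_key": "my_asset"}) returns a single report_asset_materialization message for my_asset. A real session reads messages while the process runs, not only at the end.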
Encode context injector and message reader params as CLI arguments.
Passing CLI arguments is an alternative way to expose the pipes I/O parameters to a pipes process. Using environment variables should be preferred when possible.
Returns: CLI arguments to pass to the external process. The values are serialized as JSON, compressed with zlib, and then base64-encoded.
Return type: Mapping[str, str]
Encode context injector and message reader params as environment variables.
Passing environment variables is the typical way to expose the pipes I/O parameters to a pipes process.
Returns: Environment variables to pass to the external process. The values are serialized as JSON, compressed with zlib, and then base64-encoded.
Return type: Mapping[str, str]
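The encoding described for get_bootstrap_cli_arguments (JSON, then zlib, then base64) can be sketched with the standard library and used to build an environment-variable mapping. The variable names DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES and the example params are assumptions for illustration. In real use, call PipesSession.get_bootstrap_env_vars() rather than encoding by hand.

```python
import base64
import json
import zlib


def encode_param(value) -> str:
    """Serialize to JSON, compress with zlib, then base64-encode."""
    compressed = zlib.compress(json.dumps(value).encode("utf-8"))
    return base64.b64encode(compressed).decode("ascii")


def decode_param(encoded: str):
    """Reverse encode_param: base64-decode, decompress, then parse JSON."""
    return json.loads(zlib.decompress(base64.b64decode(encoded)).decode("utf-8"))


# Hypothetical params; in practice they come from the context injector and message reader.
context_params = {"path": "/tmp/context.json"}
messages_params = {"path": "/tmp/messages.jsonl"}

# Assumed variable names for the two bootstrap parameters.
env_vars = {
    "DAGSTER_PIPES_CONTEXT": encode_param(context_params),
    "DAGSTER_PIPES_MESSAGES": encode_param(messages_params),
}
```

The external side recovers each parameter with the inverse steps, as decode_param shows. Compression keeps the values compact, and base64 makes them safe to place in environment variables or on a command line.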
Get the params necessary to bootstrap a launched pipes process. These parameters are typically passed as environment variables (see get_bootstrap_env_vars). It is the context injector’s responsibility to decide how to pass these parameters to the external environment.
Returns: Parameters to pass to the external process and their corresponding values that must be passed by the context injector.
Return type: Mapping[str, str]
Get the sequence of deserialized JSON data that was reported from the external process using report_custom_message.
Returns: Sequence[Any]
Only the PipesExecutionResult objects explicitly received from the external process.
Returns: Results reported by the external process.
Return type: Sequence[PipesExecutionResult]
PipesExecutionResult objects reported from the external process, potentially modified by Pipes.
implicit_materializations (bool) – Create MaterializeResults for expected assets even if nothing is reported from the external process.
metadata (Optional[Mapping[str, MetadataValue]]) – Arbitrary metadata that will be attached to all results generated by the invocation. Useful for attaching information to asset materializations and checks that is available via the external process launch API but not in the external process itself (e.g. a job_id param returned by the launch API call).
Returns: Results reported by the external process.
Return type: Sequence[PipesExecutionResult]
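The effect of implicit_materializations and metadata can be modeled with plain data classes. This is a toy model, not Dagster's code. ToyMaterialization and toy_get_results are hypothetical names. Letting invocation-level metadata win on key collisions is an arbitrary choice made by this sketch.

```python
from dataclasses import dataclass, field
from typing import List, Mapping, Optional, Sequence


@dataclass
class ToyMaterialization:
    """Hypothetical stand-in for a materialization result."""

    asset_key: str
    metadata: dict = field(default_factory=dict)


def toy_get_results(
    reported: Sequence[ToyMaterialization],
    expected_asset_keys: Sequence[str],
    implicit_materializations: bool = True,
    metadata: Optional[Mapping[str, object]] = None,
) -> List[ToyMaterialization]:
    results = list(reported)
    if implicit_materializations:
        # Add an empty materialization for each expected asset the process did not report.
        reported_keys = {r.asset_key for r in reported}
        results += [
            ToyMaterialization(key)
            for key in expected_asset_keys
            if key not in reported_keys
        ]
    # Attach invocation-level metadata (e.g. a job_id from a launch API) to every result.
    extra = dict(metadata or {})
    return [ToyMaterialization(r.asset_key, {**r.metadata, **extra}) for r in results]
```

Consider an asset that computes "a" and "b", where the external process reports only "a". With the defaults, the result still contains one entry per asset, and each entry carries the shared metadata.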
Context manager that opens and closes a pipes session.
This context manager should be used to wrap the launch of an external process that uses the Pipes protocol to report results back to Dagster. The yielded PipesSession should be used to (a) obtain the environment variables that need to be provided to the external process; and (b) access results streamed back from the external process.
This method is an alternative to PipesClient subclasses for users who want more control over how pipes processes are launched. When using open_pipes_session, it is the user’s responsibility to inject the message reader and context injector parameters available on the yielded PipesSession and pass them to the appropriate API when launching the external process. Typically these parameters should be set as environment variables.
context (Union[OpExecutionContext, AssetExecutionContext]) – The context for the current op/asset execution.
context_injector (PipesContextInjector) – The context injector to use to inject context into the external process.
message_reader (PipesMessageReader) – The message reader to use to read messages from the external process.
extras (Optional[PipesExtras]) – Optional extras to pass to the external process via the injected context.
Yields: PipesSession – Interface for interacting with the external process.
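```python
import subprocess

from dagster import (
    AssetExecutionContext,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)

extras = {"foo": "bar"}


@asset
def ext_asset(context: AssetExecutionContext):
    with open_pipes_session(
        context=context,
        extras=extras,
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # Launch the external process with the bootstrap parameters in its environment.
        process = subprocess.Popen(
            ["/bin/python", "/path/to/script.py"],
            env={**pipes_session.get_bootstrap_env_vars()},
        )
        # Stream buffered results to Dagster while the process is still running.
        while process.poll() is None:
            yield from pipes_session.get_results()

    # After the session closes, yield any results that arrived after the last poll.
    yield from pipes_session.get_results()
```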
Pipes client base class.
Pipes clients for specific external environments should subclass this.
Derived clients must accept context and extras arguments, but can also add arbitrary arguments appropriate to their own implementation.
context (Union[OpExecutionContext, AssetExecutionContext]) – The context from the executing op/asset.
extras (Optional[PipesExtras]) – Arbitrary data to pass to the external environment.
Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation
A pipes client that runs a subprocess with the given command and environment.
By default, parameters are injected via environment variables. Context is passed via a temp file, and structured messages are read from a temp file.
env (Optional[Mapping[str, str]]) – An optional dict of environment variables to pass to the subprocess.
cwd (Optional[str]) – Working directory in which to launch the subprocess command.
context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the subprocess. Defaults to PipesTempFileContextInjector.
message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the subprocess. Defaults to PipesTempFileMessageReader.
forward_termination (bool) – Whether to send a SIGINT signal to the subprocess if the orchestration process is interrupted or canceled. Defaults to True.
termination_timeout_seconds (float) – How long to wait after forwarding termination for the subprocess to exit. Defaults to 20.
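The forward_termination and termination_timeout_seconds behavior can be sketched with the standard library. This is an illustration, not PipesSubprocessClient's code. forward_termination and run_subprocess are hypothetical helpers, and falling back to SIGKILL after the timeout is this sketch's own choice. Sending SIGINT with Popen.send_signal as shown is POSIX-only.

```python
import signal
import subprocess


def forward_termination(process: subprocess.Popen, termination_timeout_seconds: float = 20.0) -> int:
    """Send SIGINT to a running child and wait up to the timeout for it to exit.

    If the child is still running after the timeout, kill it (a choice made by
    this sketch). Returns the child's exit code.
    """
    if process.poll() is None:
        process.send_signal(signal.SIGINT)
        try:
            process.wait(timeout=termination_timeout_seconds)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    return process.returncode


def run_subprocess(cmd, termination_timeout_seconds: float = 20.0) -> int:
    """Run cmd to completion, forwarding termination if this process is interrupted."""
    process = subprocess.Popen(cmd)
    try:
        return process.wait()
    except KeyboardInterrupt:
        forward_termination(process, termination_timeout_seconds)
        raise
```

Giving the child a grace period lets it flush final Pipes messages before it exits. The timeout bounds how long the orchestrator waits for that to happen.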
Synchronously execute a subprocess within a pipes session.
command (Union[str, Sequence[str]]) – The command to run. Will be passed to subprocess.Popen().
context (Union[OpExecutionContext, AssetExecutionContext]) – The context from the executing op or asset.
extras (Optional[PipesExtras]) – An optional dict of extra parameters to pass to the subprocess.
env (Optional[Mapping[str, str]]) – An optional dict of environment variables to pass to the subprocess.
cwd (Optional[str]) – Working directory in which to launch the subprocess command.
Returns: Wrapper containing results reported by the external process.
Return type: PipesClientCompletedInvocation
Most Pipes users won’t need to use the APIs in the following sections unless they are customizing the Pipes protocol.
Refer to the Dagster Pipes details and customization guide for more information.
Context injectors write context payloads to an externally accessible location and yield a set of parameters encoding the location for inclusion in the bootstrap payload.
Context injector that passes context data to the external process by writing it directly into the process environment.
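Here is a stdlib sketch of passing context directly through the environment, with no intermediate file. The variable name PIPES_CONTEXT_DATA is hypothetical. Reusing the JSON/zlib/base64 encoding for the payload is an assumption of this sketch, not a description of how the env context injector lays out its variables.

```python
import base64
import json
import os
import subprocess
import sys
import zlib

# Hypothetical variable name used only in this sketch.
CONTEXT_ENV_VAR = "PIPES_CONTEXT_DATA"

# Child program: decode the context from its own environment and print one field.
CHILD = (
    "import base64, json, os, zlib; "
    f"ctx = json.loads(zlib.decompress(base64.b64decode(os.environ['{CONTEXT_ENV_VAR}']))); "
    "print(ctx['asset_key'])"
)


def launch_with_env_context(context_data: dict) -> str:
    """Inject context_data into the child's environment and return the child's stdout."""
    payload = zlib.compress(json.dumps(context_data).encode("utf-8"))
    env = {**os.environ, CONTEXT_ENV_VAR: base64.b64encode(payload).decode("ascii")}
    result = subprocess.run(
        [sys.executable, "-c", CHILD], env=env, capture_output=True, text=True, check=True
    )
    return result.stdout.strip()
```

Because the whole payload travels in the environment, this approach is bounded by the operating system's limits on environment size. A file-based injector avoids that limit.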
Message readers read messages (and optionally log files) from an externally accessible location and yield a set of parameters encoding the location in the bootstrap payload.
Message reader that reads a sequence of message chunks written by an external process into a blob store such as S3, Azure blob storage, or GCS.
The reader maintains a counter, starting at 1, that is synchronized with a message writer in some pipes process. The reader starts a thread that periodically attempts to read a chunk indexed by the counter at some location expected to be written by the pipes process. The chunk should be a file with each line corresponding to a JSON-encoded pipes message. When a chunk is successfully read, the messages are processed and the counter is incremented. The PipesBlobStoreMessageWriter on the other end is expected to similarly increment a counter (starting from 1) on each successful write, keeping the counters on the read and write ends in sync.
If log_readers is passed, the message reader will start the passed log readers when the opened message is received from the external process.
interval (float) – Interval in seconds between attempts to download a chunk.
log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers to use to read logs.
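The counter protocol above can be modeled with an in-memory dictionary standing in for the blob store. ToyBlobStore, ToyChunkWriter, ToyChunkReader, and the "{prefix}/{counter}.json" key layout are hypothetical. The real reader polls from a background thread every interval seconds, and this sketch reduces that to a single try_read_chunk call.

```python
import json
from typing import Dict, List, Optional


class ToyBlobStore:
    """In-memory stand-in for S3, Azure blob storage, or GCS: object key -> text."""

    def __init__(self) -> None:
        self.objects: Dict[str, str] = {}


class ToyChunkWriter:
    def __init__(self, store: ToyBlobStore, prefix: str) -> None:
        self.store = store
        self.prefix = prefix
        self.counter = 1  # starts at 1, like the reader

    def write_chunk(self, messages: List[dict]) -> None:
        # Each chunk is one object whose lines are JSON-encoded messages.
        key = f"{self.prefix}/{self.counter}.json"
        self.store.objects[key] = "\n".join(json.dumps(m) for m in messages)
        self.counter += 1  # advance only after a successful write


class ToyChunkReader:
    def __init__(self, store: ToyBlobStore, prefix: str) -> None:
        self.store = store
        self.prefix = prefix
        self.counter = 1

    def try_read_chunk(self) -> Optional[List[dict]]:
        """One polling attempt: read the chunk at the current index, if it exists."""
        text = self.store.objects.get(f"{self.prefix}/{self.counter}.json")
        if text is None:
            return None  # not written yet; a real reader would retry after `interval`
        self.counter += 1  # advance only after a successful read
        return [json.loads(line) for line in text.splitlines() if line]
```

Both sides advance only on success, so a chunk that has not been written yet is simply retried on the next poll. No message is skipped or read twice.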
Message reader that reads messages by tailing a specified file.
path (str) – The path of the file to which messages will be written. The file will be deleted on close of the pipes session.
Message reader that reads messages by tailing an automatically-generated temporary file.
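Tailing a message file amounts to remembering a byte offset and consuming only complete lines on each poll. read_new_messages is a hypothetical helper that illustrates the idea for both file-based readers above. It is not the readers' actual code.

```python
import json
import os
from typing import List, Tuple


def read_new_messages(path: str, offset: int) -> Tuple[List[dict], int]:
    """Read complete JSON lines appended since `offset`; return them and the new offset.

    A trailing partial line (one the writer has not finished) is left for the next call.
    """
    if not os.path.exists(path):
        return [], offset
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    end = data.rfind(b"\n") + 1  # consume only up to the last complete line
    lines = data[:end].decode("utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()], offset + end
```

A reader thread would call this in a loop, feeding each returned offset into the next call, until the session closes.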
Class to process PipesMessage objects received from a pipes process.
context (Union[OpExecutionContext, AssetExecutionContext]) – The context for the executing op/asset.
message_reader (PipesMessageReader) – The message reader used to read messages from the external process.
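Message handling can be pictured as a dispatch on each message's method. This toy ToyMessageHandler is hypothetical, not Dagster's class. The method names ("opened", "closed", "log", "report_asset_materialization", "report_asset_check", "report_custom_message") and the params layouts are assumptions for illustration. Rejecting unknown methods is this sketch's choice.

```python
import json
from typing import Any, Callable, Dict, List


class ToyMessageHandler:
    """Toy dispatcher that routes decoded Pipes-style messages on their "method" field."""

    def __init__(self) -> None:
        self.results: List[dict] = []
        self.custom_messages: List[Any] = []
        self.logs: List[str] = []
        self.opened = False
        self.closed = False
        # Method names and params layouts below are assumptions for illustration.
        self._handlers: Dict[str, Callable[[dict], None]] = {
            "opened": lambda params: setattr(self, "opened", True),
            "closed": lambda params: setattr(self, "closed", True),
            "log": lambda params: self.logs.append(params["message"]),
            "report_asset_materialization": self.results.append,
            "report_asset_check": self.results.append,
            "report_custom_message": lambda params: self.custom_messages.append(params["payload"]),
        }

    def handle_message(self, raw_line: str) -> None:
        """Decode one JSON line and dispatch it; unknown methods are rejected."""
        message = json.loads(raw_line)
        handler = self._handlers.get(message["method"])
        if handler is None:
            raise ValueError(f"unknown Pipes message method: {message['method']!r}")
        handler(message.get("params") or {})
```

In this model, custom_messages plays the role of the data returned by get_custom_messages, and results plays the role of the buffer drained by get_results.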