Pipes (dagster-pipes)
The dagster-pipes library is intended for inclusion in an external process that integrates with Dagster using the Pipes protocol. This could be in an environment like Databricks, Kubernetes, or Docker. Using this library, you can write code in the external process that streams metadata back to Dagster.
For a detailed look at the Pipes process, including how to customize it, refer to the Dagster Pipes details and customization guide.
Looking to set up a Pipes client in Dagster? Refer to the Dagster Pipes API reference.
Note: This library isn’t included with dagster and must be installed separately.
Context
- dagster_pipes.open_dagster_pipes
Initialize the Dagster Pipes context.
This function should be called near the entry point of a pipes process. It will load injected context information from Dagster and spin up the machinery for streaming messages back to Dagster.
If the process was not launched by Dagster, this function will emit a warning and return a MagicMock object. This should make all operations on the context no-ops and prevent your code from crashing.
Parameters:
- context_loader (Optional[PipesContextLoader]) – The context loader to use. Defaults to PipesDefaultContextLoader.
- message_writer (Optional[PipesMessageWriter]) – The message writer to use. Defaults to PipesDefaultMessageWriter.
- params_loader (Optional[PipesParamsLoader]) – The params loader to use. Defaults to PipesEnvVarParamsLoader.
Returns: The initialized context.
Return type: PipesContext
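A minimal sketch of the calling pattern, assuming a single asset is in scope. The step body is written against the PipesContext API, so it behaves the same whether it receives a live context or the MagicMock that open_dagster_pipes returns when the process was not launched by Dagster. The helper name run_step and the row_count metadata key are hypothetical; in a real external process the context would come from "with open_dagster_pipes() as context:".

```python
from unittest.mock import MagicMock


def run_step(context) -> int:
    """Hypothetical step body written against the PipesContext API."""
    # Lines logged here are streamed back to Dagster by context.log.
    context.log.info("Starting external computation")

    # Stand-in for the real work done in the external process.
    rows = [1, 2, 3]

    # Report the materialization of the single asset in scope.
    context.report_asset_materialization(metadata={"row_count": len(rows)})
    return len(rows)


# Outside Dagster, open_dagster_pipes() warns and returns a MagicMock, so every
# context call is a no-op. A MagicMock is used directly here to mirror that.
# In a real Pipes process: with open_dagster_pipes() as context: run_step(context)
mock_context = MagicMock()
result = run_step(mock_context)
```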
class dagster_pipes.PipesContext
The context for a Dagster Pipes process.
This class is analogous to OpExecutionContext on the Dagster side of the Pipes connection. It provides access to information such as the asset key(s) and partition key(s) in scope for the current step. It also provides methods for logging and emitting results that will be streamed back to Dagster.
This class should not be instantiated directly by the user. Instead, it should be initialized by calling open_dagster_pipes(), which returns the singleton instance of this class. After open_dagster_pipes() has been called, the singleton instance can also be retrieved by calling PipesContext.get().
- classmethod get
Get the singleton instance of the context. Raises an error if the context has not been initialized.
- classmethod is_initialized
Whether the context has been initialized.
Return type: bool
- classmethod set
Set the singleton instance of the context.
- close
Close the pipes connection. This flushes all buffered messages to the orchestration process and causes any further attempt to write a message to raise an error. This method is idempotent: calls after the first have no effect.
- get_extra
Get the value of an extra provided by the user. Raises an error if the extra is not defined.
Parameters: key (str) – The key of the extra.
Returns: The value of the extra.
Return type: Any
- report_asset_check
Report to Dagster that an asset check has been performed. Streams a payload containing check result information back to Dagster. If no assets or associated checks are in scope, raises an error.
Parameters:
- check_name (str) – The name of the check.
- passed (bool) – Whether the check passed.
- severity (PipesAssetCheckSeverity) – The severity of the check. Defaults to “ERROR”.
- metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the check. Defaults to None.
- asset_key (Optional[str]) – The asset key for the check. If only a single asset is in scope, defaults to that asset’s key. If multiple assets are in scope, this must be set explicitly or an error will be raised.
- report_asset_materialization
Report to Dagster that an asset has been materialized. Streams a payload containing materialization information back to Dagster. If no assets are in scope, raises an error.
Parameters:
- metadata (Optional[Mapping[str, Union[PipesMetadataRawValue, PipesMetadataValue]]]) – Metadata for the materialized asset. Defaults to None.
- data_version (Optional[str]) – The data version for the materialized asset. Defaults to None.
- asset_key (Optional[str]) – The asset key for the materialized asset. If only a single asset is in scope, defaults to that asset’s key. If multiple assets are in scope, this must be set explicitly or an error will be raised.
- report_custom_message
Send a JSON-serializable payload back to the orchestration process, where it can be retrieved using get_custom_messages.
Parameters: payload (Any) – JSON-serializable data.
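When several assets are in scope, the reporting methods above need an explicit asset_key. The sketch below loops over context.asset_keys, reports a materialization and a check for each asset, then sends a custom message. The helper name, the non_empty check name, and the metadata key are hypothetical, and a real check would typically also need to be declared on the Dagster side. A MagicMock stands in for the context so the calls can be inspected.

```python
from unittest.mock import MagicMock


def report_all(context, row_counts: dict) -> None:
    """Hypothetical helper: report one materialization and one check per asset.

    With several assets in scope, asset_key is passed explicitly; otherwise
    the reporting methods raise an error.
    """
    for key in context.asset_keys:
        count = row_counts.get(key, 0)
        context.report_asset_materialization(
            asset_key=key,
            metadata={"row_count": count},
        )
        context.report_asset_check(
            check_name="non_empty",  # hypothetical check name
            passed=count > 0,
            asset_key=key,
        )
    # Arbitrary JSON-serializable payload for the orchestration side.
    context.report_custom_message({"assets_reported": len(context.asset_keys)})


# Stand-in context with two assets in scope; every call is recorded.
ctx = MagicMock()
ctx.asset_keys = ["orders", "customers"]
report_all(ctx, {"orders": 10, "customers": 0})
```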
- property asset_key
The AssetKey for the currently scoped asset. Raises an error if zero or multiple assets are in scope.
Type: str
- property asset_keys
The AssetKeys for the currently scoped assets. Raises an error if no assets are in scope.
Type: Sequence[str]
- property code_version
The code version for the currently scoped asset. Raises an error if zero or multiple assets are in scope.
Type: Optional[str]
- property code_version_by_asset_key
Mapping of asset key to code version for the currently scoped assets. Raises an error if no assets are in scope.
Type: Mapping[str, Optional[str]]
- property extras
Key-value map of all extras provided by the user.
Type: Mapping[str, Any]
- property is_asset_step
Whether the current step targets assets.
Type: bool
- property is_closed
Whether the context has been closed.
Type: bool
- property is_partition_step
Whether the current step is scoped to one or more partitions.
Type: bool
- property job_name
The job name for the currently executing run. Returns None if the run is not derived from a job.
Type: Optional[str]
- property log
A logger that streams log messages back to Dagster.
Type: logging.Logger
- property partition_key
The partition key for the currently scoped partition. Raises an error if zero or multiple partitions are in scope.
Type: str
- property partition_key_range
The partition key range for the currently scoped partition or partitions. Raises an error if no partitions are in scope.
Type: PipesPartitionKeyRange
- property partition_time_window
The partition time window for the currently scoped partition or partitions. Returns None if the partitions in scope are not temporal. Raises an error if no partitions are in scope.
Type: Optional[PipesTimeWindow]
- property provenance
The provenance for the currently scoped asset. Raises an error if zero or multiple assets are in scope.
Type: Optional[PipesDataProvenance]
- property provenance_by_asset_key
Mapping of asset key to provenance for the currently scoped assets. Raises an error if no assets are in scope.
Type: Mapping[str, Optional[PipesDataProvenance]]
- property retry_number
The retry number for the currently executing run.
Type: int
- property run_id
The run ID for the currently executing pipeline run.
Type: str
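Because several of these properties raise when the scope does not match, code that runs in different scopes can guard its reads with is_asset_step and is_partition_step. This sketch assumes PipesPartitionKeyRange behaves like a mapping with start and end keys; the helper name and the SimpleNamespace stand-in are hypothetical.

```python
from types import SimpleNamespace


def describe_scope(context) -> dict:
    """Hypothetical helper that summarizes a PipesContext's scope.

    Only reads properties that are safe in the current scope: partition
    properties raise if no partitions are in scope, and asset_key raises
    unless exactly one asset is in scope.
    """
    summary = {
        "run_id": context.run_id,
        "retry_number": context.retry_number,
        "job_name": context.job_name,  # may be None
    }
    if context.is_asset_step:
        keys = list(context.asset_keys)
        summary["asset_keys"] = keys
        if len(keys) == 1:
            summary["asset_key"] = context.asset_key
    if context.is_partition_step:
        # Assumption: the key range is a mapping with "start" and "end".
        key_range = context.partition_key_range
        summary["partitions"] = (key_range["start"], key_range["end"])
    return summary


# Hypothetical stand-in exposing the same attribute names as PipesContext.
fake = SimpleNamespace(
    run_id="abc123",
    retry_number=0,
    job_name=None,
    is_asset_step=True,
    asset_keys=["daily_orders"],
    asset_key="daily_orders",
    is_partition_step=True,
    partition_key_range={"start": "2024-01-01", "end": "2024-01-07"},
)
summary = describe_scope(fake)
```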
Advanced
Most Pipes users won’t need to use the APIs in the following sections unless they are customizing the Pipes protocol.
Refer to the Dagster Pipes details and customization guide for more information.
Context loaders
Context loaders load the context payload from the location specified in the bootstrap payload.
class dagster_pipes.PipesContextLoader
- abstractmethod load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
class dagster_pipes.PipesDefaultContextLoader
Context loader that loads context data from either a file or directly from the provided params.
The location of the context data is configured by the params received by the loader. If the params include a path key, the context data is loaded from a file at the specified path. If the params instead include a data key, the corresponding value should be a dict representing the context data.
- load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
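The path-or-data rule of PipesDefaultContextLoader can be sketched with the standard library. This is not the library's implementation: the function name is hypothetical, JSON is assumed as the file format, and the context data shown is an illustrative subset.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Iterator, Mapping


@contextmanager
def load_context_sketch(params: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
    """Sketch of the documented path-or-data rule.

    A "path" key means the context data is read from a JSON file at that path;
    a "data" key means the value already is the context data as a dict.
    """
    if "path" in params:
        with open(params["path"], encoding="utf-8") as f:
            yield json.load(f)
    elif "data" in params:
        yield params["data"]
    else:
        raise ValueError('params must contain either "path" or "data"')


# Usage: the same context data delivered both ways.
context_data = {"run_id": "abc123", "extras": {"threshold": 5}}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    json.dump(context_data, tmp)
with load_context_sketch({"path": tmp.name}) as from_file:
    loaded_from_file = from_file
with load_context_sketch({"data": context_data}) as inline:
    loaded_inline = inline
os.remove(tmp.name)
```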
class dagster_pipes.PipesS3ContextLoader
Context loader that reads context from a JSON file on S3.
Parameters: client (Any) – A boto3.client("s3") object.
- load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
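A sketch of what an S3-backed load_context might do, using the boto3 get_object call. It assumes the params carry the object location under bucket and key, and it uses a hypothetical in-memory client in place of boto3.client("s3") so it runs without AWS access.

```python
import io
import json
from contextlib import contextmanager
from typing import Any, Iterator, Mapping


@contextmanager
def load_s3_context_sketch(client: Any, params: Mapping[str, str]) -> Iterator[dict]:
    """Read JSON context data from S3 with client.get_object.

    Assumption: params hold the object location under "bucket" and "key".
    """
    obj = client.get_object(Bucket=params["bucket"], Key=params["key"])
    # boto3 returns the object contents as a readable stream under "Body".
    yield json.loads(obj["Body"].read().decode("utf-8"))


class FakeS3Client:
    """Hypothetical in-memory stand-in for boto3.client("s3")."""

    def __init__(self, objects: Mapping[tuple, bytes]):
        self._objects = objects

    def get_object(self, Bucket: str, Key: str) -> dict:
        return {"Body": io.BytesIO(self._objects[(Bucket, Key)])}


fake_s3 = FakeS3Client({("my-bucket", "pipes/context.json"): b'{"run_id": "abc123"}'})
location = {"bucket": "my-bucket", "key": "pipes/context.json"}
with load_s3_context_sketch(fake_s3, location) as data:
    run_id = data["run_id"]
```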
class dagster_pipes.PipesGCSContextLoader
Context loader that reads context from a JSON file on GCS.
Parameters: client (google.cloud.storage.Client) – A google.cloud.storage.Client object.
- load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
class dagster_pipes.PipesDbfsContextLoader
Context loader that reads context from a JSON file on DBFS.
- load_context
A @contextmanager that loads context data injected by the orchestration process.
This method should read and yield the context data from the location specified by the passed-in PipesParams.
Parameters: params (PipesParams) – The params provided by the context injector in the orchestration process.
Yields: PipesContextData – The context data.
Params loaders
Params loaders load the bootstrap payload from some globally accessible key-value store.
class dagster_pipes.PipesParamsLoader
Object that loads params passed from the orchestration process by the context injector and message reader. These params are used to bootstrap the PipesContextLoader and PipesMessageWriter, respectively.
- abstractmethod is_dagster_pipes_process
Whether this process has been provided with the information needed to create a PipesContext, or should instead return a mock.
- abstractmethod load_context_params
PipesParams: Load params passed by the orchestration-side context injector.
- abstractmethod load_messages_params
PipesParams: Load params passed by the orchestration-side message reader.
class dagster_pipes.PipesEnvVarParamsLoader
Params loader that extracts params from environment variables.
class dagster_pipes.PipesCliArgsParamsLoader
Params loader that extracts params from known CLI arguments.
- is_dagster_pipes_process
Whether this process has been provided with the information needed to create a PipesContext, or should instead return a mock.
- load_context_params
PipesParams: Load params passed by the orchestration-side context injector.
- load_messages_params
PipesParams: Load params passed by the orchestration-side message reader.
class dagster_pipes.PipesMappingParamsLoader
Params loader that extracts params from a Mapping provided at init time.
- is_dagster_pipes_process
Whether this process has been provided with the information needed to create a PipesContext, or should instead return a mock.
- load_context_params
PipesParams: Load params passed by the orchestration-side context injector.
- load_messages_params
PipesParams: Load params passed by the orchestration-side message reader.
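The three-method params loader interface can be sketched over any mapping. The key names DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES are assumptions here, the class name is hypothetical, and for brevity the values are plain JSON; the real loaders receive values encoded with encode_env_var.

```python
import json
from typing import Any, Mapping

# Assumption: bootstrap key names, shown for illustration only.
CONTEXT_KEY = "DAGSTER_PIPES_CONTEXT"
MESSAGES_KEY = "DAGSTER_PIPES_MESSAGES"


class MappingParamsLoaderSketch:
    """Hypothetical params loader that reads bootstrap params from a Mapping.

    Simplification: values are plain JSON strings here, whereas the real
    loaders decode them with decode_env_var.
    """

    def __init__(self, mapping: Mapping[str, str]):
        self._mapping = mapping

    def is_dagster_pipes_process(self) -> bool:
        # No context params means the process was not launched by Dagster,
        # so the caller should fall back to a mock context.
        return CONTEXT_KEY in self._mapping

    def load_context_params(self) -> Any:
        # Tells the context loader where the context data lives.
        return json.loads(self._mapping[CONTEXT_KEY])

    def load_messages_params(self) -> Any:
        # Tells the message writer where to send messages.
        return json.loads(self._mapping[MESSAGES_KEY])


loader = MappingParamsLoaderSketch({
    CONTEXT_KEY: json.dumps({"path": "/tmp/context.json"}),
    MESSAGES_KEY: json.dumps({"stdio": "stderr"}),
})
```

Passing os.environ instead of a dict would read the same keys from the environment.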
Message writers
Message writers write messages to the location specified in the bootstrap payload.
class dagster_pipes.PipesMessageWriter
- get_opened_extras
Return arbitrary reader-specific information to be passed back to the orchestration process under the extras key of the initialization payload.
Returns: A dict of arbitrary data to be passed back to the orchestration process.
Return type: PipesExtras
- final get_opened_payload
Return a payload containing information about the external process to be passed back to the orchestration process. This should contain information that cannot be known before the external process is launched.
This method should not be overridden by users. Instead, users should override get_opened_extras to inject custom data.
- abstractmethod open
A @contextmanager that initializes a channel for writing messages back to Dagster.
This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.
Parameters: params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesMessageWriterChannel – Channel for writing messages back to Dagster.
class dagster_pipes.PipesDefaultMessageWriter
Message writer that writes messages to either a file or the stdout or stderr stream.
The write location is configured by the params received by the writer. If the params include a path key, messages are written to a file at the specified path. If the params instead include a stdio key, the corresponding value must be either stderr or stdout, and messages are written to the selected stream.
- open
A @contextmanager that initializes a channel for writing messages back to Dagster.
This method should take the params passed by the orchestration-side PipesMessageReader and use them to construct and yield a PipesMessageWriterChannel.
Parameters: params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesMessageWriterChannel – Channel for writing messages back to Dagster.
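The destination rule of PipesDefaultMessageWriter can be sketched as a context manager that yields a text stream. The function name is hypothetical, append mode for files is an assumption, and the sketch only selects the destination; serializing messages is left to a channel.

```python
import os
import sys
import tempfile
from contextlib import contextmanager
from typing import Iterator, Mapping, TextIO


@contextmanager
def open_message_stream_sketch(params: Mapping[str, str]) -> Iterator[TextIO]:
    """Sketch of the documented destination rule.

    "path" -> messages go to a file at that path;
    "stdio" -> messages go to sys.stdout or sys.stderr.
    """
    if "path" in params:
        # Append mode is an assumption made for this sketch.
        with open(params["path"], "a", encoding="utf-8") as f:
            yield f
    elif "stdio" in params:
        streams = {"stdout": sys.stdout, "stderr": sys.stderr}
        if params["stdio"] not in streams:
            raise ValueError('stdio must be "stdout" or "stderr"')
        # Standard streams are yielded as-is and never closed here.
        yield streams[params["stdio"]]
    else:
        raise ValueError('params must contain either "path" or "stdio"')


with open_message_stream_sketch({"stdio": "stderr"}) as stream:
    selected = stream

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "messages.jsonl")
    with open_message_stream_sketch({"path": path}) as stream:
        stream.write('{"method": "log"}\n')
    with open(path, encoding="utf-8") as f:
        file_contents = f.read()
```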
class dagster_pipes.PipesBlobStoreMessageWriter
- open
Construct and yield a PipesBlobStoreMessageWriterChannel.
Parameters: params (PipesParams) – The params provided by the message reader in the orchestration process.
Yields: PipesBlobStoreMessageWriterChannel – Channel that periodically uploads message chunks to a blob store.
- INCLUDE_STDIO_IN_MESSAGES_KEY = 'include_stdio_in_messages'
class dagster_pipes.PipesS3MessageWriter
Message writer that writes messages by periodically writing message chunks to an S3 bucket.
Parameters:
- client (Any) – A boto3.client("s3") object.
- interval (float) – Interval in seconds between chunk uploads.
class dagster_pipes.PipesGCSMessageWriter
Message writer that writes messages by periodically writing message chunks to a GCS bucket.
Parameters:
- client (google.cloud.storage.Client) – A google.cloud.storage.Client object.
- interval (float) – Interval in seconds between chunk uploads.
class dagster_pipes.PipesDbfsMessageWriter
Message writer that writes messages by periodically writing message chunks to a directory on DBFS.
- get_opened_extras
Return arbitrary reader-specific information to be passed back to the orchestration process under the extras key of the initialization payload.
Returns: A dict of arbitrary data to be passed back to the orchestration process.
Return type: PipesExtras
Message writer channels
Message writer channels are objects that write messages back to the Dagster orchestration process.
class dagster_pipes.PipesMessageWriterChannel
Object that writes messages back to the Dagster orchestration process.
- abstractmethod write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
class dagster_pipes.PipesBlobStoreMessageWriterChannel
Message writer channel that periodically uploads message chunks to some blob store endpoint.
- write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
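A blob store channel buffers messages and uploads them in chunks on a timer. The sketch below shows that idea with a background thread and a pluggable upload callable; the class name, the newline-delimited chunk format, and the message shape are assumptions, and the final flush in close() is a design choice so that nothing buffered is lost.

```python
import json
import threading
from typing import Any, Callable, List, Mapping


class BufferedUploadChannelSketch:
    """Hypothetical channel that buffers messages and uploads them in chunks.

    A background thread calls `upload` with the buffered messages every
    `interval` seconds; close() stops the thread and uploads what is left.
    """

    def __init__(self, upload: Callable[[str], None], interval: float = 1.0):
        self._upload = upload
        self._interval = interval
        self._buffer: List[str] = []
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def write_message(self, message: Mapping[str, Any]) -> None:
        with self._lock:
            self._buffer.append(json.dumps(message))

    def _flush(self) -> None:
        # Swap the buffer out under the lock, then upload without holding it.
        with self._lock:
            lines, self._buffer = self._buffer, []
        if lines:
            # One chunk is a block of newline-delimited JSON messages.
            self._upload("\n".join(lines) + "\n")

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop is set.
        while not self._stop.wait(self._interval):
            self._flush()

    def close(self) -> None:
        self._stop.set()
        self._thread.join()
        self._flush()  # final upload so no buffered message is lost


chunks: List[str] = []
channel = BufferedUploadChannelSketch(chunks.append, interval=0.05)
for i in range(3):
    channel.write_message({"method": "log", "params": {"message": f"step {i}"}})
channel.close()
uploaded = [json.loads(line) for chunk in chunks for line in chunk.splitlines()]
```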
class dagster_pipes.PipesBufferedFilesystemMessageWriterChannel
Message writer channel that periodically writes message chunks to an endpoint mounted on the filesystem.
Parameters: interval (float) – Interval in seconds between chunk uploads.
class dagster_pipes.PipesFileMessageWriterChannel
Message writer channel that writes one message per line to a file.
- write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
class dagster_pipes.PipesStreamMessageWriterChannel
Message writer channel that writes one message per line to a TextIO stream.
- write_message
Write a message to the orchestration process.
Parameters: message (PipesMessage) – The message to write.
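The one-message-per-line format used by the file and stream channels can be sketched as follows. The class name and the message dicts are illustrative; real PipesMessage payloads may carry additional fields.

```python
import io
import json
from typing import Any, Mapping, TextIO


class StreamChannelSketch:
    """Hypothetical channel writing one JSON-encoded message per line."""

    def __init__(self, stream: TextIO):
        self._stream = stream

    def write_message(self, message: Mapping[str, Any]) -> None:
        self._stream.write(json.dumps(message) + "\n")
        # Flush after each message so a reader sees it promptly.
        self._stream.flush()


buffer = io.StringIO()
channel = StreamChannelSketch(buffer)
channel.write_message({"method": "log", "params": {"message": "hello", "level": "INFO"}})
channel.write_message({"method": "closed", "params": {}})
lines = buffer.getvalue().splitlines()
```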
class dagster_pipes.PipesS3MessageWriterChannel
Message writer channel that writes messages by periodically writing message chunks to an S3 bucket.
Parameters:
- client (Any) – A boto3.client("s3") object.
- bucket (str) – The name of the S3 bucket to write to.
- key_prefix (Optional[str]) – An optional prefix to use for the keys of written blobs.
- interval (float) – Interval in seconds between chunk uploads.
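Uploading a single chunk with the bucket and key_prefix parameters listed above might look like this, using boto3's put_object call. The numbered key layout under key_prefix is an assumption, not necessarily the layout the library uses, and the in-memory client is a hypothetical stand-in for boto3.client("s3").

```python
from typing import Any, Optional


class S3ChunkUploaderSketch:
    """Hypothetical uploader writing each message chunk as a numbered S3 object.

    Uses client.put_object(Bucket=..., Key=..., Body=...). The
    "<prefix>/<index>.json" key layout is an assumption for illustration.
    """

    def __init__(self, client: Any, bucket: str, key_prefix: Optional[str] = None):
        self._client = client
        self._bucket = bucket
        self._key_prefix = key_prefix
        self._counter = 0

    def upload_chunk(self, payload: str) -> str:
        self._counter += 1
        name = f"{self._counter}.json"
        key = f"{self._key_prefix}/{name}" if self._key_prefix else name
        self._client.put_object(Bucket=self._bucket, Key=key, Body=payload.encode("utf-8"))
        return key


class FakeS3Client:
    """In-memory stand-in for boto3.client("s3") that records put_object calls."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket: str, Key: str, Body: bytes) -> dict:
        self.objects[(Bucket, Key)] = Body
        return {}


s3 = FakeS3Client()
uploader = S3ChunkUploaderSketch(s3, bucket="my-bucket", key_prefix="pipes/run-abc123")
first_key = uploader.upload_chunk('{"method": "log"}\n')
second_key = uploader.upload_chunk('{"method": "closed"}\n')
```

An uploader like this would be called from a periodic flush loop, with interval controlling how often chunks are sent.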
class dagster_pipes.PipesGCSMessageWriterChannel
Message writer channel that writes messages by periodically writing message chunks to a GCS bucket.
Parameters:
- client (google.cloud.storage.Client) – A google.cloud.storage.Client object.
- bucket (str) – The name of the GCS bucket to write to.
- key_prefix (Optional[str]) – An optional prefix to use for the keys of written blobs.
- interval (float) – Interval in seconds between chunk uploads.
Utilities
- dagster_pipes.encode_env_var
Encode a value by serializing it to JSON, compressing it with zlib, and finally encoding it with base64. In function notation: base64_encode(compress(to_json(value))).
Parameters: value (Any) – The value to encode. Must be JSON-serializable.
Returns: The encoded value.
Return type: str
- dagster_pipes.decode_env_var
Decode a value by decoding it from base64, decompressing it with zlib, and finally deserializing it from JSON. In function notation: from_json(decompress(base64_decode(value))).
Parameters: value (Any) – The value to decode.
Returns: The decoded value.
Return type: Any
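The encoding described above can be reproduced with the standard library, which can help when inspecting a bootstrap value by hand. These functions are sketches of the documented scheme rather than the library's code, and details such as text encoding are assumptions; in a Pipes process, prefer dagster_pipes.encode_env_var and dagster_pipes.decode_env_var.

```python
import base64
import json
import zlib
from typing import Any


def encode_env_var_sketch(value: Any) -> str:
    # base64_encode(compress(to_json(value)))
    serialized = json.dumps(value)
    compressed = zlib.compress(serialized.encode("utf-8"))
    return base64.b64encode(compressed).decode("utf-8")


def decode_env_var_sketch(value: str) -> Any:
    # from_json(decompress(base64_decode(value)))
    compressed = base64.b64decode(value)
    serialized = zlib.decompress(compressed).decode("utf-8")
    return json.loads(serialized)


params = {"path": "/tmp/dagster_pipes/context.json"}
encoded = encode_env_var_sketch(params)
decoded = decode_env_var_sketch(encoded)
```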