Experimental Core APIs

These API changes aim to improve on the original Dagster Pipeline & Solid model in response to user feedback so far.

A toggle is available in the Dagit settings menu (top right) to view things in terms of these new APIs.

Graph

The replacement for composite_solid / CompositeSolidDefinition. It has a more intuitive name, and there is no longer a distinction between a graph for execution (pipeline) and a graph for composition (composite solid).

@dagster.graph(name=None, description=None, input_defs=None, output_defs=None)[source]

Create a graph with the specified parameters from the decorated composition function.

Using this decorator allows you to build up a dependency graph by writing a function that invokes solids (or other graphs) and passes the output to subsequent invocations.

Parameters
  • name (Optional[str]) – The name of the graph. Must be unique within any RepositoryDefinition containing the graph.

  • description (Optional[str]) – A human-readable description of the graph.

  • input_defs (Optional[List[InputDefinition]]) –

    Information about the inputs that this graph maps. Information provided here will be combined with what can be inferred from the function signature, with these explicit InputDefinitions taking precedence.

    Uses of inputs in the body of the decorated composition function will determine the InputMappings passed to the underlying GraphDefinition.

  • output_defs (Optional[List[OutputDefinition]]) –

    Output definitions for the graph. If not provided explicitly, these will be inferred from typehints.

    Uses of these outputs in the body of the decorated composition function, as well as the return value of the decorated function, will be used to infer the appropriate set of OutputMappings for the underlying GraphDefinition.

    To map multiple outputs, return a dictionary from the composition function.

class dagster.core.definitions.graph.GraphDefinition(name, description, node_defs, dependencies, input_mappings, output_mappings, config_mapping, **kwargs)[source]
execute_in_process(run_config=None, instance=None, resources=None)[source]

Execute this graph in-process, collecting results in-memory.

Parameters
  • run_config (Optional[Dict[str, Any]]) – Configuration for the run.

  • instance (Optional[DagsterInstance]) – The instance to execute against. An ephemeral one will be used if none is provided.

  • resources (Optional[Dict[str, Any]]) – The resources needed for execution, if any are required. Either resource instances or resource definitions can be provided.

Returns

InProcessGraphResult

to_job(name=None, description=None, resource_defs=None, config=None, tags=None, logger_defs=None, executor_def=None, hooks=None)[source]

Make this graph into an executable Job by providing the remaining components required for execution.

Parameters
  • name (Optional[str]) – The name for the Job. Defaults to the name of this graph.

  • resource_defs (Optional[Dict[str, ResourceDefinition]]) – Resources that are required by this graph for execution. If not defined, io_manager will default to filesystem.

  • config

    Describes how the job is parameterized at runtime.

    If no value is provided, then the schema for the job’s run config is a standard format based on its solids and resources.

    If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config whenever the job is executed. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.

    If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping, which should return configuration in the standard format to configure the job.

    If a PartitionedConfig object is provided, then it defines a discrete set of config values that can parameterize the job, as well as a function for mapping those values to the base config. The values provided will be viewable and editable in the Dagit playground, so be careful with secrets.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the Job. Values that are not strings will be JSON-encoded and must meet the criterion that json.loads(json.dumps(value)) == value. These tag values may be overwritten by tag values provided at invocation time.

  • logger_defs (Optional[Dict[str, LoggerDefinition]]) – A dictionary of string logger identifiers to their implementations.

  • executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multiprocess_executor.

Returns

The “Job”, currently implemented as a single-mode pipeline.

Return type

PipelineDefinition
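The round-trip condition on non-string values of the tags parameter above can be checked ahead of time with the standard library alone. The helper below is a hypothetical convenience for illustration, not a Dagster function.

```python
import json


def survives_json_round_trip(value):
    """Return True if value is unchanged after json.dumps then json.loads."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Values that cannot be serialized at all (for example, sets) fail too.
        return False


print(survives_json_round_trip({"team": "data", "priority": 3}))  # True
print(survives_json_round_trip((1, 2)))    # False: a tuple comes back as a list
print(survives_json_round_trip({1: "a"}))  # False: the int key comes back as a str
```

Lists, string-keyed dictionaries, numbers, booleans, and None generally pass this check, while tuples, sets, and dictionaries with non-string keys do not.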

Job

The replacement for pipeline / PipelineDefinition. A Job binds a Graph and the resources it needs to be executable.

Jobs are created by calling GraphDefinition.to_job() on a graph instance.

Op

The replacement for solid. It has a more intuitive name and offers a more concise way of defining inputs and outputs.

@dagster.op(name=None, description=None, ins=None, out=None, config_schema=None, required_resource_keys=None, tags=None, version=None, retry_policy=None, input_defs=None, output_defs=None)[source]

Op is an experimental replacement for solid, intended to decrease verbosity and have a more intuitive name.

Ops are currently implemented as SolidDefinition, so they are likely to be called solids throughout the product.

Parameters
  • name (Optional[str]) – Name of op. Must be unique within any GraphDefinition using the op.

  • description (Optional[str]) – Human-readable description of this op. If not provided, and the decorated function has a docstring, that docstring will be used as the description.

  • ins (Optional[Dict[str, In]]) – Information about the inputs to the op. Information provided here will be combined with what can be inferred from the function signature.

  • out (Optional[Union[Out, Dict[str, Out]]]) – Information about the op’s outputs. Information provided here will be combined with what can be inferred from the return type signature if the function does not use yield.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. If set, Dagster will check that config provided for the solid matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the solid.

  • required_resource_keys (Optional[Set[str]]) – Set of resource handles required by this solid.

  • tags (Optional[Dict[str, Any]]) – Arbitrary metadata for the op. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be JSON-encoded and must meet the criterion that json.loads(json.dumps(value)) == value.

  • version (Optional[str]) – (Experimental) The version of the op’s compute_fn. Two ops should have the same version if and only if they deterministically produce the same outputs when provided the same inputs.

  • retry_policy (Optional[RetryPolicy]) – The retry policy for this op.

  • input_defs (Optional[List[InputDefinition]]) – Preserved to ease migration from solid.

  • output_defs (Optional[List[OutputDefinition]]) – Preserved to ease migration from solid.

Examples

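from typing import Tuple

from dagster import In, Out, op

# An op with no inputs and no outputs.
@op
def hello_world():
    print('hello')

# Input and output types are inferred from the type annotations.
@op
def echo(msg: str) -> str:
    return msg

# Same as above, with explicit In / Out definitions.
@op(
    ins={'msg': In(str)},
    out=Out(str)
)
def echo_2(msg):
    return msg

# Multiple outputs are declared with a dictionary of Out objects.
@op(
    out={'word': Out(), 'num': Out()}
)
def multi_out() -> Tuple[str, int]:
    return 'cool', 4

class dagster.In(dagster_type=<class 'dagster.core.definitions.utils.NoValueSentinel'>, description=None, default_value=<class 'dagster.core.definitions.utils.NoValueSentinel'>, root_manager_key=None, metadata=None, asset_key=None, asset_partitions=None)[source]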

Experimental replacement for InputDefinition, intended to decrease verbosity.

Parameters
  • dagster_type (Optional[Union[Type, DagsterType]]) – The type of this input. Should only be set if the correct type cannot be inferred directly from the type signature of the decorated function.

  • description (Optional[str]) – Human-readable description of the input.

  • default_value (Optional[Any]) – The default value to use if no input is provided.

  • root_manager_key (Optional[str]) – (Experimental) The resource key for the RootInputManager used for loading this input when it is not connected to an upstream output.

  • metadata (Optional[Dict[str, Any]]) – A dict of metadata for the input.

  • asset_key (Optional[Union[AssetKey, InputContext -> AssetKey]]) – (Experimental) An AssetKey (or function that produces an AssetKey from the InputContext) which should be associated with this InputDefinition. Used for tracking lineage information through Dagster.

  • asset_partitions (Optional[Union[Set[str], InputContext -> Set[str]]]) – (Experimental) A set of partitions of the given asset_key (or a function that produces this set of partitions from the InputContext) which should be associated with this InputDefinition.

class dagster.Out(dagster_type=<class 'dagster.core.definitions.utils.NoValueSentinel'>, description=None, is_required=None, io_manager_key=None, metadata=None, asset_key=None, asset_partitions=None)[source]

Experimental replacement for OutputDefinition intended to decrease verbosity.

Parameters
  • dagster_type (Optional[Union[Type, DagsterType]]) – The type of this output. Should only be set if the correct type cannot be inferred directly from the type signature of the decorated function.

  • description (Optional[str]) – Human-readable description of the output.

  • is_required (Optional[bool]) – Whether the presence of this field is required. (default: True)

  • io_manager_key (Optional[str]) – The resource key of the output manager used for this output. (default: “io_manager”).

  • metadata (Optional[Dict[str, Any]]) – A dict of the metadata for the output. For example, users can provide a file path if the data object will be stored in a filesystem, or provide information of a database table when it is going to load the data into the table.

  • asset_key (Optional[Union[AssetKey, OutputContext -> AssetKey]]) – (Experimental) An AssetKey (or function that produces an AssetKey from the OutputContext) which should be associated with this OutputDefinition. Used for tracking lineage information through Dagster.

  • asset_partitions (Optional[Union[Set[str], OutputContext -> Set[str]]]) – (Experimental) A set of partitions of the given asset_key (or a function that produces this set of partitions from the OutputContext) which should be associated with this OutputDefinition.

class dagster.DynamicOut(dagster_type=<class 'dagster.core.definitions.utils.NoValueSentinel'>, description=None, is_required=None, io_manager_key=None, metadata=None, asset_key=None, asset_partitions=None)[source]

Experimental replacement for DynamicOutputDefinition intended to decrease verbosity. Variant of Out for an output that will dynamically alter the graph at runtime.

Testing

Explicit in-process execution APIs have been added to better facilitate testing of Graphs and Jobs.

Jobs can be tested with PipelineDefinition.execute_in_process(), and Graphs with GraphDefinition.execute_in_process().

PipelineDefinition.execute_in_process(run_config=None, instance=None)[source]

(Experimental) Execute the “Job” (single mode pipeline) in-process, gathering results in-memory.

The executor_def on the Job will be ignored, and replaced with the in-process executor. If using the default io_manager, it will switch from filesystem to in-memory.

Parameters
  • run_config (Optional[Dict[str, Any]]) – The configuration for the run.

  • instance (Optional[DagsterInstance]) – The instance to execute against. An ephemeral one will be used if none is provided.

Returns

InProcessGraphResult

class dagster.core.execution.execution_results.InProcessGraphResult(graph_def, handle, all_events, output_capture)[source]
property output_values

The values for any outputs that this associated graph maps.

result_for_node(name)[source]

The inner result for a node within the graph.

class dagster.core.execution.execution_results.InProcessSolidResult(solid_def, handle, all_events, output_capture)[source]
property output_values

The output values for the associated op/solid, keyed by output name.

class dagster.core.execution.execution_results.NodeExecutionResult[source]
property success

Whether all steps in the execution were successful.

Type

bool

Partition-Based Schedules

New APIs have been added to better integrate partition-based scheduling with the job API. These new APIs replace the existing daily_schedule(), weekly_schedule(), monthly_schedule(), and hourly_schedule() decorators.

@dagster.daily_partitioned_config(start_date, timezone=None, fmt=None)[source]

Defines run config over a set of daily partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.

Parameters
  • start_date (Union[datetime.datetime, str]) – The date from which to run the schedule. Can be provided in either datetime or string format.

  • timezone (Optional[str]) – The timezone in which each date should exist.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
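To make the expected shape of the decorated function concrete, here is a sketch of a function that accepts a start and end datetime and returns a run config dictionary. The decorator itself is left out so the block runs without Dagster installed. The solid name process_data and its config keys are hypothetical, and the top-level "solids" key is an assumption about the standard run config format.

```python
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d"  # the documented default for fmt


# In real use this function would be decorated, for example with
# @daily_partitioned_config(start_date="2021-05-05").
def my_partitioned_config(start: datetime, end: datetime) -> dict:
    # start and end bound the date partition this config applies to.
    return {
        "solids": {
            "process_data": {  # hypothetical solid name
                "config": {
                    "start": start.strftime(DATE_FORMAT),
                    "end": end.strftime(DATE_FORMAT),
                }
            }
        }
    }


run_config = my_partitioned_config(datetime(2021, 5, 5), datetime(2021, 5, 6))
print(run_config)
```

Once decorated, the resulting object would be passed as the config argument when building a Job.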

@dagster.monthly_partitioned_config(start_date, timezone=None, fmt=None)[source]

Defines run config over a set of monthly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.

Parameters
  • start_date (Union[datetime.datetime, str]) – The date from which to run the schedule. Can be provided in either datetime or string format.

  • timezone (Optional[str]) – The timezone in which each date should exist.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

@dagster.hourly_partitioned_config(start_date, timezone=None, fmt=None)[source]

Defines run config over a set of hourly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.

Parameters
  • start_date (Union[datetime.datetime, str]) – The date from which to run the schedule. Can be provided in either datetime or string format.

  • timezone (Optional[str]) – The timezone in which each date should exist.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

@dagster.weekly_partitioned_config(start_date, timezone=None, fmt=None)[source]

Defines run config over a set of weekly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job.

Parameters
  • start_date (Union[datetime.datetime, str]) – The date from which to run the schedule. Can be provided in either datetime or string format.

  • timezone (Optional[str]) – The timezone in which each date should exist.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.