
Migrating to Ops, Graphs, and Jobs#

Ops, Graphs, and Jobs are an experimental set of APIs that aim to simplify and give more accessible names to some of Dagster's core concepts. This guide describes how to migrate code that uses Pipelines, Modes, Presets, and Partition Sets to the new APIs.

A detailed API reference for the new APIs can be found here.

Migrating a pipeline to a graph does not require migrating all your other pipelines to graphs. Graphs, jobs, and pipelines can co-exist peacefully in Python.

In Dagit, graphs and jobs will be displayed as pipelines. You can also toggle Dagit to switch to a view that's better suited to the new APIs. Click the gear icon in the upper right corner, and then toggle "Experimental Core APIs (Job & Graph)". After flipping this toggle, Dagit will display a job for each mode in each pipeline.

A simple pipeline#

For simple pipelines, the new APIs just have different names. Here's a pipeline containing a single solid:

from dagster import pipeline, solid


@solid
def do_something():
    ...


@pipeline
def do_it_all():
    do_something()

And here's an equivalent graph containing a single op:

from dagster import graph, op


@op
def do_something():
    ...


@graph
def do_it_all():
    do_something()

Note that graphs can be built out of solids as well as ops. So you don't need to convert all your solids to ops in order to use @graph.
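For example, here's a minimal sketch that reuses a legacy solid inside a graph (the names here are illustrative, not from the guide):

from dagster import graph, solid


@solid
def do_something_legacy():
    ...


# A graph can invoke an existing solid directly, so legacy solids don't need
# to be rewritten as ops before they're composed with @graph.
@graph
def do_it_all_mixed():
    do_something_legacy()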

In both cases, you can load the pipeline or graph in Dagit with:

dagit -f <path/to/file>

If you want to execute the graph in process, you can invoke its GraphDefinition.execute_in_process method instead of using execute_pipeline:

do_it_all.execute_in_process()
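
execute_in_process returns a result object that you can inspect, for example in a test. A minimal sketch:

# The result object reports whether the run completed successfully.
result = do_it_all.execute_in_process()
assert result.success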

Resources#

A pipeline with a single mode#

With pipelines, supplying resources involves defining a mode:

from dagster import ModeDefinition, pipeline, resource, solid


@resource
def external_service():
    ...


@solid(required_resource_keys={"external_service"})
def do_something():
    ...


@pipeline(mode_defs=[ModeDefinition(resource_defs={"external_service": external_service})])
def do_it_all():
    do_something()

With the new APIs, supplying resources to a graph involves building a job from it:

from dagster import graph, op, resource


@resource
def external_service():
    ...


@op(required_resource_keys={"external_service"})
def do_something():
    ...


@graph
def do_it_all():
    do_something()


do_it_all_job = do_it_all.to_job(resource_defs={"external_service": external_service})
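
Unlike the bare graph, the resulting job has its resources bound, so it can be executed without supplying them again. A minimal sketch:

# The job carries its resource definitions with it, so no resources need to
# be passed at execution time.
do_it_all_job.execute_in_process()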

A pipeline with a test mode#

With pipelines, if you want to execute a pipeline with resources for a test, you need to include a mode for those resources on the pipeline definition:

from unittest.mock import MagicMock

from dagster import ModeDefinition, ResourceDefinition, execute_pipeline, pipeline, resource, solid


@resource
def external_service():
    ...


@solid(required_resource_keys={"external_service"})
def do_something():
    ...


@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"external_service": external_service}),
        ModeDefinition(
            "test",
            resource_defs={"external_service": ResourceDefinition.hardcoded_resource(MagicMock())},
        ),
    ]
)
def do_it_all():
    do_something()


def test_do_it_all():
    execute_pipeline(do_it_all, mode="test")

This makes it difficult to construct mock resources that are specific to a particular test. It also relies on string indirection instead of object references to refer to the mode (mode="test"), which is error-prone. With the new APIs, you can supply the resources at the time you execute the graph in the test:

from unittest.mock import MagicMock

from dagster import graph, op, resource


@resource
def external_service():
    ...


@op(required_resource_keys={"external_service"})
def do_something():
    ...


@graph
def do_it_all():
    do_something()


do_it_all_job = do_it_all.to_job(resource_defs={"external_service": external_service})


def test_do_it_all():
    do_it_all.execute_in_process(resources={"external_service": MagicMock()})

A pipeline with prod and dev modes#

It's common for pipelines to have modes corresponding to different environments, e.g. production and development. Here's a pair of resources: one for production and one for development.

from dagster import resource


@resource
def prod_external_service():
    ...


@resource
def dev_external_service():
    ...

And here's a pipeline that includes these resources inside a pair of modes, along with a repository for loading it from Dagit:

from dagster import ModeDefinition, pipeline, repository


@pipeline(
    mode_defs=[
        ModeDefinition("prod", resource_defs={"external_service": prod_external_service}),
        ModeDefinition("dev", resource_defs={"external_service": dev_external_service}),
    ]
)
def do_it_all():
    do_something()


@repository
def repo():
    return [do_it_all]

These modes typically correspond to different Dagster instances. For example, the pipeline might be executed using the development mode on a local machine and using the production mode on a production instance deployed in the cloud. To point the instances at your repository, you would typically create a workspace.yaml that looks something like this:

load_from:
  - python_package:
      package_name: prod_dev_modes
      attribute: repo

With this setup, both modes show up on both instances. This is awkward: even though the production mode shows up on the development instance, it's not meant to be executed there, and vice versa for the dev mode on the production instance.

With the new APIs, the convention is to include development jobs and production jobs in separate repositories. Then the workspace for the production instance can point to the repository with the production jobs, and the workspace for the development instance can point to the repository with the development jobs.

from dagster import graph, repository


@graph
def do_it_all():
    do_something()


@repository
def prod_repo():
    return [do_it_all.to_job(resource_defs={"external_service": prod_external_service})]


@repository
def dev_repo():
    return [do_it_all.to_job(resource_defs={"external_service": dev_external_service})]

workspace.yaml for the prod instance:

load_from:
  - python_package:
      package_name: prod_dev_jobs
      attribute: prod_repo

workspace.yaml for the dev instance:

load_from:
  - python_package:
      package_name: prod_dev_jobs
      attribute: dev_repo

Config#

A pipeline with a preset#

With the pipeline APIs, if you want to specify config at definition time (as opposed to at the time you're running the pipeline), you use a PresetDefinition.

from dagster import PresetDefinition, pipeline, solid


@solid(config_schema={"param": str})
def do_something(_):
    ...


@pipeline(
    preset_defs=[
        PresetDefinition(
            "my_preset",
            run_config={"solids": {"do_something": {"config": {"param": "some_val"}}}},
        )
    ]
)
def do_it_all():
    do_something()

With the new APIs, you can accomplish this by supplying config when building the job.

from dagster import graph, op


@op(config_schema={"param": str})
def do_something(_):
    ...


@graph
def do_it_all():
    do_something()


do_it_all_job = do_it_all.to_job(
    config={"solids": {"do_something": {"config": {"param": "some_val"}}}}
)

Unlike with presets, this config will be used, by default, any time the job is launched, not just when it's launched from the Dagit Playground. That is, it will also be used when launching the job via a schedule, a sensor, do_it_all_job.execute_in_process, or the GraphQL API. In all of these cases, it can be overridden with config specified at runtime.
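
For example, a runtime override might look like this (a sketch using execute_in_process; the value is illustrative):

# Config supplied at launch time takes precedence over the default config
# attached via to_job(config=...).
do_it_all_job.execute_in_process(
    run_config={"solids": {"do_something": {"config": {"param": "runtime_val"}}}}
)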

Alternatively, you can provide "partial" config via a ConfigMapping.

from dagster import ConfigMapping

do_it_all_job = do_it_all.to_job(
    config=ConfigMapping(
        config_fn=lambda conf: {"solids": {"do_something": {"config": {"param": conf["arg"]}}}},
        config_schema={"arg": str},
    )
)

Then, when running the job, you only need to supply more compact config:

do_it_all_job.execute_in_process(run_config={"arg": "some_value"})

Schedules#

A pipeline with a schedule#

With the pipeline APIs, a schedule targets a pipeline by referencing the pipeline name in the schedule definition.

Both the schedule and its targeted pipeline need to be provided to a repository in order to make use of the schedule.

from dagster import ScheduleDefinition, pipeline, repository, solid


@solid
def do_something():
    ...


@pipeline
def do_it_all():
    do_something()


do_it_all_schedule = ScheduleDefinition(cron_schedule="0 0 * * *", pipeline_name="do_it_all")


@repository
def do_it_all_repository():
    return [do_it_all, do_it_all_schedule]

With the new APIs, a schedule targets a graph or job by referencing the object directly.

from dagster import ScheduleDefinition, graph, op


@op
def do_something():
    ...


@graph
def do_it_all():
    do_something()


do_it_all_schedule = ScheduleDefinition(cron_schedule="0 0 * * *", job=do_it_all)

This makes it easier to catch errors and makes it possible to reference the graph or job from the schedule.

The same pattern works for sensors.
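
For example, here's a minimal sensor sketch that targets the same graph (the run-request logic is a placeholder):

from dagster import RunRequest, sensor


# A sensor targets the graph or job object the same way a schedule does.
@sensor(job=do_it_all)
def do_it_all_sensor(_context):
    # A real sensor would check an external system before requesting a run.
    yield RunRequest(run_key=None, run_config={})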

A pipeline with config and a schedule#

With the pipeline APIs, if you want to schedule runs with config that depends on the scheduled execution time, you need to do something like this:

from dagster import pipeline, schedule, solid


@solid(config_schema={"date": str})
def do_something(_):
    ...


@pipeline
def do_it_all():
    do_something()


@schedule(cron_schedule="0 0 * * *", pipeline_name="do_it_all", execution_timezone="US/Central")
def do_it_all_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"do_something": {"config": {"date": date}}}}

With the new APIs, you can simply reference the graph or job object:

from dagster import graph, op, schedule


@op(config_schema={"date": str})
def do_something(_):
    ...


@graph
def do_it_all():
    do_something()


@schedule(cron_schedule="0 0 * * *", job=do_it_all.to_job(), execution_timezone="US/Central")
def do_it_all_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"do_something": {"config": {"date": date}}}}

A pipeline with a preset and a schedule#

With the pipeline APIs, if you want the same config to be available for manual runs of a pipeline and also used by a schedule, you need to do something like this:

from dagster import PresetDefinition, pipeline, schedule, solid


@solid(config_schema={"param": str})
def do_something(_):
    ...


do_it_all_preset = PresetDefinition(
    "my_preset", run_config={"solids": {"do_something": {"config": {"param": "some_val"}}}}
)


@pipeline(preset_defs=[do_it_all_preset])
def do_it_all():
    do_something()


@schedule(cron_schedule="0 0 * * *", pipeline_name="do_it_all")
def do_it_all_schedule():
    return do_it_all_preset.run_config

With the new APIs, you can supply the config in one place (the job), and it will both show up in the Playground and be used by the schedule.

from dagster import ScheduleDefinition, graph, op


@op(config_schema={"param": str})
def do_something(_):
    ...


@graph
def do_it_all():
    do_something()


do_it_all_schedule = ScheduleDefinition(
    job=do_it_all.to_job(config={"solids": {"do_something": {"config": {"param": "some_val"}}}}),
    cron_schedule="0 0 * * *",
)

Partition Schedules#

In order to declare a partitioned schedule using the pipeline-centric APIs, you need to use one of the partitioned schedule decorators, such as daily_schedule or hourly_schedule. The created schedule targets a specific pipeline via pipeline_name.

import datetime

from dagster import daily_schedule, pipeline, repository, solid


@solid(config_schema={"date": str})
def do_something_with_config(context):
    return context.solid_config["date"]


@pipeline
def do_it_all():
    do_something_with_config()


@daily_schedule(pipeline_name="do_it_all", start_date=datetime.datetime(2020, 1, 1))
def do_it_all_schedule(date):
    return {
        "solids": {"do_something_with_config": {"config": {"date": date.strftime("%Y-%m-%d %H")}}}
    }


@repository
def do_it_all_repo():
    return [do_it_all, do_it_all_schedule]

With the graph APIs, you first define a partitioned config for the job. These partitions exist independently of any schedule and can be used for backfills. A schedule can then be derived from that partitioning, adding the execution-time information. The resulting schedule can be passed to a repository on its own, and the underlying job will be inferred.

from dagster import daily_partitioned_config, graph, op, repository, schedule_from_partitions


@op(config_schema={"date": str})
def do_something_with_config(context):
    return context.op_config["date"]


@graph
def do_it_all():
    do_something_with_config()


@daily_partitioned_config(start_date="2020-01-01")
def do_it_all_config(start, _end):
    return {"solids": {"do_something_with_config": {"config": {"date": str(start)}}}}


do_it_all_job = do_it_all.to_job(config=do_it_all_config)
do_it_all_schedule = schedule_from_partitions(do_it_all_job)


@repository
def do_it_all_repo():
    return [do_it_all_schedule]

InputDefinitions and OutputDefinitions#

With the solid API, you define inputs and outputs using the InputDefinition and OutputDefinition classes.

from dagster import InputDefinition, OutputDefinition, solid


@solid(
    input_defs=[InputDefinition("arg1", metadata={"a": "b"})],
    output_defs=[OutputDefinition(metadata={"c": "d"})],
)
def do_something(arg1: str) -> int:
    return int(arg1)

The op API supports a less verbose way to define inputs and outputs, using the In and Out classes.

from dagster import In, Out, op


@op(ins={"arg1": In(metadata={"a": "b"})}, out=Out(metadata={"c": "d"}))
def do_something(arg1: str) -> int:
    return int(arg1)

For a single output, you can just supply an Out object; for multiple outputs, you can use a dictionary:

from typing import Tuple

from dagster import In, Out, op


@op(
    ins={"arg1": In(metadata={"a": "b"})},
    out={"out1": Out(metadata={"c": "d"}), "out2": Out(metadata={"e": "f"})},
)
def do_something(arg1: str) -> Tuple[int, int]:
    return int(arg1), int(arg1) + 1

Composite Solids#

A simple composite solid#

With the pipeline APIs, if you want multiple layers of composition, you define a composite solid.

from dagster import composite_solid, pipeline, solid


@solid
def do_something():
    pass


@solid
def do_something_else():
    return 5


@composite_solid
def do_two_things():
    do_something()
    return do_something_else()


@solid
def do_yet_more(arg1):
    assert arg1 == 5


@pipeline
def do_it_all():
    do_yet_more(do_two_things())

With the graph APIs, you can just use graphs at both layers:

from dagster import graph, op


@op
def do_something():
    pass


@op
def do_something_else():
    return 5


@graph
def do_two_things():
    do_something()
    return do_something_else()


@op
def do_yet_more(arg1):
    assert arg1 == 5


@graph
def do_it_all():
    do_yet_more(do_two_things())

A composite solid with inputs and outputs#

With the pipeline APIs, if you want to pass inputs into and return outputs from a composite solid, you define the input and output definitions that it maps.

from dagster import InputDefinition, OutputDefinition, composite_solid, pipeline, solid


@solid
def do_something():
    pass


@solid
def one():
    return 1


@solid(
    input_defs=[InputDefinition("arg1", int)],
    output_defs=[OutputDefinition(int)],
)
def do_something_else(arg1):
    return arg1


@composite_solid(
    input_defs=[InputDefinition("arg1", int)],
    output_defs=[OutputDefinition(int)],
)
def do_two_things(arg1):
    do_something()
    return do_something_else(arg1)


@solid
def do_yet_more(arg1):
    assert arg1 == 1


@pipeline
def do_it_all():
    do_yet_more(do_two_things(one()))

With the graph APIs, you can specify the ins and out arguments to do the same:

from dagster import In, Out, graph, op
from dagster.core.definitions.input import GraphIn
from dagster.core.definitions.output import GraphOut


@op
def do_something():
    pass


@op
def one():
    return 1


@op(ins={"arg1": In(int)}, out=Out(int))
def do_something_else(arg1):
    return arg1


@graph(ins={"arg1": GraphIn(int)}, out=GraphOut(int))
def do_two_things(arg1):
    do_something()
    return do_something_else(arg1)


@op
def do_yet_more(arg1):
    assert arg1 == 1


@graph
def do_it_all():
    do_yet_more(do_two_things(one()))

A composite solid with multiple outputs#

With the pipeline APIs, if you want multiple outputs from a composite solid, you define the output definitions that it maps and return a dictionary whose keys are the output names and whose values are the output values.

from dagster import Output, OutputDefinition, composite_solid, pipeline, solid


@solid
def do_something():
    pass


@solid(output_defs=[OutputDefinition(int, "one"), OutputDefinition(int, "two")])
def return_multi():
    yield Output(1, "one")
    yield Output(2, "two")


@composite_solid(output_defs=[OutputDefinition(int, "one"), OutputDefinition(int, "two")])
def do_two_things():
    do_something()
    one, two = return_multi()
    return {"one": one, "two": two}


@solid
def do_yet_more(arg1, arg2):
    assert arg1 == 1
    assert arg2 == 2


@pipeline
def do_it_all():
    one, two = do_two_things()
    do_yet_more(one, two)

With the graph APIs, you can specify the out argument:

from dagster import Out, Output, graph, op
from dagster.core.definitions.output import GraphOut


@op
def do_something():
    pass


@op(out={"one": Out(int), "two": Out(int)})
def return_multi():
    yield Output(1, "one")
    yield Output(2, "two")


@graph(out={"one": GraphOut(int), "two": GraphOut(int)})
def do_two_things():
    do_something()
    one, two = return_multi()
    return {"one": one, "two": two}


@op
def do_yet_more(arg1, arg2):
    assert arg1 == 1
    assert arg2 == 2


@graph
def do_it_all():
    one, two = do_two_things()
    do_yet_more(one, two)