Ask AI

Nesting Graphs#

To organize the ops inside a job, you can nest sets of ops into sub-graphs.

Relevant APIs#

NameDescription
@graphThe decorator used to define a graph.

Overview#

A Dagster job is usually based on a graph of connected ops, but its graph can also contain other graphs. Nesting graphs is useful for organizing large or complicated graphs and for abstracting away complexity. Dagster supports arbitrary levels of nesting.

We use the term node to refer to both ops and graphs, because both ops and graphs can be used as nodes inside graphs.

Nesting Graphs inside Graphs#

A Job without Nesting#

As a baseline, here's a job that does not use nesting. It starts with an op that returns a number, then uses two ops to convert it from Celsius to Fahrenheight, then logs the result:

from dagster import OpExecutionContext, job, op


@op
def return_fifty():
    return 50.0


@op
def add_thirty_two(number):
    return number + 32.0


@op
def multiply_by_one_point_eight(number):
    return number * 1.8


@op
def log_number(context: OpExecutionContext, number):
    context.log.info(f"number: {number}")


@job
def all_together_unnested():
    log_number(add_thirty_two(multiply_by_one_point_eight(return_fifty())))

Nesting#

We can put the ops that perform the Celsius-to-Fahrenheit conversion into their own sub-graph and invoke that sub-graph from our job's main graph:

@graph
def celsius_to_fahrenheit(number):
    return add_thirty_two(multiply_by_one_point_eight(number))


@job
def all_together_nested():
    log_number(celsius_to_fahrenheit(return_fifty()))

When executed, the above example will do the exact same thing as the non-nested version, but the nesting allows better organization of code and simplifies the presentation of the main graph in the Dagster UI.

Sub-Graph Inputs and Outputs#

As shown in the example above, sub-graphs can have inputs and outputs - celsius_to_fahrenheit accepts a number argument, and it has a return statement. Sub-graph inputs and outputs enable connecting the inputs and outputs of nodes inside the graph to the inputs and outputs of nodes outside the graph. In the all_together_nested example:

  • The number input of the celsius_to_fahrenheit graph is passed as an argument to the multiply_by_one_point_eight op. This means that, when an outer graph invokes celsius_to_fahrenheit and provides the output of another op or sub-graph for the number arg, the output of that op or sub-graph will be passed to multiply_by_one_point_eight, and multiply_by_one_point_eight will not execute until the upstream op that produces the output has completed.
  • The implementation of the celsius_to_fahrenheit graph returns the result of the add_thirty_two op. This means that, when an outer graph invokes celsius_to_fahrenheit and passes its output to the input of another node, the output of add_thirty_two will be provided to that node, and any ops that ultimately receive that input will not execute until add_thirty_two has completed.

If you want to add a description to an input (that will display in the UI), you can provide a GraphIn when constructing the graph.

Sub-Graph Configuration#

To provide configuration to ops inside a sub-graph when launching a run, you provide config for them under a key with the name of that sub-graph.

This example two ops that both take config and are wrapped by a graph, which is included inside a job.

@op(config_schema={"n": float})
def add_n(context: OpExecutionContext, number):
    return number + context.op_config["n"]


@op(config_schema={"m": float})
def multiply_by_m(context: OpExecutionContext, number):
    return number * context.op_config["m"]


@graph
def add_n_times_m_graph(number):
    return multiply_by_m(add_n(number))


@job
def subgraph_config_job():
    add_n_times_m_graph(return_fifty())

To kick off a run of this job, you will need to specify the config for both add_n and multiply_by_m through the sub-graph:

ops:
  add_n_times_m_graph:
    ops:
      add_n:
        config:
          n: 3
      multiply_by_m:
        config:
          m: 2

Configuration Mapping#

Sub-graphs can dictate config for the ops and sub-graphs inside them. If the full config is known at the time that you're defining the graph, you can pass a dictionary to the config argument of the @graph decorator.

from dagster import Config, graph, op


class AddNConfig(Config):
    n: float


@op
def add_n(config: AddNConfig, number):
    return number + config.n


class MultiplyByMConfig(Config):
    m: float


@op
def multiply_by_m(config: MultiplyByMConfig, number):
    return number * config.m


@graph(config={"multiply_by_m": {"config": {"m": 1.8}}, "add_n": {"config": {"n": 32}}})
def celsius_to_fahrenheit(number):
    return multiply_by_m(add_n(number))

Alternatively, you can use "config mapping", i.e. you can provide a function that accepts config that's provided to the graph and generates config for the nodes inside the graph.

from dagster import Config, config_mapping, graph, op


class AddNConfig(Config):
    n: float


@op
def add_n(config: AddNConfig, number):
    return number + config.n


class MultiplyByMConfig(Config):
    m: float


@op
def multiply_by_m(config: MultiplyByMConfig, number):
    return number * config.m


class ToFahrenheitConfig(Config):
    from_unit: str


@config_mapping
def generate_config(config_in: ToFahrenheitConfig):
    if config_in.from_unit == "celsius":
        n = 32
    elif config_in.from_unit == "kelvin":
        n = -459.67
    else:
        raise ValueError()

    return {"multiply_by_m": {"config": {"m": 1.8}}, "add_n": {"config": {"n": n}}}


@graph(config=generate_config)
def to_fahrenheit(number):
    return multiply_by_m(add_n(number))

To run a job that contains to_fahrenheit as a sub-graph, you need to provide a value for the from_unit config option:

ops:
  to_fahrenheit:
    config:
      from_unit: celsius

Examples#

Multiple Outputs#

To have multiple outputs from a graph, you need to define the outputs it maps and return a dictionary, where the keys are the output names and the values are the output values.

from dagster import GraphOut


@op
def echo(i):
    print(i)


@op
def one() -> int:
    return 1


@op
def hello() -> str:
    return "hello"


@graph(out={"x": GraphOut(), "y": GraphOut()})
def graph_with_multiple_outputs():
    x = one()
    y = hello()
    return {"x": x, "y": y}


@job
def subgraph_multiple_outputs_job():
    x, y = graph_with_multiple_outputs()
    echo(x)
    echo(y)