Dynamic Graph (Experimental)

You can find the code for this example on GitHub.

This example demonstrates how to create a pipeline whose final structure is not determined until runtime. This is an experimental feature, so the API may change. At this point we can perform "map" / "for each" operations, but do not yet offer a "collect" step.

In this example, the solid files_in_directory defines a DynamicOutputDefinition. This dynamic output causes the downstream dependencies to be cloned once for each DynamicOutput that is yielded. Each downstream copy is identified by the mapping_key supplied to its DynamicOutput.
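As a rough mental model of the cloning behavior described above, the following plain-Python sketch mimics the fan-out without using dagster at all. The names fan_out and map_downstream are hypothetical and exist only for illustration. Rejecting duplicate keys is an assumption of this model, made because each downstream copy needs a unique label.

```python
from typing import Callable, Dict, Iterator, Tuple


def fan_out() -> Iterator[Tuple[str, str]]:
    """Hypothetical stand-in for a solid with a dynamic output.

    Yields (mapping_key, value) pairs, one per item discovered at runtime.
    """
    for name in ["a.csv", "b-1.csv"]:
        # Same key derivation as the example: "." and "-" become "_".
        yield name.replace(".", "_").replace("-", "_"), f"/data/{name}"


def map_downstream(
    outputs: Iterator[Tuple[str, str]], step: Callable[[str], int]
) -> Dict[str, int]:
    """Run one copy of `step` per yielded value, labelled by its mapping key."""
    results: Dict[str, int] = {}
    for mapping_key, value in outputs:
        # Assumption of this sketch: keys must be unique among siblings.
        if mapping_key in results:
            raise ValueError(f"duplicate mapping key: {mapping_key}")
        results[mapping_key] = step(value)
    return results


# `len` plays the role of the downstream computation here.
results = map_downstream(fan_out(), len)
```

The number of entries in results depends only on what fan_out yields at runtime, which is the sense in which the structure is not fixed ahead of time.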

repo.py
import os

from dagster import Field, pipeline, repository, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from dagster.utils import file_relative_path


@solid(
    config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
    output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
    path = context.solid_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@solid
def process_file(_, path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@pipeline
def process_directory():
    files_in_directory().map(process_file)


@repository
def dynamic_graph_repository():
    return [process_directory]
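The mapping key in files_in_directory only replaces "." and "-", so file names containing spaces or other punctuation would pass through unchanged. A more defensive variant might look like the sketch below. It assumes that mapping keys should contain only letters, digits, and underscores, and the helper name to_mapping_key is hypothetical, not part of dagster.

```python
import re


def to_mapping_key(filename: str) -> str:
    """Replace every character outside [A-Za-z0-9_] with an underscore.

    Hypothetical helper: assumes mapping keys are limited to letters,
    digits, and underscores.
    """
    return re.sub(r"[^A-Za-z0-9_]", "_", filename)


key = to_mapping_key("my file-1.txt")
```

Note that distinct file names can collapse to the same key under this scheme (for example "a.b" and "a_b"), so a directory containing both would need a different strategy, such as appending an index.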

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/dynamic_graph
cd dynamic_graph