Repositories

Dagster provides the concept of a repository: a collection of pipelines and other definitions that tools such as the Dagster CLI or Dagit can target to load those definitions. Repositories are declared with the `@repository` decorator, which produces a RepositoryDefinition, as follows:

repos.py
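import datetime

from dagster import daily_schedule, pipeline, repository

# The solids add, subtract, return_one, and return_two are assumed to be
# defined earlier in repos.py (not shown in this excerpt).


@pipeline
def addition_pipeline():
    return add(return_one(), return_two())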


@pipeline
def subtraction_pipeline():
    return subtract(return_one(), return_two())


@daily_schedule(
    pipeline_name="addition_pipeline", start_date=datetime.datetime(2020, 1, 1),
)
def daily_addition_schedule(date):
    return {}


@repository
def my_repository():
    return [addition_pipeline, subtraction_pipeline, daily_addition_schedule]

The decorated function can return a list of items, each of which can be a PipelineDefinition, ScheduleDefinition, or PartitionSetDefinition. If you save this file as `repos.py`, you can run the command line tools on it. Try running:
dagit -f repos.py

Lazy Construction

Notice that returning a list requires eager construction of every member definition. In large codebases, pipeline construction time can be significant, so it can pay to construct definitions only on demand. To do that, return a dictionary of function references instead; each function is called only when its definition is needed:

repos.py
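import datetime

from dagster import daily_schedule, pipeline, repository

# The solids add, subtract, return_one, and return_two are assumed to be
# defined elsewhere (not shown in this excerpt).


def load_addition_pipeline():
    @pipeline
    def addition_pipeline():
        return add(return_one(), return_two())

    return addition_pipeline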


def load_subtraction_pipeline():
    @pipeline
    def subtraction_pipeline():
        return subtract(return_one(), return_two())

    return subtraction_pipeline


def load_daily_addition_schedule():
    @daily_schedule(
        pipeline_name="addition_pipeline", start_date=datetime.datetime(2020, 1, 1),
    )
    def daily_addition_schedule(date):
        return {}

    return daily_addition_schedule


@repository
def my_lazy_repository():
    # Note that we can pass a dict of functions, rather than a list of
    # pipeline definitions. This allows us to construct pipelines lazily,
    # if, e.g., initializing a pipeline involves any heavy compute
    return {
        "pipelines": {
            "addition_pipeline": load_addition_pipeline,
            "subtraction_pipeline": load_subtraction_pipeline,
        },
        "schedules": {"daily_addition_schedule": load_daily_addition_schedule},
    }
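
To make "constructed on demand" concrete, here is a minimal plain-Python sketch of the idea behind a dictionary of loader functions. It is not Dagster's implementation: `LazyRegistry`, `build_log`, and the two loader functions are hypothetical names invented for illustration, and plain dictionaries stand in for pipeline definitions.

```python
from typing import Any, Callable, Dict, List


class LazyRegistry:
    """Hypothetical stand-in for on-demand construction; not Dagster's code."""

    def __init__(self, loaders: Dict[str, Callable[[], Any]]) -> None:
        self._loaders = loaders
        self._cache: Dict[str, Any] = {}

    def get(self, name: str) -> Any:
        # Call the loader the first time a name is requested, then reuse
        # the cached result on every later request.
        if name not in self._cache:
            self._cache[name] = self._loaders[name]()
        return self._cache[name]


# Records which loaders have actually run, so laziness is observable.
build_log: List[str] = []


def load_addition() -> Dict[str, str]:
    build_log.append("addition")
    return {"name": "addition_pipeline"}


def load_subtraction() -> Dict[str, str]:
    build_log.append("subtraction")
    return {"name": "subtraction_pipeline"}


registry = LazyRegistry(
    {"addition_pipeline": load_addition, "subtraction_pipeline": load_subtraction}
)
```

Creating `registry` runs neither loader. Calling `registry.get("addition_pipeline")` runs only `load_addition`, and repeated calls return the same cached object, so an expensive definition is built at most once and only if something asks for it.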

Note that the key under which a pipeline is registered in the repository must match the name declared for the pipeline itself (by default, the name of the decorated function). Don't worry: if these names don't match, you'll see a helpful error message.
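
The kind of name check described above can be sketched in plain Python. This is only an illustration under stated assumptions: `NamedDefinition` and `resolve` are hypothetical names, and the error type and message are invented here rather than taken from Dagster.

```python
from typing import Callable, Dict


class NamedDefinition:
    """Hypothetical stand-in for a definition that carries a declared name."""

    def __init__(self, name: str) -> None:
        self.name = name


def resolve(
    key: str, loaders: Dict[str, Callable[[], NamedDefinition]]
) -> NamedDefinition:
    # Construct the definition on demand, then verify that the key it was
    # registered under agrees with the name the definition declares.
    definition = loaders[key]()
    if definition.name != key:
        raise ValueError(
            f"Registered under '{key}' but the definition is named "
            f"'{definition.name}'"
        )
    return definition
```

With a lazy dictionary, a mismatch like this can only be detected once the loader has run, which is one reason to keep the dictionary keys identical to the function names used in the pipeline declarations.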