Advanced: Scheduling Pipeline Runs

You can find the code for this tutorial on GitHub.

Dagster includes a simple built-in scheduler that works with Dagit. It is useful when you need to run pipelines at regular intervals, such as daily or hourly.

Suppose that we need to run our simple cereal pipeline every morning before breakfast, at 6:45 AM.

Requirements

You'll need to install the dagster-cron library.

pip install dagster-cron

You must also ensure that cron is installed on the machine you're running the scheduler on.
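A quick way to confirm that the `crontab` command is available before you set up the scheduler is to look it up on the `PATH`. This is a minimal sketch using only the standard library. `cron_available` is a hypothetical helper name, and it only checks that the `crontab` binary exists, not that the cron daemon is running.

```python
import shutil


def cron_available() -> bool:
    """Return True if a `crontab` executable is found on PATH.

    This only checks for the command-line tool; it does not verify
    that the cron daemon itself is running.
    """
    return shutil.which("crontab") is not None


if __name__ == "__main__":
    print("crontab found" if cron_available() else "crontab not found on PATH")
```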

Pipeline

scheduler.py
import csv
from datetime import datetime, time

from dagster import daily_schedule, pipeline, repository, solid
from dagster.utils import file_relative_path


@solid
def hello_cereal(context, date):
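    # Resolve cereal.csv relative to this file, not the current working directory.
    dataset_path = file_relative_path(__file__, "cereal.csv")
    context.log.info(dataset_path)
    # Read every CSV row into a dict keyed by the header row.
    with open(dataset_path, "r") as fd:
        cereals = [row for row in csv.DictReader(fd)]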

    context.log.info(
        "Today is {date}. Found {n_cereals} cereals".format(
            date=date, n_cereals=len(cereals)
        )
    )


@pipeline
def hello_cereal_pipeline():
    hello_cereal()

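As before, we've defined a solid and a pipeline. The hello_cereal solid's date input isn't connected to any upstream solid, so its value must be provided in the run config; the schedule we define later in this tutorial will supply it.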
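Stripped of Dagster, the solid's body just counts the rows of a CSV file and formats a log message. The sketch below reproduces that logic as a plain function so it can be checked in isolation. `summarize_cereals` is a hypothetical name, and it reads from any text stream rather than from the tutorial's `cereal.csv`.

```python
import csv
import io
from typing import TextIO


def summarize_cereals(fd: TextIO, date: str) -> str:
    """Count CSV data rows (the header row is not counted) and build the message."""
    cereals = list(csv.DictReader(fd))
    return "Today is {date}. Found {n_cereals} cereals".format(
        date=date, n_cereals=len(cereals)
    )


if __name__ == "__main__":
    sample = io.StringIO("name,mfr\nCorn Flakes,K\nCheerios,G\n")
    print(summarize_cereals(sample, "2020-06-01"))
    # prints: Today is 2020-06-01. Found 2 cereals
```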

Defining the Scheduler on the Instance

We first need to configure a scheduler on our DagsterInstance. For now, the only implemented scheduler is SystemCronScheduler, but the scheduler is pluggable, so you can write your own. To use SystemCronScheduler, add the following lines to your $DAGSTER_HOME/dagster.yaml:

scheduler:
  module: dagster_cron.cron_scheduler
  class: SystemCronScheduler
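If you manage several Dagster installations, you may prefer to add this block from a script rather than by hand. The sketch below is a hypothetical helper (`ensure_scheduler_config` is not part of Dagster) that appends the block to `dagster.yaml` in a given directory unless a top-level `scheduler:` entry already exists. It uses a simple line check rather than a YAML parser, so treat it as a starting point.

```python
import tempfile
from pathlib import Path

# The scheduler block from this tutorial, verbatim.
SCHEDULER_BLOCK = (
    "scheduler:\n"
    "  module: dagster_cron.cron_scheduler\n"
    "  class: SystemCronScheduler\n"
)


def ensure_scheduler_config(dagster_home: str) -> Path:
    """Append SCHEDULER_BLOCK to <dagster_home>/dagster.yaml unless some
    line already starts with "scheduler:". Returns the config file path."""
    config_path = Path(dagster_home) / "dagster.yaml"
    existing = config_path.read_text() if config_path.exists() else ""
    # Naive text check, not a YAML parse: any line starting with
    # "scheduler:" counts as an existing scheduler entry.
    if not any(line.startswith("scheduler:") for line in existing.splitlines()):
        with config_path.open("a") as f:
            # Keep the appended block on its own lines.
            if existing and not existing.endswith("\n"):
                f.write("\n")
            f.write(SCHEDULER_BLOCK)
    return config_path


if __name__ == "__main__":
    # Demo against a throwaway directory rather than the real $DAGSTER_HOME.
    with tempfile.TemporaryDirectory() as tmp:
        path = ensure_scheduler_config(tmp)
        ensure_scheduler_config(tmp)  # second call leaves the file unchanged
        print(path.read_text())
```

In practice you would pass the value of the `DAGSTER_HOME` environment variable as `dagster_home`.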

Defining schedules

Now we'll write a ScheduleDefinition to define the schedule we want. We can either directly construct a ScheduleDefinition, or use one of the included schedule decorators.

In this example, we'll use the @daily_schedule decorator, which defines a schedule that runs once a day at a specified time.
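For comparison with plain cron: a job that runs every day at 6:45 AM is written `45 6 * * *` (minute, hour, day of month, month, day of week). The helper below is a hypothetical illustration of that mapping for a daily time, not part of Dagster's API.

```python
from datetime import time


def daily_cron_expression(execution_time: time) -> str:
    """Build a standard five-field cron expression that fires once a day
    at the given hour and minute (cron has no seconds field)."""
    return "{minute} {hour} * * *".format(
        minute=execution_time.minute, hour=execution_time.hour
    )


print(daily_cron_expression(time(6, 45)))  # prints: 45 6 * * *
```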

The decorated function should return the run_config needed to launch the pipeline run at execution time. It receives, as its argument, the date for which the schedule is running.

scheduler.py
@daily_schedule(
    pipeline_name="hello_cereal_pipeline",
    start_date=datetime(2020, 6, 1),
    execution_time=time(6, 45),
)
def good_morning_schedule(date):
    return {
        "solids": {
            "hello_cereal": {
                "inputs": {"date": {"value": date.strftime("%Y-%m-%d")}}
            }
        }
    }
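Because the decorator turns the function into a schedule definition, it can help to keep the config-building logic in a plain function you can call directly. The sketch below (`build_run_config` is a hypothetical name) returns the same dictionary as `good_morning_schedule`, which makes the expected config shape easy to check for a given date.

```python
from datetime import datetime


def build_run_config(date: datetime) -> dict:
    """Return run config that feeds `date`, formatted as YYYY-MM-DD,
    into the `date` input of the `hello_cereal` solid."""
    return {
        "solids": {
            "hello_cereal": {
                "inputs": {"date": {"value": date.strftime("%Y-%m-%d")}}
            }
        }
    }


print(build_run_config(datetime(2020, 6, 1)))
# prints: {'solids': {'hello_cereal': {'inputs': {'date': {'value': '2020-06-01'}}}}}
```

With this helper in place, the body of `good_morning_schedule` could simply be `return build_run_config(date)`.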

To complete the picture, we'll need to add the schedule definition to the list of definitions returned from our repository.

scheduler.py
@repository
def hello_cereal_repository():
    return [hello_cereal_pipeline, good_morning_schedule]

Starting schedules

Whenever we make changes to schedule definitions, we need to run dagster schedule up. This command creates, updates, or removes schedules in the underlying system cron file as needed to keep it consistent with the schedule definitions in code.
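Conceptually, this is a reconciliation step: compare the schedules defined in code with the ones already installed, then plan additions, updates, and removals. The toy model below only illustrates that diffing idea; it is not how Dagster implements the command, and `plan_changes` and the name-to-cron-expression dictionaries are hypothetical.

```python
from typing import Dict, List, Tuple


def plan_changes(
    in_code: Dict[str, str], installed: Dict[str, str]
) -> Tuple[List[str], List[str], List[str]]:
    """Compare schedule name -> cron expression mappings.

    Returns (to_add, to_update, to_remove), each sorted by name:
    - to_add: defined in code but not installed
    - to_update: installed, but with a different cron expression
    - to_remove: installed but no longer defined in code
    """
    to_add = sorted(name for name in in_code if name not in installed)
    to_update = sorted(
        name
        for name in in_code
        if name in installed and in_code[name] != installed[name]
    )
    to_remove = sorted(name for name in installed if name not in in_code)
    return to_add, to_update, to_remove


if __name__ == "__main__":
    add, update, remove = plan_changes({"good_morning_schedule": "45 6 * * *"}, {})
    for name in add:
        print("  + {} (add)".format(name))
    # prints:   + good_morning_schedule (add)
```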

To preview the changes, first run:

dagster schedule up --preview
Planned Changes:
  + good_morning_schedule (add)

After confirming schedule changes are as expected, run:

dagster schedule up
Changes:
  + good_morning_schedule (add)

To start running the schedule, run:

dagster schedule start good_morning_schedule

Verify that the good_morning_schedule scheduled job has been added to cron:

crontab -l
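If you want to script this check, the sketch below runs `crontab -l` and returns the active entries (non-empty lines that are not comments). `read_crontab` and `active_cron_entries` are hypothetical helpers, not part of dagster-cron, and they make no assumption about the exact text of the entry the scheduler writes, so inspect the returned lines yourself.

```python
import subprocess
from typing import List


def active_cron_entries(crontab_text: str) -> List[str]:
    """Return non-empty, non-comment lines from crontab output."""
    return [
        line.strip()
        for line in crontab_text.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]


def read_crontab() -> str:
    """Return the current user's crontab, or "" if it can't be read.

    `crontab -l` exits non-zero when the user has no crontab, and the
    command may be missing entirely; both cases are treated as empty.
    """
    try:
        result = subprocess.run(["crontab", "-l"], capture_output=True, text=True)
    except OSError:
        return ""
    return result.stdout if result.returncode == 0 else ""


if __name__ == "__main__":
    for entry in active_cron_entries(read_crontab()):
        print(entry)
```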

Now, we can load Dagit to view the schedule and monitor runs:

dagit

Cron filters

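If you need to define a more specific schedule than cron allows, you can pass a function as the should_execute argument to ScheduleDefinition (the schedule decorators, such as @daily_schedule, accept it too). The function runs at each scheduled execution time, and the run is skipped unless it returns True.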

For example, we can define a filter that only returns True on weekdays:

scheduler.py
def weekday_filter():
    weekno = datetime.today().weekday()
    # weekday() returns 0 (Monday) through 6 (Sunday), so this is True Monday to Friday
    return weekno < 5

If we combine this should_execute filter with a schedule that runs at 6:45 AM every day, we get a schedule that runs at 6:45 AM on weekdays only.
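To see the combined effect without waiting for cron, the sketch below walks through one week of daily 6:45 AM ticks starting from the schedule's start date and keeps only those a weekday filter would allow. It is a plain-Python simulation, not Dagster's scheduling logic. The `is_weekday` helper is hypothetical and takes the date explicitly (unlike `weekday_filter`, which reads today's date), so the result is deterministic.

```python
from datetime import datetime, time, timedelta
from typing import List


def is_weekday(day: datetime) -> bool:
    # datetime.weekday(): Monday == 0 ... Sunday == 6
    return day.weekday() < 5


def weekday_ticks(start: datetime, days: int, at: time) -> List[datetime]:
    """Daily ticks at `at` for `days` days from `start`'s date, weekdays only."""
    ticks = []
    for offset in range(days):
        day = start + timedelta(days=offset)
        tick = datetime.combine(day.date(), at)
        if is_weekday(tick):
            ticks.append(tick)
    return ticks


for tick in weekday_ticks(datetime(2020, 6, 1), 7, time(6, 45)):
    print(tick.strftime("%a %Y-%m-%d %H:%M"))
# June 1, 2020 was a Monday, so this prints five lines,
# from Mon 2020-06-01 06:45 through Fri 2020-06-05 06:45.
```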

scheduler.py
@daily_schedule(
    pipeline_name="hello_cereal_pipeline",
    start_date=datetime(2020, 6, 1),
    execution_time=time(6, 45),
    should_execute=weekday_filter,
)