Schedules

Dagster includes a scheduler that can be used to submit runs on a fixed interval. To create a schedule, define a ScheduleDefinition and include it in your repository.

Each schedule:

  • Targets a pipeline
  • Defines a function that returns run_config for the targeted pipeline
  • Optionally defines tags, mode, and solid_selection for the target pipeline
  • Optionally defines a should_execute function that can be used to skip a scheduled execution

Defining a Schedule

Imagine we have a pipeline that computes metrics for a given date, and we want to compute those metrics on a fixed interval:

pipeline.py
from dagster import pipeline, solid


@solid(config_schema={"date": str})
def process_data_for_date(context):
    date = context.solid_config["date"]
    context.log.info(f"processing data for {date}")


@solid
def post_slack_message(context):
    context.log.info("posting slack message")


@pipeline
def my_data_pipeline():
    process_data_for_date()
    post_slack_message()

Partition-based schedules

Partition-based schedules are useful when it's important for a successful run to exist for every scheduled run time. For example, if each run fills in a daily partition of a data warehouse table, basing your schedule on a partition set allows you to track which days have corresponding runs and fill in gaps when runs fail.

The @daily_schedule, @hourly_schedule, @weekly_schedule, and @monthly_schedule decorators each define a schedule that wraps an underlying PartitionSetDefinition. Each partition in the set is a datetime.

The decorated schedule function should accept a datetime and return the run config for the pipeline run associated with that partition. The scheduler will ensure that the schedule function is evaluated exactly once for every partition to generate run config, which is then used to create and launch a pipeline run.

Partition-based schedules require a start_date that indicates when the set of partitions begins. The scheduler will only kick off runs for times that both come after the start_date and come after the time when the schedule was turned on (usually in Dagit). If the start_date is earlier than when the schedule is turned on, you can kick off runs for times between the start_date and when the schedule is turned on by launching a backfill.

By default, the partition used for the run will be one partition earlier than the partition that includes the current time, which captures a common ETL use case: a daily schedule fills in the previous day's partition, and a monthly schedule fills in last month's partition. You can customize this behavior with the partition_days_offset parameter for a daily schedule. The default value of 1 means the scheduler goes back one day to determine the partition; setting it to 0 causes the schedule to fill in the partition corresponding to the current day, and increasing it above 1 causes it to fill in an even earlier partition. A similarly named parameter exists for each of the other execution intervals.
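As a sketch of the offset rule described above (daily_partition_for is a hypothetical helper, not Dagster's actual implementation), the partition targeted by a daily run can be computed like this:

```python
from datetime import date, timedelta


def daily_partition_for(scheduled_date: date, partition_days_offset: int = 1) -> date:
    # Hypothetical helper illustrating the offset rule: step back
    # `partition_days_offset` days from the scheduled date to find the
    # partition the run should fill in.
    return scheduled_date - timedelta(days=partition_days_offset)


print(daily_partition_for(date(2020, 5, 2)))     # 2020-05-01: default offset of 1 fills in yesterday
print(daily_partition_for(date(2020, 5, 2), 0))  # 2020-05-02: offset 0 fills in the current day
```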

The following examples demonstrate schedules with an underlying partition set, each with a different execution interval:

schedule_definition.py
from datetime import datetime

from dagster import daily_schedule, hourly_schedule, monthly_schedule, weekly_schedule


@hourly_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
)
def my_hourly_schedule(date):
    return {
        "solids": {
            "process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}}
        }
    }


@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}


@weekly_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
    partition_weeks_offset=2,  # Use the partition for two weeks ago, not last week
)
def my_weekly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}


@monthly_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
    partition_months_offset=0,  # Use the partition for the current month, not the previous month
)
def my_monthly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
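For reference, the strftime formats used in the schedule functions above produce date strings like the following (the partition value is purely illustrative):

```python
from datetime import datetime

partition = datetime(2020, 3, 15, 6, 0)  # an example partition datetime

# The two formats used in the schedule functions above
print(partition.strftime("%Y-%m-%d %H:%M:%S"))  # 2020-03-15 06:00:00
print(partition.strftime("%Y-%m-%d"))           # 2020-03-15
```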

Non-partition-based schedules

Non-partition-based schedules are useful when it's only important for a run to exist for the most recent scheduled run time. For example, if each run drops and recreates a data warehouse table, then there's usually no need to kick off runs corresponding to historical scheduled run times - those runs would just have the same effect as a run for the most recent.

The @schedule decorator defines a non-partition-based schedule with an arbitrary cron string. It should decorate a function that takes a ScheduleExecutionContext and returns run config for your run. The schedule will not create a partition set, but you can access the execution time for your schedule via the scheduled_execution_time property on the passed-in ScheduleExecutionContext (assuming you're using the recommended DagsterDaemonScheduler - scheduled_execution_time is not set on the deprecated SystemCronScheduler or K8sScheduler).

Each time the schedule is due to execute, the function is evaluated to generate run config, and a pipeline run is launched with that run config.

schedule_definition.py
from dagster import schedule


@schedule(
    cron_schedule="0 1 * * *", pipeline_name="my_data_pipeline", execution_timezone="US/Central"
)  # Executes at 1:00 AM in US/Central time every day
def my_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}
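To make the cron string concrete, here is a minimal matcher for the five cron fields (cron_matches is an illustrative helper, not part of Dagster, and it supports only "*" and single numbers):

```python
from datetime import datetime


def cron_matches(expr: str, dt: datetime) -> bool:
    # Minimal illustration of cron semantics: the five fields are
    # minute, hour, day-of-month, month, and day-of-week (Sunday = 0).
    # Only "*" and single numbers are supported in this sketch.
    fields = expr.split()
    actual = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
    return all(f == "*" or int(f) == a for f, a in zip(fields, actual))


# "0 1 * * *" matches minute 0 of hour 1 on every day
print(cron_matches("0 1 * * *", datetime(2021, 6, 1, 1, 0)))   # True
print(cron_matches("0 1 * * *", datetime(2021, 6, 1, 13, 0)))  # False
```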

Running your schedule

Dagster's default scheduler, DagsterDaemonScheduler, runs as part of the dagster-daemon process. This scheduler uses checkpointing to ensure that your schedule function runs exactly once per schedule interval, even if the scheduler goes down. Check the Deploying Dagster section of the docs for more on how to deploy dagster-daemon to enable the use of this scheduler. Once dagster-daemon is deployed and your schedule is turned on, your schedule should begin running at the interval specified by the schedule.

Dagster also supports configuring SystemCronScheduler and K8sScheduler on your Dagster instance, but these schedulers are both deprecated - we recommend that you use DagsterDaemonScheduler.

Timezones

Dagster supports executing your schedule in any IANA time zone. For example, the following schedule executes daily at 9AM in US/Pacific time:

schedule_definition.py
import datetime

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime.datetime(2020, 1, 1),
    execution_time=datetime.time(9, 0),
    execution_timezone="US/Pacific",
)
def my_timezone_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

If you don't specify an execution timezone for your schedule, it will execute in your system timezone. We recommend setting an execution timezone for all schedules, as unexpected behavior will occur if different components in your deployment have different system timezones.

Only the default DagsterDaemonScheduler supports specifying an execution timezone that is different from your system timezone.

Daylight Saving Time

(This section assumes that you are using DagsterDaemonScheduler in your deployment)

Because of Daylight Saving Time transitions, it's possible to specify an execution time that does not exist on certain days (for example, 2:30 AM on 2019/03/10, since the clock jumps from 2:00 AM to 3:00 AM during the spring transition). When this happens, Dagster will run your schedule at the next time that does exist (in this example, 3:00 AM).

It's also possible to specify a time that occurs twice on one day each year (for example, 1:30 AM on 2019/11/03, since the hour from 1:00 AM to 2:00 AM repeats during the fall transition). In this case, Dagster will execute your schedule only once for that day, at the time corresponding to the post-transition UTC offset.

Hourly schedules are unaffected by Daylight Saving Time transitions - the schedule will continue to run exactly once every hour, even across the transition.
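The spring-forward gap can be observed with the standard library's zoneinfo module (Python 3.9+). This sketch only demonstrates that 2:30 AM doesn't exist on 2019/03/10 in US/Central; Dagster itself would run the schedule at 3:00 AM, as described above:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

central = ZoneInfo("US/Central")

# 2:30 AM on 2019-03-10 falls inside the spring-forward gap
local = datetime(2019, 3, 10, 2, 30, tzinfo=central)

# A nonexistent local time does not survive a round trip through UTC:
# it lands at the corresponding post-transition wall-clock time instead
roundtrip = local.astimezone(timezone.utc).astimezone(central)
print(roundtrip)  # 2019-03-10 03:30:00-05:00
```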

Schedules in Dagit

You can view and operate schedules in Dagit. To view the schedules page, click the schedules tab in the left-hand navigation pane.

Here, we can turn schedules on and off, and also see runs previously created by the schedule.


Troubleshooting

See the Troubleshooting section for tips on debugging your schedules.