Schedules
Dagster includes a scheduler that can be used to submit runs on a fixed interval. To create a schedule, define a ScheduleDefinition
and include it in your repository.
Each schedule:
- Targets a pipeline
- Defines a function that returns run_config for the targeted pipeline
- Optionally defines tags, mode, and solid_selection for the target pipeline
- Optionally defines a should_execute function that can be used to skip a scheduled execution
Defining a Schedule
Imagine we have a pipeline that computes metrics for a given date, and we want to compute those metrics on a fixed interval:

from dagster import pipeline, solid


@solid(config_schema={"date": str})
def process_data_for_date(context):
    date = context.solid_config["date"]
    context.log.info(f"processing data for {date}")


@solid
def post_slack_message(context):
    context.log.info("posting slack message")


@pipeline
def my_data_pipeline():
    process_data_for_date()
    post_slack_message()
Partition-based schedules
Partition-based schedules are useful when it's important for a successful run to exist for every scheduled run time. For example, if each run fills in a daily partition of a data warehouse table, basing your schedule on a partition set allows you to track which days have corresponding runs and fill in gaps when runs fail.
The @daily_schedule, @hourly_schedule, @weekly_schedule, and @monthly_schedule decorators each define a schedule that wraps an underlying PartitionSetDefinition. Each partition in the set is a datetime.
The decorated schedule function should accept a datetime
and return the run config for the
pipeline run associated with that partition. The scheduler will ensure that the schedule function
is evaluated exactly once for every partition to generate run config, which is then
used to create and launch a pipeline run.
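Conceptually, the once-per-partition guarantee can be sketched in plain Python. This is an illustration of the bookkeeping described above, not Dagster's actual implementation:

```python
from datetime import datetime, timedelta


def run_config_for_partition(partition: datetime) -> dict:
    # Stands in for the decorated schedule function: partition datetime -> run config.
    return {
        "solids": {
            "process_data_for_date": {"config": {"date": partition.strftime("%Y-%m-%d")}}
        }
    }


# Daily partitions starting at the schedule's start_date.
start_date = datetime(2020, 1, 1)
partitions = [start_date + timedelta(days=i) for i in range(3)]

launched = {}  # checkpoint: partitions whose run config has already been generated
for partition in partitions:
    if partition in launched:
        continue  # the schedule function is evaluated at most once per partition
    launched[partition] = run_config_for_partition(partition)

# launched now holds exactly one run config per partition.
```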
Partition-based schedules require a start_date that indicates when the set of partitions begins. The scheduler will only kick off runs for times that come after both the start_date and the time when the schedule was turned on (usually in Dagit). If the start_date is earlier than when the schedule is turned on, you can kick off runs for times between the start_date and when the schedule is turned on by launching a backfill.
By default, the partition that is used for the run will be one partition earlier than the partition
that includes the current time, to capture a common ETL use case - for example, a daily schedule
will fill in the previous day's partition, and a monthly schedule will fill in last month's
partition. You can customize this behavior by changing the partition_days_offset
parameter for a
daily schedule. The default value of this parameter is 1, which means that the scheduler goes back one day to determine
the partition. Setting the value to 0 will cause the
schedule to fill in the partition that corresponds to the current day, and increasing it above 1
will cause it to fill in an even earlier partition. A similarly-named parameter also exists for the other execution
intervals.
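The offset arithmetic can be illustrated with plain datetime math. This is a sketch of the behavior described above, not Dagster's implementation:

```python
from datetime import datetime, timedelta


def daily_partition_for(scheduled_time: datetime, partition_days_offset: int = 1) -> datetime:
    # Truncate to the day that contains the scheduled time, then step back
    # by the offset (the default of 1 selects the previous day's partition).
    day = scheduled_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return day - timedelta(days=partition_days_offset)


tick = datetime(2020, 6, 15, 1, 0)  # schedule fires at 1:00 AM on June 15
daily_partition_for(tick)                           # June 14: previous day (default offset of 1)
daily_partition_for(tick, partition_days_offset=0)  # June 15: the current day
daily_partition_for(tick, partition_days_offset=2)  # June 13: an even earlier partition
```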
The following examples demonstrate schedules with an underlying partition set, each with a different execution interval:
from datetime import datetime

from dagster import daily_schedule, hourly_schedule, monthly_schedule, weekly_schedule


@hourly_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
)
def my_hourly_schedule(date):
    return {
        "solids": {
            "process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}}
        }
    }


@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
)
def my_daily_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}


@weekly_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
    partition_weeks_offset=2,  # Use the partition for two weeks ago, not last week
)
def my_weekly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}


@monthly_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_timezone="US/Central",
    partition_months_offset=0,  # Use the partition for the current month, not the previous month
)
def my_monthly_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
Non-partition-based schedules
Non-partition-based schedules are useful when it's only important for a run to exist for the most recent scheduled run time. For example, if each run drops and recreates a data warehouse table, there's usually no need to kick off runs for historical scheduled run times; those runs would have the same effect as a run for the most recent time.
The @schedule decorator defines a non-partition-based schedule with an arbitrary cron string. This decorator should decorate a function that takes a ScheduleExecutionContext and returns run config for your run. The schedule will not create a partition set, but you can access the execution time for your schedule via the scheduled_execution_time property on the passed-in ScheduleExecutionContext (assuming you're using the recommended DagsterDaemonScheduler; the scheduled_execution_time property is not set on the deprecated SystemCronScheduler or K8sScheduler).

Each time the schedule is due to execute, the function is evaluated to generate run config, and a pipeline run is launched with that config.
from dagster import schedule


@schedule(
    cron_schedule="0 1 * * *", pipeline_name="my_data_pipeline", execution_timezone="US/Central"
)  # Executes at 1:00 AM in US/Central time every day
def my_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"solids": {"process_data_for_date": {"config": {"date": date}}}}
Running your schedule
Dagster's default scheduler, DagsterDaemonScheduler, runs as part of the dagster-daemon process. This scheduler uses checkpointing to ensure that your schedule function runs exactly once per schedule interval, even if the scheduler goes down. Check the Deploying Dagster section of the docs for more on how to deploy dagster-daemon to enable this scheduler. Once dagster-daemon is deployed and your schedule is turned on, runs will be launched at the specified interval.

Dagster also supports configuring SystemCronScheduler and K8sScheduler on your Dagster instance, but both of these schedulers are deprecated; we recommend using DagsterDaemonScheduler.
Timezones
Dagster supports executing your schedule in any IANA time zone. For example, the following schedule executes daily at 9AM in US/Pacific time:
from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_data_pipeline",
    start_date=datetime(2020, 1, 1),
    execution_time=time(9, 0),
    execution_timezone="US/Pacific",
)
def my_timezone_schedule(date):
    return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}}
If you don't specify an execution timezone for your schedule, it will execute in your system timezone. We recommend setting an execution timezone for all schedules, as unexpected behavior will occur if different components in your deployment have different system timezones.
Only the default DagsterDaemonScheduler
supports specifying an execution timezone that is
different from your system timezone.
Daylight Saving Time
(This section assumes that you are using DagsterDaemonScheduler
in your deployment)
Because of Daylight Saving Time transitions, it's possible to specify an execution time that does not always exist (for example, 2019/03/10 at 2:30 AM, since the clock jumps from 2:00 AM to 3:00 AM when Daylight Saving Time begins). When this happens, Dagster will run your schedule at the next time that does exist (in the previous example, 3:00 AM).

It's also possible to specify a time that exists twice on one day each year (for example, 2019/11/03 at 1:30 AM, since the hour from 1:00 AM to 2:00 AM repeats when Daylight Saving Time ends). In this case, Dagster will execute your schedule only once for that day, at the time associated with the new UTC offset.
Hourly schedules are unaffected by Daylight Saving Time transitions; the schedule continues to run exactly once every hour, even as the UTC offset changes.
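You can observe the spring-forward gap directly with the standard library's zoneinfo module (Python 3.9+; America/Chicago is used here as the canonical name for US/Central):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

central = ZoneInfo("America/Chicago")

# 2:30 AM on 2019-03-10 does not exist in US/Central: clocks jump from
# 2:00 AM to 3:00 AM. Round-tripping the nonexistent time through UTC
# lands on a time the wall clock actually showed.
gap_time = datetime(2019, 3, 10, 2, 30, tzinfo=central)
round_trip = gap_time.astimezone(timezone.utc).astimezone(central)
print(round_trip.strftime("%H:%M"))  # 03:30 -- the 2:xx hour was skipped
```

Note that this naive round-trip lands at 3:30 AM; Dagster instead runs the tick at 3:00 AM, the first time on that day that exists.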
Schedules in Dagit
You can view and operate schedules in Dagit. To view the schedules page, click the schedules tab in the left-hand navigation pane. Here, you can turn schedules on and off and see runs previously created by each schedule.
Troubleshooting
See the Troubleshooting section for tips on debugging your schedules.