Ask AI

Automating assets using schedules and jobs#

After creating some asset definitions, you may want to automate their materialization.

In this guide, we'll show you one method of accomplishing this by using schedules and jobs. To do this for ops, refer to the Automating ops using schedules guide.

By the end of this guide, you'll be able to:

  • Create a job that materializes assets
  • Create a schedule
  • Add the new job and schedule to your project's Definitions object
  • Turn the schedule on

Prerequisites#

To follow this guide, you'll need:


Step 1: Create a job#

The first step in creating a schedule is to build a job that materializes some assets.

Let's assume we already have a few assets in our project in a group named ecommerce_assets:

@asset(group_name="ecommerce_assets")
def orders_asset():
    return 1


@asset(group_name="ecommerce_assets")
def users_asset():
    return 2

To create a job that materializes the assets in this group, we'll use define_asset_job:

ecommerce_job = define_asset_job(
    "ecommerce_job", AssetSelection.groups("ecommerce_assets")
)

To create the job, we:

  1. Imported AssetSelection and define_asset_job
  2. Constructed the job using define_asset_job and name it ecommerce_job
  3. Selected all assets in the ecommerce_assets group using AssetSelection. Only these assets will be materialized when the job runs.

Refer to the Asset jobs documentation for more info and examples.


Step 2: Define the schedule#

Next, we'll construct the schedule using ScheduleDefinition and attach it to the job we created in Step 1.

ecommerce_schedule = ScheduleDefinition(
    job=ecommerce_job,
    cron_schedule="15 5 * * 1-5",
    default_status=DefaultScheduleStatus.RUNNING,
)

To build the schedule, we:

  1. Imported DefaultScheduleStatus and ScheduleDefinition from dagster

  2. Created a schedule using ScheduleDefinition that:

    • Is attached to the ecommerce_job job
    • Has a cron expression of 15 5 * * 1-5, which translates to Every Monday through Friday of every month at 5:15AM
    • Is turned on by default (default_status). We'll discuss this more in Step 4.

Step 3: Update the Definitions object#

Next, we'll update our project's Definitions object to include the new job and schedule. This ensures the job and schedule are available to Dagster processes, such as the Dagster UI.

defs = Definitions(
    assets=[orders_asset, users_asset],
    jobs=[ecommerce_job],
    schedules=[ecommerce_schedule],
)

At this point, your code should look like the following:

from dagster import (
    AssetSelection,
    DefaultScheduleStatus,
    Definitions,
    ScheduleDefinition,
    asset,
    define_asset_job,
)
@asset(group_name="ecommerce_assets")
def orders_asset():
    return 1


@asset(group_name="ecommerce_assets")
def users_asset():
    return 2



# end_job


# start_schedule


# start_definitions
defs = Definitions(
    assets=[orders_asset, users_asset],
    jobs=[ecommerce_job],
    schedules=[ecommerce_schedule],

Step 4: Turn the schedule on#

Schedules must be turned on before they can be used. In our case, we already turned the schedule on by using the default_status parameter in its ScheduleDefinition, but there are a few other ways to do this:

Heads up! Starting or stopping a schedule in the UI will override any default status set in code.

To turn on a schedule in the Dagster UI, navigate to Overview > Schedules:

Schedules tab in the Dagster UI

After the schedule is started, it will begin executing immediately if the dagster-daemon process is running. This process starts automatically when dagster dev is run.


APIs in this guide#

NameDescription
define_asset_jobA function for defining a job from a selection of assets.
AssetSelectionA class that defines a selection of assets. Typically used with define_asset_job.
ScheduleDefinitionA class that defines a schedule and attaches it to a job.
DefinitionsThe object that contains all the definitions defined within a code location. Definitions include assets, jobs, resources, schedules, and sensors.