Now that you've written an entire pipeline in Dagster, you will need to run it regularly to keep your assets up to date.
By the end of this part of the tutorial, you'll be able to:
A job lets you target a selection of assets to materialize them together as a single action. Assets can belong to multiple jobs.
Your Dagster repository has a file called tutorial/__init__.py that serves as the top-level definition for your project. Update the code in this file to add the job using the define_asset_job function:

from dagster import (
    AssetSelection,
    Definitions,
    define_asset_job,
    load_assets_from_modules,
)

from . import assets

all_assets = load_assets_from_modules([assets])

# Addition: define a job that will materialize the assets
hackernews_job = define_asset_job("hackernews_job", selection=AssetSelection.all())

defs = Definitions(
    assets=all_assets,
    jobs=[hackernews_job],  # Addition: add the job to Definitions object (see below)
)

Once you have a job, you can execute it on a schedule, by clicking a button in the Dagster UI, from the CLI, or via Dagster's GraphQL endpoints. Confirm that your job was defined by:
Up until this point, you defined assets using the @asset decorator. Dagster definitions are entities that Dagster learns about by importing your code. Just now, you used a different kind of definition: a job definition.
Managing one type of definition, such as assets, is easy. However, a project can quickly become unruly as it grows to include a variety of definitions (e.g., schedules, jobs, sensors). To combine definitions and make them aware of each other, Dagster provides a utility called the Definitions object.
After you define a job, you can attach it to a schedule. A schedule's responsibility is to start a run of the assigned job at a specified time. Schedules are added with the ScheduleDefinition class.
To regularly update the assets, add the new ScheduleDefinition import, create a new schedule for the hackernews_job, and add the schedule to the code location. The code below is how your __init__.py should look after making these changes:

from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    define_asset_job,
    load_assets_from_modules,
)

from . import assets

all_assets = load_assets_from_modules([assets])

# Define a job that will materialize the assets
hackernews_job = define_asset_job("hackernews_job", selection=AssetSelection.all())

# Addition: a ScheduleDefinition that names the job to run
# and a cron schedule for how frequently to run it
hackernews_schedule = ScheduleDefinition(
    job=hackernews_job,
    cron_schedule="0 * * * *",  # every hour
)

defs = Definitions(
    assets=all_assets,
    schedules=[hackernews_schedule],
)

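The cron string "0 * * * *" has five space-separated fields: minute, hour, day of month, month, and day of week. Fixing the minute at 0 and leaving every other field as a wildcard yields a tick at the top of each hour. The sketch below illustrates that reading in plain Python. It does not use Dagster's own cron evaluation, and the helper names describe_cron and next_hourly_ticks are hypothetical.

```python
from datetime import datetime, timedelta

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(cron_schedule):
    """Pair each of the five cron fields with its name."""
    parts = cron_schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError("expected five space-separated cron fields")
    return dict(zip(CRON_FIELDS, parts))


def next_hourly_ticks(after, count):
    """Illustrate "0 * * * *": the next `count` top-of-hour times strictly after `after`."""
    first = after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(count)]


fields = describe_cron("0 * * * *")
ticks = next_hourly_ticks(datetime(2024, 1, 1, 9, 30), count=3)

print(fields)
print([t.strftime("%H:%M") for t in ticks])  # ['10:00', '11:00', '12:00']
```

Changing the first field, for example to "30 * * * *", would move the tick to half past each hour.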
Go to the UI, click Overview > Schedules tab, and observe your new schedule with the attached job.
To test the change, click the schedule's name to view its details. Click the Test Schedule button on the top right corner of the page to trigger the schedule immediately.
Schedules are just one way to start jobs. Jobs can also be run by using the CLI, a Python function, or the UI. Refer to the Jobs documentation to learn more.
You've used a schedule to update your data on a regular cadence. However, there are other ways to trigger jobs. For example, sensors can trigger a job after routinely polling a source. Check out the Automating your pipelines guide to learn more.
By now, you've:
In the next section, you'll learn how to use I/O managers to save your data.