Ask AI

Asset jobs#

Looking to execute a graph of ops, which aren't tied to asset definitions? Check out the Op jobs documentation.

Jobs are the main unit for executing and monitoring asset definitions in Dagster. An asset job is a type of job that targets a selection of assets and can be launched:

  • Manually from the Dagster UI
  • At fixed intervals, by schedules
  • When external changes occur, using sensors

Creating asset jobs#

In this section, we'll demonstrate how to create a few asset jobs that target the following assets:

from dagster import asset


@asset
def sugary_cereals() -> None:
    execute_query(
        "CREATE TABLE sugary_cereals AS SELECT * FROM cereals WHERE sugar_grams > 10"
    )


@asset(deps=[sugary_cereals])
def shopping_list() -> None:
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")

To create an asset job, use define_asset_job. An asset-based job is based on the assets the job targets and their dependencies.

You can target one or multiple assets, or create multiple jobs that target overlapping sets of assets. In the following example, we have two jobs:

  • all_assets_job targets all assets
  • sugary_cereals_job targets only the sugary_cereals asset
from dagster import Definitions, define_asset_job


all_assets_job = define_asset_job(name="all_assets_job")
sugary_cereals_job = define_asset_job(
    name="sugary_cereals_job", selection="sugary_cereals"
)

defs = Definitions(
    assets=[sugary_cereals, shopping_list],
    jobs=[all_assets_job, sugary_cereals_job],
)

Making asset jobs available to Dagster tools#

Including the jobs in a Definitions object located at the top level of a Python module or file makes asset jobs available to the UI, GraphQL, and the command line. The Dagster tool loads that module as a code location. If you include schedules or sensors, the code location will automatically include jobs that those schedules or sensors target.

from dagster import Definitions, MaterializeResult, asset, define_asset_job


@asset
def number_asset():
    yield MaterializeResult(
        metadata={
            "number": 1,
        }
    )


number_asset_job = define_asset_job(name="number_asset_job", selection="number_asset")

defs = Definitions(
    assets=[number_asset],
    jobs=[number_asset_job],
)

Testing asset jobs#

Dagster has built-in support for testing, including separating business logic from environments and setting explicit expectations on uncontrollable inputs. Refer to the Testing guide for more info and examples.


Executing asset jobs#

You can run an asset job in a variety of ways:

  • In the Python process where it's defined
  • Via the command line
  • Via the GraphQL API
  • In the UI. The UI centers on jobs, making it a one-stop shop - you can manually kick off runs for a job and view all historical runs.

Reference#

Relevant APIs#

NameDescription
define_asset_jobA function for defining a job from a selection of assets.

Examples#

Check this code in the Hacker News example that builds an asset job that targets an asset group.