Prefer videos? Check out our explainer and demo videos to get a quick look at asset definitions.
An asset is an object in persistent storage, such as a table, file, or persisted machine learning model. An asset definition is a description, in code, of an asset that should exist and how to produce and update that asset.
Asset definitions enable a declarative approach to data management, in which code is the source of truth on what data assets should exist and how those assets are computed.
An asset definition includes the following:
An AssetKey, which is a handle for referring to the asset.
A set of upstream asset keys, which refer to assets that the contents of the asset definition are derived from.
A Python function, which is responsible for computing the contents of the asset from its upstream dependencies and storing the results.
Note: Behind the scenes, the Python function is an op. Ops are an advanced topic that isn't required to get started with Dagster. A crucial distinction between asset definitions and ops is that asset definitions know about their dependencies, while ops do not. Ops aren't connected to dependencies until they're placed inside a graph.
Materializing an asset is the act of running its function and saving the results to persistent storage. You can initiate materializations from the Dagster UI or by invoking Python APIs.
The easiest way to create an asset definition is with the @asset decorator.
import json
import os
from dagster import asset

@asset
def my_asset():
    os.makedirs("data", exist_ok=True)
    with open("data/my_asset.json", "w") as f:
        json.dump([1, 2, 3], f)
By default, the name of the decorated function, my_asset, is used as the asset key. The decorated function is responsible for producing the asset's contents. The asset in this example doesn't depend on any other assets.
You can define a dependency between two assets by passing the upstream asset to the deps parameter in the downstream asset's @asset decorator.
In this example, the asset sugary_cereals creates a new table (sugary_cereals) by selecting records from the cereals table. Then the asset shopping_list creates a new table (shopping_list) by selecting records from sugary_cereals:
from dagster import asset

# execute_query is assumed to be a helper, defined elsewhere, that runs SQL against your database.
@asset
def sugary_cereals() -> None:
    execute_query(
        "CREATE TABLE sugary_cereals AS SELECT * FROM cereals WHERE sugar_grams > 10"
    )

@asset(deps=[sugary_cereals])
def shopping_list() -> None:
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")
Defining asset dependencies across code locations
Assets can depend on assets in different code locations. Consider this example for code_location_1:
Graph-backed assets: An asset whose computations are separated into multiple ops that are combined to build a graph. If the graph outputs multiple assets, the graph-backed asset is a multi-asset.
Assets in Dagster can specify a config schema. This allows you to provide values to assets at run time. The configuration system is explained in detail in the Config schema documentation.
Asset functions can specify an annotated config parameter for the asset's configuration. The config class, which subclasses Config (which in turn inherits from pydantic.BaseModel), specifies the configuration schema for the asset.
For example, the following downstream asset queries an API endpoint defined through configuration:
import requests
from dagster import Config, asset

class MyDownstreamAssetConfig(Config):
    api_endpoint: str

@asset
def my_downstream_asset(config: MyDownstreamAssetConfig):
    data = requests.get(f"{config.api_endpoint}/data").json()
    ...
When writing an asset, you can optionally provide a first parameter, context. When this parameter is supplied, Dagster passes an AssetExecutionContext object to the asset function, which provides access to system information such as loggers and the current run ID.
For example, to access the logger and log an info message:
from dagster import AssetExecutionContext, asset

@asset
def context_asset(context: AssetExecutionContext):
    context.log.info(f"My run ID is {context.run.run_id}")
    ...
Assets may be assigned a code_version. Code versions help Dagster track which assets haven't been re-materialized since their code changed, which lets you avoid performing redundant computation.
When an asset with a code version is materialized, the generated AssetMaterialization is tagged with the version. The UI will indicate when an asset has a different code version than the code version used for its most recent materialization.
Dagster offers several ways to provide useful information and documentation alongside your data pipelines, including metadata and tagging. For example, you can attach metadata to an asset that calculates how many records are processed during each run and then view the data as a plot in the Dagster UI!
To view and materialize assets in the UI, you can point the underlying webserver at a module that contains asset definitions or lists of asset definitions as module-level attributes:
dagster dev -m module_with_assets
If you want the UI to show both assets and jobs that target the assets, you can place the assets and jobs together inside a Definitions object. For example:
A Definitions object defines a code location, which is a collection of assets, jobs, resources, and schedules. Refer to the Code locations documentation for more info.
To show a subset of assets in the graph, use the search bar next to the Filter button. Refer to the Asset selection syntax reference for more info and examples using the query syntax.
In the UI, you can launch runs that materialize assets by:
Navigating to the Asset details page for the asset and clicking the Materialize button in the upper right corner.
Navigating to the graph view of the Asset catalog page and clicking the Materialize button in the upper right corner. You can also click on individual assets to collect a subset to materialize.
Jobs that target assets can materialize a fixed selection of assets each time they run and be placed on schedules and sensors. Refer to the Asset jobs documentation for more info and examples.
To help keep your assets tidy, you can organize them into groups. Grouping assets by project, concept, and so on simplifies keeping track of them in the UI. Each asset is assigned to a single group, which by default is called "default".
Assets can be assigned to a group individually by specifying the group_name argument when creating the asset:
@asset(group_name="cereal_assets")
def nabisco_cereals():
    execute_query(
        "CREATE TABLE nabisco_cereals AS SELECT * FROM cereals WHERE manufacturer = 'Nabisco'"
    )
The recommended approach is to construct a group of assets from a specified module in your project. Using the load_assets_from_package_module function, you can import all assets in a module and apply a grouping:
from dagster import load_assets_from_package_module
from my_package import cereal

cereal_assets = load_assets_from_package_module(
    cereal,
    group_name="cereal_assets",
)
If any of the assets in the module already has a group_name explicitly set on it, you'll encounter a Group name already exists on assets error.
To view your asset groups in the UI, open the left navigation by clicking the menu icon in the top left corner. As asset groups are grouped in code locations, you may need to open a code location to view its asset groups.
Click the asset group to open a dependency graph for all assets in the group. For example, in the following image, the dependency graph for the activity_analytics asset group is currently displayed:
If an asset has upstream dependencies, you can manually provide values for those dependencies in your unit test. This allows you to test assets in isolation from one another:
def test_more_complex_asset():
    result = more_complex_asset([0])
    assert result == [0, 4, 5, 6]
If you use config or resources in your asset, they will be provided automatically during execution. When writing unit tests, you may provide them directly when invoking the asset function:
from typing import Any, Dict

import requests
from dagster import Config, ConfigurableResource, asset

class MyConfig(Config):
    api_url: str

class MyAPIResource(ConfigurableResource):
    def query(self, url) -> Dict[str, Any]:
        return requests.get(url).json()

@asset
def uses_config_and_resource(config: MyConfig, my_api: MyAPIResource):
    return my_api.query(config.api_url)

def test_uses_resource() -> None:
    result = uses_config_and_resource(
        config=MyConfig(api_url="https://dagster.io"), my_api=MyAPIResource()
    )
    assert result == {"foo": "bar"}
If you use a context object in your function, you can use build_asset_context to generate the context object.
Consider the following asset that uses a context object:
Assets are often objects in systems with hierarchical namespaces, like filesystems. Because of this, it often makes sense for an asset key to be a list of strings, instead of just a single string. To define an asset with a multi-part asset key, use the key_prefix argument with a list of strings. The full asset key is formed by prepending the key_prefix to the asset name (which defaults to the name of the decorated function).
from dagster import AssetIn, asset

@asset(key_prefix=["one", "two", "three"])
def upstream_asset():
    return [1, 2, 3]

@asset(ins={"upstream_asset": AssetIn(key_prefix=["one", "two", "three"])})
def downstream_asset(upstream_asset):
    return upstream_asset + [4]