External assets
One of Dagster's goals is to present a single unified lineage of all of the data assets in an organization, even if those assets are orchestrated by systems other than Dagster.
With external assets, you can model assets orchestrated by other systems natively within Dagster, ensuring you have a comprehensive catalog of your organization's data. You can also create new data assets downstream of these external assets.
Unlike native assets, external assets can't be materialized directly by Dagster or included in a schedule. Instead, the external system must inform Dagster when an external asset is updated.
For example, external assets could be:
- Files in a data lake that are populated by a bespoke internal tool
- A CSV file delivered daily by SFTP from a partner
- A table in a data warehouse populated by another orchestrator
Defining external assets
Let's say you have a partner who sends you raw transaction data by SFTP on an almost daily basis. This data is later cleaned and stored in an internal data lake.
Because the raw transaction data isn't materialized by Dagster, it makes sense to model it as an external asset. The following example accomplishes this by using AssetSpec:
import dagster as dg
# Define an external asset with the key "raw_transactions".
# This will appear in the Dagster asset catalog, but cannot
# be materialized by Dagster itself.
raw_transactions = dg.AssetSpec("raw_transactions")
# This asset is materialized by Dagster and depends on the
# external asset.
@dg.asset(deps=[raw_transactions])
def cleaned_transactions(): ...
# Define the Definitions object
defs = dg.Definitions(assets=[raw_transactions, cleaned_transactions])
Refer to the AssetSpec API reference for the parameters you can provide to an external asset.
Recording materializations and metadata
When you model an external asset in Dagster, you also need to inform Dagster whenever the external asset is updated, ideally along with any relevant metadata about the asset, such as the time it was last updated.
There are two main ways to do this:
- Pulling external asset events with sensors
- Pushing external asset events using Dagster's REST API
Pulling with sensors
You can use a Dagster sensor to regularly poll the external system and pull information about the external asset into Dagster.
For example, the following sensor polls an external system, such as an SFTP server, and records a materialization of the external asset whenever the file changes.
import dagster as dg

# Define the external asset
raw_transactions = dg.AssetSpec("raw_transactions")


@dg.sensor(minimum_interval_seconds=30)
def raw_transactions_sensor(
    context: dg.SensorEvaluationContext,
) -> dg.SensorResult:
    # Poll the external system (at most every 30 seconds)
    # for the last time the file was modified.
    # Placeholder: replace `...` with a call to your external system
    # that returns the file's last-modified time in milliseconds.
    file_last_modified_at_ms = ...

    # Use the cursor to store the last time the sensor updated the asset
    if context.cursor is not None:
        external_asset_last_updated_at_ms = float(context.cursor)
    else:
        external_asset_last_updated_at_ms = 0

    if file_last_modified_at_ms > external_asset_last_updated_at_ms:
        # The external asset has been modified since it was last updated,
        # so record a materialization and update the cursor.
        return dg.SensorResult(
            asset_events=[
                dg.AssetMaterialization(
                    asset_key=raw_transactions.key,
                    # You can optionally attach metadata
                    metadata={"file_last_modified_at_ms": file_last_modified_at_ms},
                )
            ],
            cursor=str(file_last_modified_at_ms),
        )
    else:
        # Nothing has happened since the last check
        return dg.SensorResult()


# Define the Definitions object
defs = dg.Definitions(assets=[raw_transactions], sensors=[raw_transactions_sensor])
Refer to the Sensors guide for more information about sensors.
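The cursor comparison inside the sensor above can be pulled out and tested without a running Dagster instance. The following is a minimal, standard-library-only sketch, assuming the partner's file is reachable on a local or mounted path; for a real SFTP server you would substitute your own client call. The helper names `get_file_last_modified_at_ms` and `should_record_materialization` are hypothetical and are not part of Dagster.

```python
import os
from typing import Optional, Tuple


def get_file_last_modified_at_ms(path: str) -> float:
    """Return the file's modification time in milliseconds since the epoch.

    Hypothetical helper: assumes the file is on a local or mounted filesystem.
    For an SFTP server you would query the remote file's mtime instead.
    """
    return os.stat(path).st_mtime * 1000


def should_record_materialization(
    file_last_modified_at_ms: float, cursor: Optional[str]
) -> Tuple[bool, Optional[str]]:
    """Mirror the sensor's cursor logic.

    `cursor` is the string stored on the previous evaluation, or None on the
    first evaluation. Returns (record_materialization, new_cursor).
    """
    last_updated_at_ms = float(cursor) if cursor is not None else 0.0
    if file_last_modified_at_ms > last_updated_at_ms:
        # The file changed since the last recorded materialization:
        # record one and advance the cursor to the new timestamp.
        return True, str(file_last_modified_at_ms)
    # Nothing new since the last check; keep the existing cursor.
    return False, cursor


if __name__ == "__main__":
    record, cursor = should_record_materialization(1724614700266, None)
    print(record, cursor)  # True 1724614700266
    record, cursor = should_record_materialization(1724614700266, cursor)
    print(record, cursor)  # False 1724614700266
```

Keeping this decision in a plain function means the sensor body only has to fetch the timestamp and translate the result into a `SensorResult`.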
Pushing with the REST API
You can inform Dagster that an external asset has materialized by pushing the event from an external system to the REST API. The following examples demonstrate how to inform Dagster that a materialization of the raw_transactions external asset has occurred.
The required headers for the REST API depend on whether you're using Dagster+ or OSS. An example request for each is shown below.
Authentication headers are required if using Dagster+. The request should be made to your Dagster+ organization and a specific deployment in the organization.
curl \
-X POST \
-H 'Content-Type: application/json' \
-H 'Dagster-Cloud-Api-Token: [YOUR API TOKEN]' \
'https://[YOUR ORG NAME].dagster.cloud/[YOUR DEPLOYMENT NAME]/report_asset_materialization/' \
-d '
{
"asset_key": "raw_transactions",
"metadata": {
"file_last_modified_at_ms": 1724614700266
}
}'
Authentication headers aren't required if using Dagster OSS. The request should be sent to the URL of your Dagster OSS webserver, which is http://localhost:3000 in this example.
curl \
-X POST \
-H 'Content-Type: application/json' \
'http://localhost:3000/report_asset_materialization/' \
-d '
{
"asset_key": "raw_transactions",
"metadata": {
"file_last_modified_at_ms": 1724614700266
}
}'
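If the external system is itself written in Python, it can send the same request without shelling out to curl. The following is a minimal sketch using only the standard library's `urllib.request`; the function name `build_report_request` is hypothetical, while the endpoint path, headers, and JSON body are taken from the curl examples above. The `Dagster-Cloud-Api-Token` header is added only when `api_token` is given, matching the Dagster+ case.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_report_request(
    base_url: str,
    asset_key: str,
    metadata: Optional[Dict[str, Any]] = None,
    api_token: Optional[str] = None,
) -> urllib.request.Request:
    """Build a POST request for the report_asset_materialization endpoint.

    base_url: "http://localhost:3000" for OSS, or
        "https://[YOUR ORG NAME].dagster.cloud/[YOUR DEPLOYMENT NAME]" for Dagster+.
    api_token: only needed for Dagster+.
    """
    body: Dict[str, Any] = {"asset_key": asset_key}
    if metadata:
        body["metadata"] = metadata

    headers = {"Content-Type": "application/json"}
    if api_token is not None:
        # Authentication header required by Dagster+
        headers["Dagster-Cloud-Api-Token"] = api_token

    return urllib.request.Request(
        url=base_url.rstrip("/") + "/report_asset_materialization/",
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# Example: the OSS request from the curl example, built but not sent.
req = build_report_request(
    "http://localhost:3000",
    "raw_transactions",
    metadata={"file_last_modified_at_ms": 1724614700266},
)
```

To send it, pass the request to `urllib.request.urlopen(req)`. This requires a reachable Dagster webserver or Dagster+ deployment, and `urlopen` raises `urllib.error.HTTPError` on a non-2xx response.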
Refer to the External assets REST API documentation for more information.
Modeling a graph of external assets
Like regular Dagster assets, external assets can have dependencies. This is useful when you want to model an entire data pipeline orchestrated by another system.
import dagster as dg
# Three external assets that depend on each other
raw_data = dg.AssetSpec("raw_data")
stg_data = dg.AssetSpec("stg_data", deps=[raw_data])
cleaned_data = dg.AssetSpec("cleaned_data", deps=[stg_data])
# Native asset that depends on an external asset
@dg.asset(deps=[cleaned_data])
def derived_data(): ...
# Define the Definitions object
defs = dg.Definitions(assets=[raw_data, stg_data, cleaned_data, derived_data])
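Because each spec's `deps` point upstream, the three external assets and the native asset in the example above form a single chain. The following standard-library sketch (Python 3.9+ for `graphlib`) mirrors that graph as a plain dictionary to show what the lineage encodes: `derived_data` transitively depends on all three external assets. `ASSET_DEPS` and `upstream_of` are hypothetical illustration names, not Dagster APIs.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set

# Plain-dict mirror of the example: each asset maps to its direct upstream deps.
# Illustration only; Dagster builds its own asset graph from the deps you declare.
ASSET_DEPS: Dict[str, Set[str]] = {
    "raw_data": set(),
    "stg_data": {"raw_data"},
    "cleaned_data": {"stg_data"},
    "derived_data": {"cleaned_data"},
}


def upstream_of(asset: str, deps: Dict[str, Set[str]]) -> Set[str]:
    """Return every asset that `asset` transitively depends on."""
    seen: Set[str] = set()
    stack = list(deps.get(asset, ()))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(deps.get(node, ()))
    return seen


# A topological order of the graph: every asset appears after its upstream deps.
order = list(TopologicalSorter(ASSET_DEPS).static_order())
```

For this chain, `order` is `["raw_data", "stg_data", "cleaned_data", "derived_data"]`, and `upstream_of("derived_data", ASSET_DEPS)` returns the three external assets.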