
External Assets (Experimental)#

An external asset is an asset that is visible in Dagster but executed by an external process. For example, suppose you have a process that loads data from Kafka into Amazon S3 every day. You want the S3 asset to be visible alongside your other data assets, but not triggered by Dagster.

In this case, you could use an external asset to leverage Dagster's event log and tooling without using the orchestrator. This allows you to maintain data lineage, observability, and data quality without unnecessary migrations.

What about Source Assets?#

Source assets (SourceAsset) can be used to model data that's produced by a process Dagster doesn't control, such as a daily file drop into Amazon S3.

External assets can accomplish this and more. As a result, source assets will be replaced by external assets in the near future.


Uses and limitations#

Using external assets, you can:

  • Attach metadata to asset definitions for documentation, tracking ownership, and so on
  • Track the assets' data quality and version in Dagster
  • Use asset sensors or auto-materialize policies to update downstream assets based on updates to external assets

Limitations#

The following aren't currently supported when using external assets:

  • Scheduling the execution of an external asset
  • Backfilling an external asset using Dagster
  • Using the Dagster UI or GraphQL API to instigate ad hoc executions

Relevant APIs#

Name                          Description
external_assets_from_specs    Creates a list of AssetsDefinition objects that represent external assets
external_asset_from_spec      Creates a single AssetsDefinition object that represents an external asset
AssetSpec                     An object that represents the metadata of a particular asset

Defining external assets#

The following code declares a single external asset that represents a file in S3 and passes it to a Definitions object:


from dagster import AssetSpec, Definitions, external_asset_from_spec

defs = Definitions(assets=[external_asset_from_spec(AssetSpec("file_in_s3"))])

External assets with dependencies#

External assets can depend only on other external assets.

Dependencies are defined by using the deps argument of AssetSpec. This enables Dagster to model entire graphs of assets scheduled and orchestrated by other systems.

In the following example, we have two assets: raw_logs and processed_logs. The processed_logs asset is produced by a scheduled computation in another orchestration system. Using external assets allows you to model both assets in Dagster.


from dagster import AssetSpec, Definitions, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])

defs = Definitions(assets=external_assets_from_specs([raw_logs, processed_logs]))

Dagster-native assets with external asset dependencies#

Fully-managed assets can depend on external assets. In this example, the aggregated_logs asset depends on processed_logs, which is an external asset:


from dagster import AssetSpec, Definitions, asset, external_assets_from_specs

raw_logs = AssetSpec("raw_logs")
processed_logs = AssetSpec("processed_logs", deps=[raw_logs])


@asset(deps=[processed_logs])
def aggregated_logs() -> None:
    # Loads "processed_log" into memory and performs some aggregation
    ...


defs = Definitions(
    assets=[aggregated_logs, *external_assets_from_specs([raw_logs, processed_logs])]
)

Updating external asset metadata#

Because Dagster doesn't control the scheduling or materialization of external assets, it's up to you to keep their metadata updated. This also means that materializing external assets from the Dagster UI is disabled.

To keep your external assets updated, you can use any of the following approaches:

Using the REST API#

Whether you're using Dagster OSS or Dagster+, you can use a REST endpoint for reporting asset materializations. The API also has endpoints for reporting asset observations and asset check evaluations.

Refer to the following sections for examples of using curl and Python to communicate with the API.

Using curl#

Dagster+#
curl --request POST \
    --url https://{organization}.dagster.cloud/{deployment}/report_asset_materialization/{asset_key} \
    --header 'Content-Type: application/json' \
    --header 'Dagster-Cloud-Api-Token: {token}' \
    --data '{
    "metadata" : {
        "source": "From curl command"
    }
}'
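
Dagster OSS#

If you're running Dagster OSS, you can send the same request to your Dagster webserver instead of Dagster+. The command below is a minimal sketch: it assumes the webserver is running locally at localhost:3000 and that the endpoint path mirrors the Dagster+ one; adjust the host, port, and asset key for your deployment.

curl --request POST \
    --url http://localhost:3000/report_asset_materialization/{asset_key} \
    --header 'Content-Type: application/json' \
    --data '{
    "metadata" : {
        "source": "From curl command"
    }
}'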

Using Python#

Dagster+#
import requests

# Set organization, deployment, asset_key, and token to the values for your Dagster+ deployment
url = f"https://{organization}.dagster.cloud/{deployment}/report_asset_materialization/{asset_key}"
payload = {"metadata": {"source": "From python script"}}
headers = {"Content-Type": "application/json", "Dagster-Cloud-Api-Token": token}

response = requests.request("POST", url, json=payload, headers=headers)
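
As noted above, the API also has endpoints for reporting asset observations and asset check evaluations. As a sketch, reporting an asset observation follows the same pattern; the report_asset_observation path below is an assumption modeled on the materialization endpoint's naming, so check your deployment's API reference for the exact route.

import requests

# Set organization, deployment, asset_key, and token to the values for your Dagster+ deployment.
# The endpoint path is assumed to mirror report_asset_materialization.
url = f"https://{organization}.dagster.cloud/{deployment}/report_asset_observation/{asset_key}"
payload = {"metadata": {"row_count": 1000}}
headers = {"Content-Type": "application/json", "Dagster-Cloud-Api-Token": token}

response = requests.request("POST", url, json=payload, headers=headers)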

Using sensors#

By using the asset_events parameter of SensorResult, you can generate materialization events for external assets inside a sensor and have Dagster record them when the sensor returns its result. For example:

import datetime

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    SensorEvaluationContext,
    SensorResult,
    external_asset_from_spec,
    sensor,
)


def utc_now_str() -> str:
    return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d, %H:%M:%S")


@sensor()
def keep_external_asset_a_up_to_date(context: SensorEvaluationContext) -> SensorResult:
    # Materialization happened in external system, but is recorded here
    return SensorResult(
        asset_events=[
            AssetMaterialization(
                asset_key="external_asset_a",
                metadata={
                    "source": f'From sensor "{context.sensor_name}" at UTC time "{utc_now_str()}"'
                },
            )
        ]
    )


defs = Definitions(
    assets=[external_asset_from_spec(AssetSpec("external_asset_a"))],
    sensors=[keep_external_asset_a_up_to_date],
)

Using the Python API#

You can also insert events for external assets directly from Dagster's Python API. Specifically, use the report_runless_asset_event method on DagsterInstance.

For example, this would be useful when writing a Python script to backfill metadata:

from dagster import AssetMaterialization, DagsterInstance

# Requires DAGSTER_HOME to be set so DagsterInstance.get() can load your instance
instance = DagsterInstance.get()
instance.report_runless_asset_event(
    AssetMaterialization(
        "asset_one", metadata={"nrows": 10, "source": "From this script."}
    )
)
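
The same method isn't limited to materializations. As a minimal sketch, you can report an asset observation the same way when you only want to record metadata without marking the asset as materialized:

from dagster import AssetObservation, DagsterInstance

# Records an observation event for the external asset without materializing it
instance = DagsterInstance.get()
instance.report_runless_asset_event(
    AssetObservation("asset_one", metadata={"nrows": 10})
)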

Logging events using ops#

You can log an AssetMaterialization from an op. In this case, use the log_event method of OpExecutionContext to report an asset materialization of an external asset. For example:

from dagster import (
    AssetMaterialization,
    AssetSpec,
    Definitions,
    OpExecutionContext,
    external_asset_from_spec,
    job,
    op,
)


@op
def an_op(context: OpExecutionContext) -> None:
    context.log_event(AssetMaterialization(asset_key="external_asset"))


@job
def a_job() -> None:
    an_op()


defs = Definitions(
    assets=[external_asset_from_spec(AssetSpec("external_asset"))], jobs=[a_job]
)
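
To try this out, one option is to launch the job in process. The snippet below is a minimal sketch that runs a_job directly, which causes an_op to log an AssetMaterialization for external_asset on the instance used for the run:

# Launches a_job in the current process; the op's log_event call records
# an AssetMaterialization for "external_asset" on the instance used for the run.
result = a_job.execute_in_process()
assert result.success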