Dagster can act as a single entry point to all of the orchestration platforms in use at your organization. By injecting a small amount of code into your existing pipelines, you can report events to Dagster, where you can then visualize the full lineage of your pipelines. This can be particularly useful if you have multiple Apache Airflow environments and want to build a catalog and observation platform with Dagster.
Emitting materialization events from Airflow to Dagster
Imagine you have a large number of pipelines written in Apache Airflow and wish to introduce Dagster into your stack. By using custom Airflow operators, you can continue to run your existing pipelines while you work toward migrating them off Airflow, or while building new pipelines in Dagster that are tightly integrated with your legacy systems.
To do this, we define a DagsterAssetOperator task downstream of the existing tasks in your Airflow DAG to indicate that the pipeline's processing has concluded. The operator is configured with the HTTP endpoint of the Dagster webserver, the asset_key, and any additional metadata and description needed to inform Dagster of the materialization.
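A minimal sketch of such an operator is shown below. It assumes the Dagster webserver's report_asset_materialization REST endpoint is reachable from Airflow; the constructor arguments (dagster_webserver_url, asset_key, metadata, description) are illustrative, and the exact payload may need adjusting for your Dagster version.

```python
from typing import Optional

import requests
from airflow.models import BaseOperator


class DagsterAssetOperator(BaseOperator):
    """Reports an asset materialization event to a Dagster instance."""

    def __init__(
        self,
        dagster_webserver_url: str,
        asset_key: str,
        metadata: Optional[dict] = None,
        description: Optional[str] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.dagster_webserver_url = dagster_webserver_url
        self.asset_key = asset_key
        self.metadata = metadata or {}
        self.description = description

    def execute(self, context):
        # POST the materialization event to the Dagster webserver's
        # report_asset_materialization endpoint.
        url = f"{self.dagster_webserver_url}/report_asset_materialization/{self.asset_key}"
        response = requests.post(
            url,
            json={"metadata": self.metadata, "description": self.description},
        )
        response.raise_for_status()
        self.log.info("Reported materialization for asset %s", self.asset_key)
```

Adding this operator as the final task of the DAG, pointed at your Dagster instance, means every successful Airflow run also reports a materialization that Dagster can observe.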
Once the events are emitted from Airflow, there are two options for triggering Dagster materializations in response to the external Airflow materialization event: asset sensors and auto-materialization policies.

With the asset sensor approach, an external asset is created in Dagster, and an asset_sensor is used to detect the materialization events sent from Airflow.
from dagster import AssetSpec, external_asset_from_spec

# External asset representing the output of the Airflow pipeline
example_external_airflow_asset = external_asset_from_spec(
    AssetSpec("example_external_airflow_asset", group_name="External")
)
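The external asset is then paired with a job and an asset_sensor that launches it. The following is a minimal sketch: the sensor name and the job body are illustrative placeholders for the downstream logic you actually want to run.

```python
from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
    job,
    op,
)


@op
def handle_airflow_materialization():
    # Placeholder for downstream logic that builds on the Airflow pipeline's output
    ...


@job
def example_external_airflow_asset_job():
    handle_airflow_materialization()


@asset_sensor(
    asset_key=AssetKey("example_external_airflow_asset"),
    job=example_external_airflow_asset_job,
)
def example_external_airflow_asset_sensor(
    context: SensorEvaluationContext, asset_event: EventLogEntry
):
    # Launch a run of the job each time a materialization of the
    # external asset is recorded
    yield RunRequest(run_key=context.cursor)
```

The external asset, the job, and the sensor all need to be included in your code location's Definitions object so that Dagster can evaluate the sensor.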
Now, when a materialization event is recorded for the external example_external_airflow_asset asset, the example_external_airflow_asset_job job will be triggered. Here, you can define logic that builds on the output of the DAG running in your Airflow environment.
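The second option mentioned above, an auto-materialization policy, avoids the explicit sensor: a downstream Dagster asset declares the external asset as a dependency and is materialized eagerly whenever a new upstream materialization is recorded. A minimal sketch, assuming a hypothetical downstream asset and that auto-materialization is enabled on the deployment:

```python
from dagster import AutoMaterializePolicy, asset


@asset(
    deps=["example_external_airflow_asset"],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def downstream_of_airflow_asset():
    # Materialized automatically whenever Dagster records a new
    # materialization of the upstream external asset
    ...
```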