Federate execution

In the previous step, we created Dagster asset representations of the Airflow DAGs so that we could observe both Airflow instances from Dagster, and we set up cross-instance lineage between the DAGs. In this step, we'll federate the execution of the DAGs across both Airflow instances by using Dagster's Declarative Automation framework.

Make the customer_metrics DAG executable

To federate execution of the customer_metrics Airflow DAG, you first need to make its corresponding asset executable within Dagster.

To do this, you can use the @multi_asset decorator to define how the customer_metrics asset should be executed. You'll use the AirflowInstance defined earlier to trigger a run of the customer_metrics DAG. If the run completes successfully, the asset will be materialized. If the run fails, an exception will be raised:

@dg.multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> dg.MaterializeResult:
    # Trigger the DAG in the metrics Airflow instance and block until the run finishes.
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    # Report a materialization only if Airflow marks the run as successful.
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return dg.MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")
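The trigger, wait, and check sequence can be exercised without a live Airflow instance. The following is a minimal sketch, not part of the tutorial's code: StubAirflowInstance and run_dag_or_raise are hypothetical names, and the stub mimics only the three AirflowInstance calls used in the function above.

```python
class StubAirflowInstance:
    """Hypothetical in-memory stand-in for the tutorial's AirflowInstance.

    It mimics only trigger_dag, wait_for_run_completion, and get_run_state,
    and it never contacts Airflow.
    """

    def __init__(self, final_state: str) -> None:
        self._final_state = final_state
        self._runs: dict[str, str] = {}

    def trigger_dag(self, dag_id: str) -> str:
        # Invent a run ID and mark the run as in progress.
        run_id = f"manual__{dag_id}__{len(self._runs)}"
        self._runs[run_id] = "running"
        return run_id

    def wait_for_run_completion(self, dag_id: str, run_id: str) -> None:
        # A real client would poll Airflow here; the stub finishes at once.
        self._runs[run_id] = self._final_state

    def get_run_state(self, dag_id: str, run_id: str) -> str:
        return self._runs[run_id]


def run_dag_or_raise(instance, dag_id: str) -> str:
    """Trigger a DAG, block until it finishes, and return the run ID on success."""
    run_id = instance.trigger_dag(dag_id)
    instance.wait_for_run_completion(dag_id, run_id)
    state = instance.get_run_state(dag_id, run_id)
    if state != "success":
        # Include the final state so the failure is easier to diagnose.
        raise RuntimeError(f"DAG {dag_id!r} run {run_id!r} ended in state {state!r}")
    return run_id
```

Calling run_dag_or_raise(StubAirflowInstance("success"), "customer_metrics") returns the generated run ID, while a stub constructed with "failed" raises a RuntimeError, mirroring the two branches of run_customer_metrics.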

Next, replace the customer_metrics_dag_asset in the Definitions object with the run_customer_metrics function:

defs = dg.Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)

In the Dagster UI, you should see that the customer_metrics asset can now be materialized.

Federate execution

Ultimately, we would like to trigger a run of customer_metrics whenever load_customers completes successfully. We're already retrieving a materialization when load_customers completes, so we can use this to trigger a run of customer_metrics by using Declarative Automation.

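First, add an AutomationCondition.eager() to the customer_metrics_dag_asset. This tells Dagster to run the run_customer_metrics function whenever the load_customers asset is materialized. Apply the change before the @multi_asset definition, because the decorator reads the spec at the moment the function is defined: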

customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    automation_condition=dg.AutomationCondition.eager(),
)
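To build intuition for what the eager condition does here, consider a deliberately simplified model. This is not Dagster's implementation: ToyAsset and assets_to_request are hypothetical names, and the real condition applies additional checks that this sketch omits. It only illustrates the core idea that a downstream asset is requested when one of its upstream dependencies has just been materialized.

```python
from dataclasses import dataclass, field


@dataclass
class ToyAsset:
    """Hypothetical, stripped-down asset: a name, its upstream deps, and a flag."""

    name: str
    deps: list[str] = field(default_factory=list)
    eager: bool = False


def assets_to_request(assets: list[ToyAsset], newly_materialized: set[str]) -> list[str]:
    """Return the names of eager assets with at least one freshly materialized dep."""
    return [
        asset.name
        for asset in assets
        if asset.eager and any(dep in newly_materialized for dep in asset.deps)
    ]


# Toy versions of the two assets in this tutorial.
toy_assets = [
    ToyAsset(name="load_customers"),
    ToyAsset(name="customer_metrics", deps=["load_customers"], eager=True),
]
```

In this model, assets_to_request(toy_assets, {"load_customers"}) selects customer_metrics, and an empty set of new materializations selects nothing.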

Next, create an AutomationConditionSensorDefinition to set up Declarative Automation:

automation_sensor = dg.AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=dg.DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

Add this sensor to the Definitions object:

defs = dg.Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)

Now the run_customer_metrics function will be executed whenever the load_customers asset is materialized. You can test this by triggering a run of the load_customers DAG in Airflow. When the run completes, you should see a materialization of the customer_metrics asset start in the Dagster UI, and eventually a run of the customer_metrics DAG in the metrics Airflow instance.
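If you would rather start the load_customers run from a script than from the Airflow UI, one option is Airflow's REST API. The sketch below rests on several assumptions: the instance runs Airflow 2.x with the stable REST API under /api/v1, basic authentication is enabled, and the URL and credentials shown in the usage note are placeholders. build_trigger_request is a hypothetical helper that only constructs the request and does not send it.

```python
import base64
import json
import urllib.request


def build_trigger_request(
    base_url: str, dag_id: str, username: str, password: str
) -> urllib.request.Request:
    """Build (but do not send) a POST that asks Airflow to start a DAG run."""
    # Basic auth header: base64 of "username:password".
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        # An empty JSON object leaves the run parameters to Airflow's defaults.
        data=json.dumps({}).encode(),
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

For example, build_trigger_request("http://localhost:8081", "load_customers", "admin", "admin") produces a request that can be passed to urllib.request.urlopen to start the run. Substitute the address and credentials of your own warehouse Airflow instance.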

Complete code

To see what the code should look like after you have completed all the steps above, check out the example in GitHub.

Conclusion

That concludes the tutorial! If you followed all the steps, you should have successfully federated the execution of two DAGs across two Airflow instances using Dagster's Declarative Automation system and set up cross-instance lineage for the DAGs. You can now observe the lineage and execution of both DAGs in the Dagster UI.