Observe your Airflow pipelines with Dagster

Dagster can act as a single entry point to all the orchestration platforms in use at your organization. By adding a small amount of code to your existing pipelines, you can report events to Dagster and then visualize the full lineage of those pipelines there. This is particularly useful if you run multiple Apache Airflow environments and want to build a catalog and observability layer in Dagster.

Emitting materialization events from Airflow to Dagster

Imagine you have a large number of pipelines written in Apache Airflow and wish to introduce Dagster into your stack. By using custom Airflow operators, you can continue to run your existing pipelines while you work toward migrating them off Airflow, or while building new pipelines in Dagster that are tightly integrated with your legacy systems.

To do this, define a DagsterAssetOperator and place it downstream of the final task in your Airflow DAG to signal that the pipeline's processing has finished. The operator takes the host and port of the Dagster webserver, the asset_key, and optional metadata and a description, and sends them to Dagster's HTTP endpoint to report the materialization.

from typing import Dict, Optional

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests

class DagsterAssetOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        dagster_webserver_host: str,
        dagster_webserver_port: str,
        asset_key: str,
        metadata: Optional[Dict] = None,
        description: Optional[str] = None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.dagster_webserver_host = dagster_webserver_host
        self.dagster_webserver_port = dagster_webserver_port
        self.asset_key = asset_key
        self.metadata = metadata or {}
        self.description = description

    def execute(self, context):
        # Build the endpoint URL from the attributes stored in __init__.
        url = (
            f"http://{self.dagster_webserver_host}:{self.dagster_webserver_port}"
            f"/report_asset_materialization/{self.asset_key}"
        )
        payload = {"metadata": self.metadata, "description": self.description}
        headers = {"Content-Type": "application/json"}

        response = requests.post(url, json=payload, headers=headers)
        # Fail the Airflow task if Dagster returns an error status.
        response.raise_for_status()

        self.log.info(
            f"Reported asset materialization to Dagster. Response: {response.text}"
        )
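
To make the request shape explicit, the URL and JSON body that the operator sends can be built by a small standalone function. This is a minimal sketch, not part of Dagster or Airflow: the name build_materialization_request is hypothetical, and keeping "/" unescaped in the asset key assumes that multi-part asset keys are passed as slash-separated path components.

```python
from typing import Any, Dict, Optional, Tuple
from urllib.parse import quote


def build_materialization_request(
    host: str,
    port: str,
    asset_key: str,
    metadata: Optional[Dict[str, Any]] = None,
    description: Optional[str] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Return the (url, json_payload) pair that the operator would POST."""
    # Percent-encode unsafe characters in the key. "/" is left as-is on the
    # assumption that multi-part keys are slash-separated path components.
    encoded_key = quote(asset_key, safe="/")
    url = f"http://{host}:{port}/report_asset_materialization/{encoded_key}"
    # Same payload shape as DagsterAssetOperator.execute.
    payload = {"metadata": metadata or {}, "description": description}
    return url, payload


url, payload = build_materialization_request(
    "localhost",
    "3000",
    "example_external_airflow_asset",
    metadata={"airflow/tag": "example", "source": "external"},
)
print(url)
print(payload)
```

Separating request construction from the HTTP call makes it possible to check the URL and payload in a unit test without a running Dagster or Airflow instance.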

Then, add this operator as the last task of your Airflow DAG so that it reports a materialization once the pipeline has run successfully.

import os

dagster_webserver_host = os.environ.get("DAGSTER_WEBSERVER_HOST", "localhost")
dagster_webserver_port = os.environ.get("DAGSTER_WEBSERVER_PORT", "3000")

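# Create this task inside your existing DAG definition and set it
# downstream of the DAG's final task.
dagster_op = DagsterAssetOperator(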
    task_id="report_dagster_asset_materialization",
    dagster_webserver_host=dagster_webserver_host,
    dagster_webserver_port=dagster_webserver_port,
    asset_key="example_external_airflow_asset",
    metadata={"airflow/tag": "example", "source": "external"},
)
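
Before pointing the operator at a real Dagster deployment, it can help to confirm what request is actually sent. The sketch below uses only the Python standard library: a stub HTTP server stands in for the Dagster webserver and records the POST it receives. StubDagsterHandler and post_materialization are hypothetical names for illustration, and the stub's empty JSON response is a placeholder, not Dagster's real response body.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import ProxyHandler, Request, build_opener

received = []  # (path, decoded JSON body) for every POST the stub sees


class StubDagsterHandler(BaseHTTPRequestHandler):
    """Hypothetical stand-in for the Dagster webserver; records POSTs."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"{}")
        received.append((self.path, body))
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(b"{}")  # placeholder response body

    def log_message(self, *args):
        pass  # silence per-request logging


def post_materialization(host, port, asset_key, metadata, description=None):
    """Send the same request shape as DagsterAssetOperator.execute."""
    url = f"http://{host}:{port}/report_asset_materialization/{asset_key}"
    data = json.dumps({"metadata": metadata, "description": description}).encode("utf-8")
    request = Request(
        url, data=data, headers={"Content-Type": "application/json"}, method="POST"
    )
    opener = build_opener(ProxyHandler({}))  # bypass any configured proxy
    with opener.open(request, timeout=5) as response:
        return response.status


server = HTTPServer(("127.0.0.1", 0), StubDagsterHandler)  # port 0: any free port
threading.Thread(target=server.serve_forever, daemon=True).start()
try:
    status = post_materialization(
        "127.0.0.1",
        server.server_port,
        "example_external_airflow_asset",
        {"airflow/tag": "example", "source": "external"},
    )
finally:
    server.shutdown()
    server.server_close()

print(status, received)
```

The recorded path and body show exactly what Dagster would be asked to process, which makes mistakes in the asset key or metadata easy to spot before any events reach a real instance.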

Once Airflow emits these events, there are two ways to trigger Dagster materializations in response to the external materialization: asset sensors and auto-materialize policies.

The example below uses the first option: it declares an external asset in Dagster and uses an asset_sensor to detect the materialization events sent from Airflow.

from dagster import AssetSpec, external_asset_from_spec

# Declare the Airflow-produced asset so Dagster can track its events.
example_external_airflow_asset = external_asset_from_spec(
    AssetSpec("example_external_airflow_asset", group_name="External")
)

from dagster import (
    AssetKey,
    EventLogEntry,
    RunRequest,
    SensorEvaluationContext,
    asset_sensor,
)

# example_external_airflow_asset_job is not defined in this snippet; it is
# assumed to be a job defined elsewhere in your Dagster code.
@asset_sensor(
    asset_key=AssetKey("example_external_airflow_asset"),
    job=example_external_airflow_asset_job,
)
def example_external_airflow_asset_sensor(
    context: SensorEvaluationContext, asset_event: EventLogEntry
):
    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
    # Request a run of the job for each new materialization event.
    yield RunRequest(run_key=context.cursor)

Now, whenever a materialization event is reported for the external example_external_airflow_asset asset, the sensor triggers example_external_airflow_asset_job. In that job, you can define logic that builds on the output of the DAG in your Airflow environment.