Airflow integration reference#

This reference provides a high-level look at working with Airflow using the dagster-airflow integration library.


Airflow DAG directory as a Dagster repository#

To load all Airflow DAGs under a file path into Dagster definitions, use make_dagster_definitions_from_airflow_dags_path:

import os

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
)

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
)
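Note that os.path.abspath resolves a relative path against the current working directory, so "./dags/" can point to different places depending on where the process is started. A minimal sketch of anchoring the directory to a known file instead; resolve_dags_path is a hypothetical helper written for illustration and is not part of dagster_airflow:

```python
from pathlib import Path


def resolve_dags_path(anchor_file: str, dags_dirname: str = "dags") -> str:
    """Return the absolute path of a DAGs directory that sits next to anchor_file.

    Hypothetical helper for illustration; not part of dagster_airflow.
    """
    dags_path = Path(anchor_file).resolve().parent / dags_dirname
    if not dags_path.is_dir():
        # Fail early with a clear message if the directory is missing.
        raise FileNotFoundError(f"Airflow DAGs directory not found: {dags_path}")
    return str(dags_path)
```

The result could then be passed in place of os.path.abspath("./dags/"), for example as resolve_dags_path(__file__) from the module that builds the definitions.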

Orchestrating Dagster jobs from Airflow#

You can orchestrate Dagster job runs from Airflow by using the DagsterCloudOperator or DagsterOperator operators in your existing Airflow DAGs. For example, here's an Airflow DAG:

from datetime import datetime

from airflow import DAG
from dagster_airflow import DagsterCloudOperator

with DAG(
    dag_id="dagster_cloud",
    start_date=datetime(2022, 5, 28),
    schedule_interval="*/5 * * * *",
    catchup=False,
) as dag:
    DagsterCloudOperator(
        task_id="new_dagster_assets",
        repostitory_location_name="example_location",
        repository_name="my_dagster_project",
        job_name="all_assets_job",
    )

In Airflow 2.0+, you can create a Dagster connection type to store configuration related to your Dagster Cloud organization. If you're using Airflow 1.0, you can also pass this directly to the operator.


Ingesting DAGs from Airflow#

This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that works the same way as a Dagster-native job.

There are three jobs in the repo:

  • airflow_simple_dag demonstrates the use of Airflow templates.
  • airflow_complex_dag shows the translation of a more complex dependency structure.
  • airflow_kubernetes_dag shows the translation of a DAG using Kubernetes pod operators.

from dagster import repository
from dagster_airflow import (
    make_dagster_job_from_airflow_dag,
    make_schedules_and_jobs_from_airflow_dag_bag,
)

from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_kubernetes_dag import kubernetes_dag
from with_airflow.airflow_simple_dag import simple_dag

airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)
airflow_kubernetes_dag = make_dagster_job_from_airflow_dag(kubernetes_dag)


@repository
def with_airflow():
    return [airflow_complex_dag, airflow_simple_dag, airflow_kubernetes_dag]

Note that the execution_date for the Airflow DAG is specified through the job tags. To specify tags, pass them to make_dagster_job_from_airflow_dag:

from datetime import datetime

airflow_simple_dag_with_execution_date = make_dagster_job_from_airflow_dag(
    dag=simple_dag, tags={"airflow_execution_date": datetime.now().isoformat()}
)
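Because datetime.now() is evaluated when the module is imported, the tag value in the snippet above changes every time the code is loaded. A minimal sketch of building the tags from a fixed date instead, assuming the tag value is an ISO 8601 string as in that snippet; execution_date_tags is a hypothetical helper, and treating naive datetimes as UTC is an assumption:

```python
from datetime import datetime, timezone


def execution_date_tags(execution_date: datetime) -> dict:
    """Build job tags carrying an Airflow execution date.

    Hypothetical helper; the tag key is the one used in this reference.
    """
    if execution_date.tzinfo is None:
        # Assumption: treat naive datetimes as UTC so the value is unambiguous.
        execution_date = execution_date.replace(tzinfo=timezone.utc)
    return {"airflow_execution_date": execution_date.isoformat()}


tags = execution_date_tags(datetime(2022, 5, 28))
print(tags)  # {'airflow_execution_date': '2022-05-28T00:00:00+00:00'}
```

The resulting dictionary could then be passed as the tags argument, for example make_dagster_job_from_airflow_dag(dag=simple_dag, tags=tags).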