import os

from dagster_airflow import (
    make_dagster_definitions_from_airflow_dags_path,
)

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
    os.path.abspath("./dags/"),
)
You can orchestrate Dagster job runs from Airflow by using the DagsterCloudOperator or the DagsterOperator in your existing Airflow DAGs. For example, here's an Airflow DAG:
from datetime import datetime

from airflow import DAG
from dagster_airflow import DagsterCloudOperator

with DAG(
    dag_id="dagster_cloud",
    start_date=datetime(2022, 5, 28),
    schedule_interval="*/5 * * * *",
    catchup=False,
) as dag:
    DagsterCloudOperator(
        task_id="new_dagster_assets",
        repostitory_location_name="example_location",
        repository_name="my_dagster_project",
        job_name="all_assets_job",
    )
In Airflow 2.0+, you can create a Dagster connection type to store configuration related to your Dagster+ organization. If you're using Airflow 1.x, you can instead pass this configuration directly to the operator.
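As a rough sketch of the second option, the helper below gathers Dagster+ settings from environment variables so they can be passed to the operator as keyword arguments. The helper name, the `DAGSTER_*` variable names, and the keyword names (`organization_id`, `deployment_name`, `user_token`) are assumptions made for illustration; check them against the operator's signature in your installed version of dagster_airflow.

```python
import os


def dagster_plus_operator_kwargs(env=None):
    """Build keyword arguments carrying Dagster+ settings for the operator.

    Assumptions (not confirmed by the text above): the keyword names
    organization_id, deployment_name and user_token, and the DAGSTER_*
    environment variable names.
    """
    # Fall back to the process environment when no mapping is supplied.
    env = os.environ if env is None else env
    return {
        "organization_id": env.get("DAGSTER_ORGANIZATION_ID", ""),
        "deployment_name": env.get("DAGSTER_DEPLOYMENT_NAME", "prod"),
        "user_token": env.get("DAGSTER_USER_TOKEN"),
    }


# Hypothetical usage inside a DAG definition:
# DagsterCloudOperator(
#     task_id="new_dagster_assets",
#     job_name="all_assets_job",
#     **dagster_plus_operator_kwargs(),
# )
```

Reading the token from the environment keeps it out of the DAG file, which is the main reason to prefer a helper like this over hard-coded arguments.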
This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that works the same way as a Dagster-native job.
There are three jobs in the repo:
airflow_simple_dag demonstrates the use of Airflow templates.
airflow_complex_dag shows the translation of a more complex dependency structure.
airflow_kubernetes_dag shows the translation of a DAG using Kubernetes pod operators.
from dagster import repository
from dagster_airflow import (
    make_dagster_job_from_airflow_dag,
    make_schedules_and_jobs_from_airflow_dag_bag,
)
from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_kubernetes_dag import kubernetes_dag
from with_airflow.airflow_simple_dag import simple_dag

# Compile each Airflow DAG into a Dagster job.
airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)
airflow_kubernetes_dag = make_dagster_job_from_airflow_dag(kubernetes_dag)


@repository
def with_airflow():
    return [airflow_complex_dag, airflow_simple_dag, airflow_kubernetes_dag]
Note that the execution_date for the Airflow DAG is specified through the job tags. To specify tags, call: