
Learning Dagster from Airflow

In this tutorial, we'll help you make the switch from Airflow to Dagster. Here, we review an Airflow DAG and show how the same functionality can be achieved in Dagster.


Comparing an Airflow DAG to Dagster

We'll rewrite the following basic Airflow DAG as a Dagster job:

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    default_args={
        "retries": 1,
    },
    description="A simple tutorial DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        bash_command="sleep 5",
        retries=3,
    )

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
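Before translating the DAG, it helps to be clear about what templated_command renders. Airflow's {{ ds }} is the run's logical date as a YYYY-MM-DD string, and macros.ds_add(ds, 7) shifts that date forward by seven days. The plain-Python sketch below mimics that rendering; ds_add and render_templated are hypothetical helpers written for illustration, not imports from Airflow.

```python
from datetime import date, timedelta


def ds_add(ds: str, days: int) -> str:
    """Hypothetical stand-in for Airflow's macros.ds_add.

    Parses a YYYY-MM-DD string, shifts it by `days` days (negative values
    shift backwards), and returns the result in the same format.
    """
    return (date.fromisoformat(ds) + timedelta(days=days)).isoformat()


def render_templated(ds: str) -> list[str]:
    """Plain-Python equivalent of the Jinja loop in templated_command."""
    lines = []
    for _ in range(5):
        lines.append(ds)              # echo "{{ ds }}"
        lines.append(ds_add(ds, 7))   # echo "{{ macros.ds_add(ds, 7) }}"
    return lines
```

For a given ds, each of the five iterations emits the date followed by the date one week later.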

To rewrite this DAG in Dagster, we'll break it down into three parts:

  1. Define the computations: the ops (in Airflow, the operators)
  2. Define the graph: the job (in Airflow, the DAG)
  3. Define the schedule (in Airflow, also the schedule; how simple!)

A Dagster job is made up of a graph of ops. This should feel familiar if you've used Airflow's TaskFlow API. With ops, the focus is on writing a graph with Python functions as nodes and the data dependencies between them as edges.


Step 1: Defining the ops

In Dagster, the smallest unit of computation is an op, which directly corresponds to an operator in Airflow. Here, we map the example DAG's operators t1, t2, and t3 to their respective Dagster ops.

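import time
from datetime import datetime, timedelta

from dagster import In, Nothing, OpExecutionContext, RetryPolicy, op


@op
def print_date(context: OpExecutionContext) -> datetime:
    # t1: log the current date/time and pass it downstream.
    ds = datetime.now()
    context.log.info(ds)
    return ds


# t2: retried up to 3 times. The Nothing-typed "start" input only orders
# this op after its upstream op; it is not a function argument.
@op(retry_policy=RetryPolicy(max_retries=3), ins={"start": In(Nothing)})
def sleep():
    time.sleep(5)


@op
def templated(context: OpExecutionContext, ds: datetime):
    # t3: mirror the Jinja loop, logging ds and ds plus 7 days
    # (the equivalent of macros.ds_add(ds, 7)).
    for _i in range(5):
        context.log.info(ds)
        context.log.info(ds + timedelta(days=7))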

Once these ops are composed into a job (Step 2), they form the following graph of computations in the Dagster UI. We'll start the UI later in the tutorial, but here is a preview:

[Screenshot: the Dagster UI, showing the newly created graph of tutorial ops]

Op-level retries

In the tutorial DAG, the t2 operator allowed for three retries. To configure the same behavior in Dagster, you can use op-level retry policies.


Step 2: Define the job

In Dagster, the computations defined in ops are composed into jobs, which define the order and dependency structure of the computations you want to execute. A job directly corresponds to a DAG in Airflow. Here, we compose the ops print_date, sleep, and templated to match the dependency structure defined by the Airflow operators t1, t2, and t3.

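@job(tags={"dagster/max_retries": 1, "dag_name": "example"})
def tutorial_job():
    ds = print_date()
    # Both downstream ops depend on print_date, matching t1 >> [t2, t3].
    sleep(ds)  # feeds sleep's Nothing input "start": ordering only
    templated(ds)  # passed to templated as data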

Job-level retries

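Job-level retries (run retries) are configured on the Dagster instance rather than on individual ops. Once they're enabled in your dagster.yaml file (the run_retries setting, with enabled: true), you can set the retry count for an individual job with the dagster/max_retries tag, as tutorial_job does above.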


Step 3: Define the schedule

In Dagster, you can define schedules for jobs. A schedule determines the cadence at which a job is launched. Below, we define a schedule that runs tutorial_job daily, mirroring the Airflow DAG's one-day schedule_interval:

schedule = ScheduleDefinition(job=tutorial_job, cron_schedule="@daily")

Step 4: Run Dagster locally

To run the newly defined job, add it and its schedule to your project's Definitions:

defs = Definitions(
    jobs=[tutorial_job],
    schedules=[schedule],
)

You can now load this file in the Dagster UI:

dagster dev -f <your_dagster_file>.py

Completed code example

That's it! By now, your code should look like this:

import time
from datetime import datetime, timedelta

from dagster import (
    Definitions,
    In,
    Nothing,
    OpExecutionContext,
    RetryPolicy,
    ScheduleDefinition,
    job,
    op,
)


@op
def print_date(context: OpExecutionContext) -> datetime:
    ds = datetime.now()
    context.log.info(ds)
    return ds


@op(retry_policy=RetryPolicy(max_retries=3), ins={"start": In(Nothing)})
def sleep():
    time.sleep(5)


@op
def templated(context: OpExecutionContext, ds: datetime):
    for _i in range(5):
        context.log.info(ds)
        context.log.info(ds + timedelta(days=7))  # macros.ds_add(ds, 7)


@job(tags={"dagster/max_retries": 1, "dag_name": "example"})
def tutorial_job():
    ds = print_date()
    sleep(ds)
    templated(ds)


schedule = ScheduleDefinition(job=tutorial_job, cron_schedule="@daily")


defs = Definitions(
    jobs=[tutorial_job],
    schedules=[schedule],
)