Ask AI

Migrating Airflow to Dagster#

There are many strategies that can be employed in migrating an Airflow project to Dagster. Some organization may prefer to migrate all pipelines, or pick and choose workflows while having Airflow and Dagster co-exist. This guide offers a "Choose your own adventure" approach, with a variety of options to assist you in your migration from Apache Airflow.


Translating Airflow pipelines to Dagster#

If your organization makes heavy use of the PythonOperator, the Airflow TaskFlow API, or the KubernetesPodOperator, then migrating to Dagster may be relatively simple!

Migrating pipelines that use the TaskFlow API#

The Airflow piplines that are the most simple to migrate to Dagster are those that use Airflow's TaskFlow API.

With this approach, pipelines are constructed using Airflow @task decorators that can easily be mapped to a function using the Dagster @asset decorator. For example, given the Airflow task below:

from airflow.decorators import task

@task()
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict

This can be directly translated to a Dagster asset like so.

from dagster import asset

@asset
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    order_data_dict = json.loads(data_string)
    return order_data_dict

Now, let’s walk through the full tutorial_taskflow_api.py example DAG, and how it would be translated to Dagster assets.

import json

import pendulum

from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


tutorial_taskflow_api()

By converting the Airflow task to a Dagster @asset, and our Airflow dag to a Dagster @job, the resulting code will look like the following.

import json

from dagster import AssetExecutionContext, Definitions, define_asset_job, asset


@asset
def extract():
    """Extract task

    A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting
    data is simulated by reading from a hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)

    return order_data_dict


@asset
def transform(extract):
    """Transform task

    A simple Transform task which takes in the collection of order data and computes the total order
    value.
    """
    total_order_value = 0

    for value in extract.values():
        total_order_value += value

    return total_order_value


@asset
def load(context: AssetExecutionContext, transform):
    """Load task

    A simple Load task which takes in the result of the Transform task and instead of saving it to
    end user review, just prints it out.
    """
    context.log.info(f"Total order value is: {transform:.2f}")


airflow_taskflow_example = define_asset_job(
    name="airflow_taskflow_example",
    selection=[extract, transform, load]
)

defs = Definitions(
    assets=[extract, transform, load],
    jobs=[airflow_taskflow_example]
)

In this example, we are using define_asset_job to define a job in which the selected assets are materialized. Using the selection parameter of the function, we specify that we want our extract, transform, and load assets to be materialized. The lineage of dependencies between the assets are automatically determined through the passing of one asset as a parameter to another.

Finally, we create a Definitions object to register our assets and job and load them by the Dagster tool.


Migrating containerized pipelines#

If you've elected to containerize your Airflow pipelines by using technologies like Kubernetes using the KubernetesPodOperator, or Elastic Container Service using the EcsRunTaskOperator, you'll need a different approach to migration.

In these cases, we recommend leveraging Dagster Pipes for running these external execution environments from Dagster. Refer to the Dagster Pipes documentation for more information.

Some benefits of containerizing your pipelines are as follows:

  • Dependencies are isolated between execution environments
  • Compute requirements can be easily modified per pipeline (computer, memory, GPU requirements, and so on)
  • Pipelines can be language agnostic, allowing you to use R, Rust, Go, and so on
  • Vendor lock-in is limited, and pipelines can be easily migrated to other platforms
  • Pipelines can be versioned using tags on the image repository

Let’s walk through an example of how a containerized pipeline can be run from Airflow, and then let’s walk through how the same would be done in Dagster. Imagine you have a Dockerized pipeline deployed to your registry of choice with an image named example-data-pipeline. In Apache Airflow, you would be able to run the image of that image by using the KubernetesPodOperator.

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from pendulum import datetime

with DAG(dag_id="example_kubernetes_dag", schedule_interval=None, catchup=False) as dag:
    KubernetesPodOperator(
        image="example-data-pipeline:latest",
        name="example-kubernetes-task",
        task_id="example-kubernetes-task",
        get_logs=True,
    )

Now, let's look at how the same image would be run on Kubernetes using Dagster Pipes and the dagster_k8s wrapper.

from dagster import AssetExecutionContext, asset
from dagster_k8s import PipesK8sClient


@asset
def k8s_pipes_asset(context: AssetExecutionContext, k8s_pipes_client: PipesK8sClient):
    return k8s_pipes_client.run(
        context=context,
        image="example-data-pipeline:latest",
        base_pod_spec={
            "containers": [
                {
                    "name": "data-processing-rs",
                    "image": "data-processing-rs",
                }
            ]
        },
    ).get_materialize_result()
The primary difference between Airflow and Dagster are how the k8s pod specifications are exposed. In Airflow, they are passed as parameters to the `KubernetesPodOperator`, whereas in Dagster they are passed as a `base_pod_spec` dictionary to the `k8s_pipes_client.run` method. Additionally, in Airflow, `get_logs` is required to capture `stdout`. In Dagster, however, they are automatically captured on the `stdout` tab of the step output.

The primary difference between Airflow and Dagster are how the k8s pod specification are expose. In Airflow, they are passed as parameters the `KubernetesPodOperator`, whereas in Dagster they are passed as a `base_pod_spec` dictionary to the `k8s_pipes_client.run` method. Another difference is that in Airflow, `get_logs` is required to capture _stdout_, however, with Dagster they are automatically captured to on the _stdout_ tab of the step output.

In the above example, we demonstrated how to run images on Kubernetes using the `dagster_k8s` library. One of the biggest benefits of Dagster Pipes, however, is that you can leverage the `dagster_pipes` library from within your containerized pipeline to access the full Dagster context, and emit events back to the Dagster UI.

In the above example, we demonstrated how to run images on Kubernetes using the `dagsater_k8s` library. One of the biggest benefits of Dagster Pipes, however, is that you can leverage the `dagster_pipes` library from within your containerized pipeline to access the full Dagster context, and emit events back to the Dagster UI.

A common pattern when building containerized pipelines is to accept a large number of command-line arguments using libraries like `argparse`. However, with Dagster you can pass a dictionary of parameters on the Dagster context using the `extras` parameter. Then, in your pipeline code, you can access the context `PipesContext.get()` if you are using Python.

For a step-by-step walkthrough of using Dagster Pipes, refer to the [Dagster Pipes tutorial](https://docs.dagster.io/concepts/dagster-pipes/subprocess).