dagster-airlift is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page provides additional information for working with dagster-airlift that is not provided within the tutorial. You should start by reading the dagster-airlift tutorial before using this reference page.
When migrating Airflow DAGs to Dagster, we recommend a few best practices:
Create separate packages for the Airflow and Dagster deployments. Airflow has complex dependencies and can be difficult to install in the same environment as Dagster.
Create user acceptance tests in Dagster before migrating. This will help you catch issues easily during migration.
Understand the rollback procedure for your migration. When proxying execution to Dagster from Airflow, you can always roll back with a single line-of-code change in the Airflow DAG.
If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend. proxying_to_dagster accepts a build_from_task_fn parameter, which lets you supply a custom BaseProxyTaskToDagsterOperator subclass and so override how a session is created. For example, suppose your Dagster installation requires an access key on every request, and that access key is stored in an Airflow Variable called my_api_key. We can create a custom BaseProxyTaskToDagsterOperator subclass that retrieves that variable and sets it on the session, so that any request to Dagster's GraphQL API is made using that API key.
from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"


dag = DAG(
    dag_id="custom_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_task_fn=CustomProxyToDagsterOperator.build_from_task,
)
You can use a custom proxy operator to establish a connection to a Dagster+ deployment. The operator proxies to Dagster+ using an organization name, deployment name, and user token that are set as Airflow Variables. To set a Dagster+ user token, follow this guide.
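As a rough illustration of the Dagster+ case, the sketch below isolates the two pieces such an operator needs: the deployment URL and the authentication header. It rests on assumptions that are not stated on this page: that a deployment is served at https://<organization>.dagster.plus/<deployment>, that the user token travels in a Dagster-Cloud-Api-Token header, and that the Airflow Variable names shown in the comments are whatever you chose (they are hypothetical here). The helper names dagster_plus_url and dagster_plus_headers are illustrative, not part of dagster-airlift.

```python
from typing import Dict


def dagster_plus_url(organization_name: str, deployment_name: str) -> str:
    """Build the base URL of a Dagster+ deployment.

    Assumption: deployments are served at
    https://<organization>.dagster.plus/<deployment>.
    """
    return f"https://{organization_name}.dagster.plus/{deployment_name}"


def dagster_plus_headers(user_token: str) -> Dict[str, str]:
    """Headers that authenticate a request with a Dagster+ user token.

    Assumption: the token is sent in the Dagster-Cloud-Api-Token header.
    """
    return {"Dagster-Cloud-Api-Token": user_token}


# Wiring into a BaseProxyTaskToDagsterOperator subclass might look like this
# (the Airflow Variable names are hypothetical):
#
#     def get_dagster_session(self, context):
#         token = context["var"]["value"].get("dagster_plus_user_token")
#         session = requests.Session()
#         session.headers.update(dagster_plus_headers(token))
#         return session
#
#     def get_dagster_url(self, context):
#         variables = context["var"]["value"]
#         return dagster_plus_url(
#             variables.get("dagster_plus_organization_name"),
#             variables.get("dagster_plus_deployment_name"),
#         )
```

Keeping the URL and header logic in plain functions makes them easy to check without an Airflow environment; the operator methods then only read the Variables and delegate.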
To make spin-up more efficient, dagster-airlift caches the state of the Airflow instance in the Dagster database, so that repeat fetches of the code location don't require additional calls to Airflow's REST API. However, this means that the Dagster definitions can fall out of sync with Airflow. Here are a few ways this can manifest:
A new Airflow DAG is added. The lineage information does not show up for this DAG, and materializations are not recorded.
A DAG is removed. The polling sensor begins failing, because there exist assets which expect that DAG to exist.
The task dependency structure within a DAG changes. This may result in unsynced statuses in Dagster, or missing materializations.
This is not an exhaustive list of problems, but most of the time the tell is that materializations are missing, or assets are missing. When you find yourself in this state, you can force dagster-airlift to reload Airflow state by reloading the code location. To do this, go to the Deployment tab on the top nav, and click Redeploy on the code location relevant to your asset. After some time, the code location should be reloaded with refreshed state from Airflow.
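One way to notice this kind of drift early is to compare the DAG IDs that were known when the code location was last loaded against the DAG IDs Airflow reports now. The sketch below is hypothetical and not part of dagster-airlift: diff_dag_ids and needs_reload are illustrative names, and how you obtain the two lists (for example, from Airflow's REST API) is left open. It only detects added or removed DAGs, not changes to task dependencies inside a DAG.

```python
from typing import Dict, Iterable, List


def diff_dag_ids(cached: Iterable[str], live: Iterable[str]) -> Dict[str, List[str]]:
    """Return DAG IDs that appeared in or disappeared from Airflow.

    `cached` is the set of DAG IDs seen at the last code location load;
    `live` is the set Airflow reports now. Both inputs are assumptions
    about data you collect yourself.
    """
    cached_set, live_set = set(cached), set(live)
    return {
        "added": sorted(live_set - cached_set),
        "removed": sorted(cached_set - live_set),
    }


def needs_reload(cached: Iterable[str], live: Iterable[str]) -> bool:
    """True when any DAG was added or removed since the last load."""
    drift = diff_dag_ids(cached, live)
    return bool(drift["added"] or drift["removed"])


if __name__ == "__main__":
    # "orders_etl" was removed and "billing_etl" was added, so a reload is due.
    print(needs_reload(["orders_etl", "users_etl"], ["users_etl", "billing_etl"]))
```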
If changes to your Airflow instance are controlled via a CI/CD process, you can add a step that automatically triggers a redeploy of the relevant code location. See the docs here on using the GraphQL client to do this.
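A CI/CD step along these lines could be sketched with the Dagster GraphQL Python client, whose reload_repository_location method returns an object carrying a status and a message. The hostname, port, and code location name are placeholders you would replace, and reload_code_location and make_client are illustrative helper names rather than dagster-airlift APIs. Whether this client call is the right mechanism for your deployment type is an assumption to verify against the GraphQL client docs.

```python
from typing import Any


def reload_code_location(client: Any, location_name: str) -> None:
    """Ask Dagster to reload one code location; raise if it does not succeed.

    `client` is expected to behave like dagster_graphql.DagsterGraphQLClient,
    i.e. expose reload_repository_location(name) returning an object with
    `status` (an enum whose name is SUCCESS or FAILURE) and `message`.
    """
    info = client.reload_repository_location(location_name)
    if info.status.name != "SUCCESS":
        raise RuntimeError(f"Reload of {location_name!r} failed: {info.message}")


def make_client(hostname: str, port: int) -> Any:
    """Create a GraphQL client for a Dagster webserver."""
    # Imported lazily so reload_code_location can be exercised with a stub
    # client in environments where Dagster is not installed.
    from dagster_graphql import DagsterGraphQLClient

    return DagsterGraphQLClient(hostname, port_number=port)


# In a CI/CD job (placeholder host, port, and location name):
#
#     reload_code_location(make_client("dagster.example.com", 3000), "airflow_defs")
```

Raising on failure makes the pipeline step fail visibly instead of leaving Dagster with stale Airflow state.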
Airlift supports peering to multiple Airflow instances: invoke build_defs_from_airflow_instance once per instance and combine the results with Definitions.merge.
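The following sketch shows one way to structure this, assuming basic auth on every instance. InstanceConfig, duplicate_names, and build_merged_defs are hypothetical helpers introduced for illustration, and the check for distinct instance names is an assumption made here so the instances can be told apart, not a documented requirement. The URLs and credentials are placeholders.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence


@dataclass(frozen=True)
class InstanceConfig:
    """Connection details for one Airflow instance (hypothetical helper)."""

    name: str
    webserver_url: str
    username: str
    password: str


def duplicate_names(configs: Sequence[InstanceConfig]) -> List[str]:
    """Return instance names that occur more than once, sorted."""
    seen, dupes = set(), set()
    for config in configs:
        (dupes if config.name in seen else seen).add(config.name)
    return sorted(dupes)


def build_merged_defs(configs: Sequence[InstanceConfig]) -> Any:
    """Peer to every configured instance and merge the resulting Definitions."""
    dupes = duplicate_names(configs)
    if dupes:
        raise ValueError(f"Duplicate Airflow instance names: {dupes}")

    # Imported lazily so the validation above runs without Dagster installed.
    from dagster import Definitions
    from dagster_airlift.core import (
        AirflowBasicAuthBackend,
        AirflowInstance,
        build_defs_from_airflow_instance,
    )

    return Definitions.merge(
        *(
            build_defs_from_airflow_instance(
                airflow_instance=AirflowInstance(
                    auth_backend=AirflowBasicAuthBackend(
                        webserver_url=config.webserver_url,
                        username=config.username,
                        password=config.password,
                    ),
                    name=config.name,
                )
            )
            for config in configs
        )
    )


# Example usage in a definitions module (placeholder values):
#
#     defs = build_merged_defs([
#         InstanceConfig("airflow_instance_one", "http://yourcompany.com/instance_one", "admin", "admin"),
#         InstanceConfig("airflow_instance_two", "http://yourcompany.com/instance_two", "admin", "admin"),
#     ])
```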
Similar to how we can customize the operator we construct on a per-task basis, we can customize the operator we construct on a per-DAG basis. We can use the build_from_dag_fn argument of proxying_to_dagster to provide a custom operator in place of the default.
In the following example, the operator is customized to provide an authorization header which authenticates to Dagster.
from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"

    # This method controls how the operator is built from the dag.
    @classmethod
    def build_from_dag(cls, dag: DAG):
        return CustomProxyToDagsterOperator(dag=dag, task_id="OVERRIDDEN")


dag = DAG(
    dag_id="custom_dag_level_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
)
BaseProxyDAGToDagsterOperator has three abstract methods which must be implemented:
get_dagster_session, which controls the creation of a valid session to access the Dagster GraphQL API.
get_dagster_url, which returns the URL at which the Dagster webserver lives.
build_from_dag, which controls how the proxying task is constructed from the provided DAG.