dagster-airlift is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page covers details of working with dagster-airlift that the tutorial does not. Read the dagster-airlift tutorial before using this page.
If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to that backend. proxying_to_dagster accepts a build_from_task_fn parameter, which lets you plug in a custom subclass of BaseProxyTaskToDagsterOperator and override how the session used to talk to Dagster is created. For example, suppose your Dagster installation requires an access key on every request, and that key is stored in an Airflow Variable called my_api_key. You can create a BaseProxyTaskToDagsterOperator subclass that retrieves the variable's value and sets it on the session, so that every request to Dagster's GraphQL API carries that API key. The subclass below also overrides get_dagster_url to point at your Dagster instance.
```python
from pathlib import Path
import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        # Airflow Variables are exposed in the task context under "var".
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        # Every request made with this session to Dagster's GraphQL API carries the key.
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"


dag = DAG(
    dag_id="custom_proxy_example",
)

# At the end of your DAG file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_task_fn=CustomProxyToDagsterOperator.build_from_task,
)
```
You can also use a custom proxy operator to connect to a Dagster+ deployment, for example by reading the organization name, deployment name, and user token from Airflow Variables. To create a Dagster+ user token, follow this guide.
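The following is a minimal, dependency-free sketch of the logic such a Dagster+ operator might use, so it can be exercised without Airflow installed. It assumes hypothetical Airflow Variable names (dagster_plus_organization_name, dagster_plus_deployment_name, dagster_plus_user_token), a deployment URL of the form https://<org>.dagster.plus/<deployment>, and that Dagster+ accepts the user token in a Dagster-Cloud-Api-Token header; confirm these against your Dagster+ setup. In a real BaseProxyTaskToDagsterOperator subclass, get_dagster_url would return dagster_plus_url(context), and get_dagster_session would copy dagster_plus_headers(context) onto a requests.Session, mirroring the custom-auth example above.

```python
from typing import Any, Mapping

# Hypothetical Airflow Variable names; rename them to match your deployment.
ORG_VAR = "dagster_plus_organization_name"
DEPLOYMENT_VAR = "dagster_plus_deployment_name"
TOKEN_VAR = "dagster_plus_user_token"


def get_variable(context: Mapping[str, Any], name: str) -> str:
    """Read an Airflow Variable from a task context, failing loudly if it is absent."""
    if "var" not in context:
        raise ValueError("No variables found in context")
    # Passing None as the default makes a missing variable come back as None.
    value = context["var"]["value"].get(name, None)
    if value is None:
        raise ValueError(f"Airflow Variable {name!r} is not set")
    return value


def dagster_plus_url(context: Mapping[str, Any]) -> str:
    """Would back get_dagster_url(); the <org>.dagster.plus/<deployment> form is an assumption."""
    org = get_variable(context, ORG_VAR)
    deployment = get_variable(context, DEPLOYMENT_VAR)
    return f"https://{org}.dagster.plus/{deployment}"


def dagster_plus_headers(context: Mapping[str, Any]) -> dict[str, str]:
    """Would back get_dagster_session(); assumes token auth via Dagster-Cloud-Api-Token."""
    return {"Dagster-Cloud-Api-Token": get_variable(context, TOKEN_VAR)}


if __name__ == "__main__":
    # A plain dict stands in for Airflow's context["var"]["value"] accessor here.
    fake_context = {
        "var": {"value": {ORG_VAR: "my-org", DEPLOYMENT_VAR: "prod", TOKEN_VAR: "user:abc123"}}
    }
    print(dagster_plus_url(fake_context))
    print(dagster_plus_headers(fake_context))
```

Keeping the variable lookups in small helpers makes it easy to unit-test the URL and header construction separately from the operator wiring.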
To make spin-up more efficient, dagster-airlift caches the state of the Airflow instance in the Dagster database, so that repeat fetches of the code location don't require additional calls to Airflow's REST API. However, this means the Dagster definitions can fall out of sync with Airflow. Here are a few ways this can manifest:
- A new Airflow DAG is added. Lineage information does not show up for this DAG, and its materializations are not recorded.
- A DAG is removed. The polling sensor begins failing, because assets still exist that expect that DAG to exist.
- The task dependency structure within a DAG changes. This may result in out-of-sync statuses in Dagster or missing materializations.

This is not an exhaustive list, but the usual tell is that materializations or assets are missing. When you find yourself in this state, you can force dagster-airlift to reload Airflow state by reloading the code location: go to the Deployment tab in the top navigation and click Redeploy on the code location relevant to your asset. After some time, the code location reloads with refreshed state from Airflow.
If changes to your Airflow instance are controlled by a CI/CD process, you can add a step that automatically triggers a redeploy of the relevant code location. See the docs on using the GraphQL client to do this.
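As a rough sketch of such a CI/CD step, the script below sends Dagster's reloadRepositoryLocation GraphQL mutation using only the Python standard library. The dagster-graphql package's DagsterGraphQLClient.reload_repository_location method wraps this same mutation if you prefer a client library. The endpoint URL, the code location name, and the optional Dagster-Cloud-Api-Token header (for Dagster+) are assumptions to adapt, and whether your deployment type supports reloading this way is worth confirming.

```python
import json
import urllib.request
from typing import Optional

# Mutation and argument names follow the query used by dagster-graphql's
# DagsterGraphQLClient.reload_repository_location; verify them for your version.
RELOAD_MUTATION = """
mutation ReloadLocation($repositoryLocationName: String!) {
  reloadRepositoryLocation(repositoryLocationName: $repositoryLocationName) {
    __typename
  }
}
"""


def build_reload_request(
    graphql_url: str, location_name: str, api_token: Optional[str] = None
) -> urllib.request.Request:
    """Build (but do not send) a POST request that reloads one code location."""
    body = json.dumps(
        {"query": RELOAD_MUTATION, "variables": {"repositoryLocationName": location_name}}
    ).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if api_token is not None:
        # Assumed Dagster+ auth header; omit the token for an unauthenticated OSS instance.
        headers["Dagster-Cloud-Api-Token"] = api_token
    return urllib.request.Request(graphql_url, data=body, headers=headers, method="POST")


def reload_code_location(
    graphql_url: str, location_name: str, api_token: Optional[str] = None
) -> str:
    """Send the reload request and return the __typename of the mutation result."""
    request = build_reload_request(graphql_url, location_name, api_token)
    with urllib.request.urlopen(request, timeout=60) as response:
        payload = json.load(response)
    return payload["data"]["reloadRepositoryLocation"]["__typename"]
```

A CI job could call reload_code_location("http://localhost:3000/graphql", "my_airlift_location") (hypothetical URL and location name) after Airflow changes are deployed, and fail the build if the returned type name indicates an error, such as PythonError or RepositoryLocationNotFound in the type names used by dagster-graphql's client.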