dagster-airlift integration reference

dagster-airlift is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page covers aspects of working with dagster-airlift that the tutorial does not. Read the dagster-airlift tutorial before using this page.

Supporting custom authorization

If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to that backend. proxying_to_dagster accepts a build_from_task_fn parameter, which lets you supply a custom subclass of BaseProxyTaskToDagsterOperator and, through it, override how the session used to reach Dagster is created. For example, suppose your Dagster installation requires an access key on every request, and that key is stored in an Airflow Variable called my_api_key. The BaseProxyTaskToDagsterOperator subclass below retrieves that variable's value and sets it on the session, so every request to Dagster's GraphQL API is made with that API key.

from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


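class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        # Airflow Variables are exposed in the task context under "var".
        if "var" not in context:
            raise ValueError("No variables found in context")
        # Read the access key from the Airflow Variable my_api_key.
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        # Every request made with this session, including GraphQL calls, carries the key.
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        # Base URL of the Dagster webserver that requests are proxied to.
        return "https://dagster.example.com/"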


dag = DAG(
    dag_id="custom_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_task_fn=CustomProxyToDagsterOperator.build_from_task,
)

Dagster+ authorization

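You can use a custom proxy operator to connect to a Dagster+ deployment. The example below proxies to Dagster+ using an organization name, deployment name, and user token stored in the Airflow Variables dagster_plus_organization_name, dagster_plus_deployment_name, and dagster_cloud_user_token. To create a Dagster+ user token, follow this guide. As in the previous section, pass DagsterCloudProxyOperator.build_from_task as the build_from_task_fn argument of proxying_to_dagster to use it.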

import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator


class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator):
    def get_variable(self, context: Context, var_name: str) -> str:
        if "var" not in context:
            raise ValueError("No variables found in context")
        return context["var"]["value"][var_name]

    def get_dagster_session(self, context: Context) -> requests.Session:
        dagster_cloud_user_token = self.get_variable(context, "dagster_cloud_user_token")
        session = requests.Session()
        session.headers.update({"Dagster-Cloud-Api-Token": dagster_cloud_user_token})
        return session

    def get_dagster_url(self, context: Context) -> str:
        org_name = self.get_variable(context, "dagster_plus_organization_name")
        deployment_name = self.get_variable(context, "dagster_plus_deployment_name")
        return f"https://{org_name}.dagster.plus/{deployment_name}"

Dealing with changing Airflow

To make code location loading more efficient, dagster-airlift caches the state of the Airflow instance in the Dagster database, so repeat fetches of the code location don't require additional calls to Airflow's REST API. However, this means the Dagster definitions can fall out of sync with Airflow. Here are a few ways this can manifest:

  • A new Airflow DAG is added. Lineage information does not appear for this DAG, and its materializations are not recorded.
  • A DAG is removed. The polling sensor begins failing because there are still assets that expect that DAG to exist.
  • The task dependency structure within a DAG changes. This may result in unsynced statuses in Dagster or missing materializations.

This list is not exhaustive, but the usual tell is that materializations or assets are missing. When you find yourself in this state, you can force dagster-airlift to reload Airflow state by reloading the code location: go to the Deployment tab in the top navigation and click Redeploy on the code location that contains your asset. After some time, the code location reloads with refreshed state from Airflow.
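One way to spot the first two symptoms before they surface in Dagster is to compare a previously recorded set of DAG IDs with the DAGs the Airflow webserver currently reports. The sketch below assumes Airflow 2's stable REST API (GET /api/v1/dags) with HTTP basic auth enabled on the webserver. fetch_airflow_dag_ids and diff_dag_ids are hypothetical helpers, not part of dagster-airlift, and where the recorded set comes from (for example, a file written when the code location was last deployed) is up to you. It only detects added or removed DAGs, not changes to task dependencies.

```python
import base64
import json
import urllib.parse
import urllib.request
from typing import Dict, Iterable, Set


def fetch_airflow_dag_ids(
    webserver_url: str, username: str, password: str, page_size: int = 100
) -> Set[str]:
    """Return every DAG ID reported by Airflow 2's stable REST API (GET /api/v1/dags).

    Hypothetical helper: assumes basic auth is enabled for the REST API.
    """
    credentials = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    dag_ids: Set[str] = set()
    offset = 0
    while True:
        query = urllib.parse.urlencode({"limit": page_size, "offset": offset})
        request = urllib.request.Request(
            f"{webserver_url.rstrip('/')}/api/v1/dags?{query}",
            headers={"Authorization": f"Basic {credentials}"},
        )
        with urllib.request.urlopen(request) as response:
            payload = json.load(response)
        page = payload.get("dags", [])
        dag_ids.update(dag["dag_id"] for dag in page)
        offset += len(page)
        # Stop when a page comes back empty or every reported entry has been seen.
        if not page or offset >= payload.get("total_entries", 0):
            return dag_ids


def diff_dag_ids(recorded: Iterable[str], live: Iterable[str]) -> Dict[str, Set[str]]:
    """Report DAG IDs added to or removed from Airflow since `recorded` was captured."""
    recorded_set, live_set = set(recorded), set(live)
    return {"added": live_set - recorded_set, "removed": recorded_set - live_set}
```

For example, diff_dag_ids(["dag_a", "dag_b"], ["dag_b", "dag_c"]) reports dag_c as added and dag_a as removed; if either set is non-empty, reload the code location as described above.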

Automating changes to code locations

If changes to your Airflow instance are controlled by a CI/CD process, you can add a step that automatically triggers a redeploy of the relevant code location. See the docs on using the GraphQL client to do this.
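Dagster's Python GraphQL client (DagsterGraphQLClient in the dagster-graphql package) provides a reload_repository_location method for this. If you would rather not add that dependency to a CI job, the standard-library sketch below sends a comparable request directly. It assumes the webserver serves GraphQL at <base URL>/graphql and exposes a reloadRepositoryLocation mutation; build_reload_request and reload_code_location are hypothetical helpers, and whether this request is equivalent to clicking Redeploy on your deployment type (in particular on Dagster+) is something to verify.

```python
import json
import urllib.request
from typing import Optional

# GraphQL mutation assumed to be exposed by the Dagster webserver for reloading a code location.
RELOAD_MUTATION = """
mutation ReloadCodeLocation($repositoryLocationName: String!) {
  reloadRepositoryLocation(repositoryLocationName: $repositoryLocationName) {
    __typename
  }
}
"""


def build_reload_request(
    dagster_url: str, location_name: str, api_token: Optional[str] = None
) -> urllib.request.Request:
    """Build, but do not send, a POST asking Dagster to reload one code location."""
    headers = {"Content-Type": "application/json"}
    if api_token is not None:
        # Same header the Dagster+ proxy operator sets; omit it for an unauthenticated webserver.
        headers["Dagster-Cloud-Api-Token"] = api_token
    body = json.dumps(
        {"query": RELOAD_MUTATION, "variables": {"repositoryLocationName": location_name}}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{dagster_url.rstrip('/')}/graphql", data=body, headers=headers, method="POST"
    )


def reload_code_location(
    dagster_url: str, location_name: str, api_token: Optional[str] = None
) -> str:
    """Send the reload mutation and return the GraphQL result type name."""
    request = build_reload_request(dagster_url, location_name, api_token)
    with urllib.request.urlopen(request) as response:
        payload = json.load(response)
    if payload.get("errors"):
        raise RuntimeError(f"Reload request failed: {payload['errors']}")
    return payload["data"]["reloadRepositoryLocation"]["__typename"]
```

In a CI step you might call reload_code_location("http://localhost:3000", "my_code_location") once the Airflow deployment finishes (both arguments are placeholders) and inspect the returned type name to see whether the reload succeeded.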

Peering to multiple Airflow instances

dagster-airlift supports peering to multiple Airflow instances: invoke build_defs_from_airflow_instance once per instance, giving each AirflowInstance its own name, and combine the results with Definitions.merge:

from dagster import Definitions

from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)

# One set of definitions per Airflow instance, merged into a single Definitions object.
defs = Definitions.merge(
    build_defs_from_airflow_instance(
        airflow_instance=AirflowInstance(
            # Replace the placeholder URL and credentials with your own.
            auth_backend=AirflowBasicAuthBackend(
                webserver_url="http://yourcompany.com/instance_one",
                username="admin",
                password="admin",
            ),
            name="airflow_instance_one",
        )
    ),
    build_defs_from_airflow_instance(
        airflow_instance=AirflowInstance(
            auth_backend=AirflowBasicAuthBackend(
                webserver_url="http://yourcompany.com/instance_two",
                username="admin",
                password="admin",
            ),
            name="airflow_instance_two",
        )
    ),
)

Customizing the DAG proxying operator

Just as you can customize the operator constructed on a per-task basis, you can customize the operator constructed on a per-DAG basis. Use the build_from_dag_fn argument of proxying_to_dagster to provide a custom operator in place of the default.

In the following example, the operator is customized to send an authorization header that authenticates requests to Dagster.

from pathlib import Path

import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml


class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session

    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"

    # This method controls how the proxying operator is built from the DAG.
    @classmethod
    def build_from_dag(cls, dag: DAG) -> "CustomProxyToDagsterOperator":
        return cls(dag=dag, task_id="OVERRIDDEN")


dag = DAG(
    dag_id="custom_dag_level_proxy_example",
)

# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
)

BaseProxyDAGToDagsterOperator has three abstract methods that must be implemented:

  • get_dagster_session, which creates a valid session for accessing the Dagster GraphQL API.
  • get_dagster_url, which returns the base URL of the Dagster webserver.
  • build_from_dag, which controls how the proxying task is constructed from the provided DAG.