Ask AI

Source code for dagster_airflow.resources.airflow_persistent_db

import importlib
import os
from typing import List, Optional

import airflow
from airflow.models.connection import Connection
from dagster import (
    Array,
    DagsterRun,
    Field,
    InitResourceContext,
    ResourceDefinition,
    StringSource,
    _check as check,
)

from dagster_airflow.resources.airflow_db import AirflowDatabase
from dagster_airflow.utils import (
    create_airflow_connections,
    is_airflow_2_loaded_in_environment,
    serialize_connections,
)


class AirflowPersistentDatabase(AirflowDatabase):
    """A persistent Airflow database Dagster resource.

    Points the in-process Airflow installation at the database identified by a
    SQLAlchemy URI, creates the configured Airflow connections in it, and keeps the
    URI on the instance as ``self.uri``.
    """

    def __init__(self, dagster_run: DagsterRun, uri: str, dag_run_config: Optional[dict] = None):
        """
        Args:
            dagster_run (DagsterRun): The Dagster run this resource is created for.
            uri (str): SQLAlchemy URI of the Airflow DB.
            dag_run_config (Optional[dict]): dag_run configuration, forwarded to
                ``AirflowDatabase``.
        """
        self.uri = uri
        super().__init__(dagster_run=dagster_run, dag_run_config=dag_run_config)

    @staticmethod
    def _initialize_database(uri: str, connections: Optional[List[Connection]] = None) -> None:
        """Configure Airflow to use ``uri`` and create ``connections`` in that database.

        This sets an ``AIRFLOW__*__SQL_ALCHEMY_CONN`` variable in ``os.environ`` and
        reloads Airflow modules, so it affects the whole process, not just this resource.

        Args:
            uri (str): SQLAlchemy URI of the Airflow DB.
            connections (Optional[List[Connection]]): Airflow connections to create.
                ``None`` (the default) means no connections.
        """
        # ``None`` instead of ``[]`` as the default avoids a shared mutable default list.
        if connections is None:
            connections = []
        if is_airflow_2_loaded_in_environment("2.3.0"):
            # Airflow >= 2.3.0 reads sql_alchemy_conn from the [database] section.
            os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = uri
            # Reload configuration and settings before the package itself, presumably so
            # module-level state derived from the old connection string is rebuilt.
            importlib.reload(airflow.configuration)
            importlib.reload(airflow.settings)
            importlib.reload(airflow)
        else:
            # Older Airflow versions read sql_alchemy_conn from the [core] section.
            os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = uri
            importlib.reload(airflow)
        create_airflow_connections(connections)

    @staticmethod
    def from_resource_context(context: InitResourceContext) -> "AirflowPersistentDatabase":
        """Build the resource from a Dagster resource-init context.

        Reads ``uri``, ``connections`` (a list of keyword-argument dicts for
        ``airflow.models.connection.Connection``) and ``dag_run_config`` from the
        resource config. It initializes the database and then returns the resource.

        Raises:
            Fails the ``check.not_none`` assertion if ``context.dagster_run`` is None.
        """
        uri = context.resource_config["uri"]
        AirflowPersistentDatabase._initialize_database(
            uri=uri, connections=[Connection(**c) for c in context.resource_config["connections"]]
        )
        return AirflowPersistentDatabase(
            dagster_run=check.not_none(context.dagster_run, "Context must have run"),
            uri=uri,
            dag_run_config=context.resource_config["dag_run_config"],
        )


def make_persistent_airflow_db_resource(
    uri: str = "",
    connections: Optional[List[Connection]] = None,
    dag_run_config: Optional[dict] = None,
) -> ResourceDefinition:
    """Creates a Dagster resource that provides a persistent Airflow database.

    Usage:

    .. code-block:: python

        from dagster_airflow import (
            make_dagster_definitions_from_airflow_dags_path,
            make_persistent_airflow_db_resource,
        )
        postgres_airflow_db = "postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
        airflow_db = make_persistent_airflow_db_resource(uri=postgres_airflow_db)
        definitions = make_dagster_definitions_from_airflow_dags_path(
            '/path/to/dags/',
            resource_defs={"airflow_db": airflow_db}
        )

    Args:
        uri (str): SQLAlchemy URI of the Airflow DB to be used.
        connections (Optional[List[Connection]]): Airflow Connections to be created in the
            Airflow DB. ``None`` (the default) means no connections.
        dag_run_config (Optional[dict]): dag_run configuration to be used when creating a
            DagRun. ``None`` (the default) means an empty dict.

    Returns:
        ResourceDefinition: The persistent Airflow DB resource.
    """
    # ``None`` defaults avoid shared mutable default arguments. Normalizing
    # dag_run_config to ``{}`` also keeps the ``dict`` Field below from getting a
    # ``None`` default value.
    if connections is None:
        connections = []
    if dag_run_config is None:
        dag_run_config = {}

    # Export the URI as soon as the resource is defined, not only at resource init.
    # The [database] config section exists only from Airflow 2.3.0 onward; earlier
    # versions (including 2.0-2.2) read [core] sql_alchemy_conn. Use the same
    # threshold as AirflowPersistentDatabase._initialize_database.
    if is_airflow_2_loaded_in_environment("2.3.0"):
        os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = uri
    else:
        os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = uri

    # Connections are stored in config as plain dicts. from_resource_context rebuilds
    # them with Connection(**c).
    serialized_connections = serialize_connections(connections)

    airflow_db_resource_def = ResourceDefinition(
        resource_fn=AirflowPersistentDatabase.from_resource_context,
        config_schema={
            "uri": Field(
                StringSource,
                default_value=uri,
                is_required=False,
            ),
            "connections": Field(
                Array(inner_type=dict),
                default_value=serialized_connections,
                is_required=False,
            ),
            "dag_run_config": Field(
                dict,
                default_value=dag_run_config,
                is_required=False,
            ),
        },
        description="Persistent Airflow DB to be used by dagster-airflow ",
    )
    return airflow_db_resource_def