Source code for dagster_airlift.core.load_defs

from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, Optional, Sequence, Union

from dagster import (
    AssetsDefinition,
    AssetSpec,
    Definitions,
    _check as check,
)
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.external_asset import external_asset_from_spec
from dagster._utils.warnings import suppress_dagster_warnings

from dagster_airlift.core.airflow_defs_data import MappedAsset
from dagster_airlift.core.airflow_instance import AirflowInstance
from dagster_airlift.core.sensor.event_translation import (
    DagsterEventTransformerFn,
    default_event_transformer,
)
from dagster_airlift.core.sensor.sensor_builder import (
    DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
    build_airflow_polling_sensor,
)
from dagster_airlift.core.serialization.compute import DagSelectorFn, compute_serialized_data
from dagster_airlift.core.serialization.defs_construction import (
    construct_automapped_dag_assets_defs,
    construct_dag_assets_defs,
    get_airflow_data_to_spec_mapper,
)
from dagster_airlift.core.serialization.serialized_data import (
    DagInfo,
    SerializedAirflowDefinitionsData,
)
from dagster_airlift.core.utils import get_metadata_key, spec_iterator


@dataclass
class AirflowInstanceDefsLoader(StateBackedDefinitionsLoader[SerializedAirflowDefinitionsData]):
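    """State-backed loader that fetches serialized DAG and task data from the
    Airflow instance and constructs Dagster definitions from it. Cached state is
    keyed per instance (see ``defs_key``), so repeated definition loads can reuse
    previously fetched Airflow data instead of re-querying the instance.
    """
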
    airflow_instance: AirflowInstance
    mapped_assets: Sequence[MappedAsset]
    sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS
    dag_selector_fn: Optional[Callable[[DagInfo], bool]] = None

    @property
    def defs_key(self) -> str:
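        # Namespaces the cached state per Airflow instance, so definitions built
        # against multiple instances do not collide.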
        return get_metadata_key(self.airflow_instance.name)

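    # Called on a cache miss: queries the Airflow instance and serializes its
    # DAG/task structure for reuse on subsequent loads.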
    def fetch_state(self) -> SerializedAirflowDefinitionsData:
        return compute_serialized_data(
            airflow_instance=self.airflow_instance,
            mapped_assets=self.mapped_assets,
            dag_selector_fn=self.dag_selector_fn,
        )

    def defs_from_state(
        self, serialized_airflow_data: SerializedAirflowDefinitionsData
    ) -> Definitions:
        return Definitions(
            assets=[
                *_apply_airflow_data_to_specs(self.mapped_assets, serialized_airflow_data),
                *construct_dag_assets_defs(serialized_airflow_data),
            ]
        )
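
# A minimal sketch (hypothetical wiring; `airflow_instance` is assumed to be an
# already-configured AirflowInstance) of how this loader is driven. The real call
# site is build_defs_from_airflow_instance below.
#
#     loader = AirflowInstanceDefsLoader(
#         airflow_instance=airflow_instance,
#         mapped_assets=[],
#     )
#     serialized_data = loader.get_or_fetch_state()  # cached state, or fetch_state()
#     defs = loader.defs_from_state(serialized_data)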


@suppress_dagster_warnings
def build_defs_from_airflow_instance(
    *,
    airflow_instance: AirflowInstance,
    defs: Optional[Definitions] = None,
    sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
    event_transformer_fn: DagsterEventTransformerFn = default_event_transformer,
    dag_selector_fn: Optional[DagSelectorFn] = None,
) -> Definitions:
    """Builds a :py:class:`dagster.Definitions` object from an Airflow instance.

    For every DAG in the Airflow instance, this function will create a Dagster
    asset for the DAG with an asset key instance_name/dag/dag_id. It will also
    create a sensor that polls the Airflow instance for DAG runs and emits
    Dagster events for each successful run.

    An optional `defs` argument can be provided, where the user can pass in a
    :py:class:`dagster.Definitions` object containing assets which are mapped to
    Airflow DAGs and tasks. These assets will be enriched with metadata from the
    Airflow instance, and placed upstream of the automatically generated DAG
    assets.

    An optional `event_transformer_fn` can be provided, which allows the user to
    modify the Dagster events produced by the sensor. The function takes the
    Dagster events produced by the sensor and returns a sequence of Dagster
    events.

    An optional `dag_selector_fn` can be provided, which allows the user to
    filter which DAGs assets are created for. The function takes a
    :py:class:`dagster_airlift.core.serialization.serialized_data.DagInfo` object
    and returns a boolean indicating whether the DAG should be included.

    Args:
        airflow_instance (AirflowInstance): The Airflow instance to build assets
            and the sensor from.
        defs (Optional[Definitions]): A :py:class:`dagster.Definitions` object
            containing assets that are mapped to Airflow DAGs and tasks.
        sensor_minimum_interval_seconds (int): The minimum interval in seconds
            between sensor runs.
        event_transformer_fn (DagsterEventTransformerFn): A function that allows
            for modifying the Dagster events produced by the sensor.
        dag_selector_fn (Optional[DagSelectorFn]): A function that allows for
            filtering which DAGs assets are created for.

    Returns:
        Definitions: A :py:class:`dagster.Definitions` object containing the
        assets and sensor.

    Examples:
        Building a :py:class:`dagster.Definitions` object from an Airflow instance.

        .. code-block:: python

            from dagster_airlift.core import (
                AirflowInstance,
                AirflowBasicAuthBackend,
                build_defs_from_airflow_instance,
            )

            from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME

            airflow_instance = AirflowInstance(
                auth_backend=AirflowBasicAuthBackend(
                    webserver_url=AIRFLOW_BASE_URL, username=USERNAME, password=PASSWORD
                ),
                name=AIRFLOW_INSTANCE_NAME,
            )

            defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)

        Providing task-mapped assets to the function.

        .. code-block:: python

            from dagster import Definitions
            from dagster_airlift.core import (
                AirflowInstance,
                AirflowBasicAuthBackend,
                assets_with_task_mappings,
                build_defs_from_airflow_instance,
            )

            ...

            defs = build_defs_from_airflow_instance(
                airflow_instance=airflow_instance,  # same as above
                defs=Definitions(
                    assets=assets_with_task_mappings(
                        dag_id="rebuild_iris_models",
                        task_mappings={
                            "my_task": [AssetSpec("my_first_asset"), AssetSpec("my_second_asset")],
                        },
                    ),
                ),
            )

        Providing a custom event transformer function.

        .. code-block:: python

            from typing import Sequence
            from dagster import Definitions, SensorEvaluationContext
            from dagster_airlift.core import (
                AirflowInstance,
                AirflowBasicAuthBackend,
                AssetEvent,
                assets_with_task_mappings,
                build_defs_from_airflow_instance,
                AirflowDefinitionsData,
            )

            ...

            def add_tags_to_events(
                context: SensorEvaluationContext,
                defs_data: AirflowDefinitionsData,
                events: Sequence[AssetEvent],
            ) -> Sequence[AssetEvent]:
                altered_events = []
                for event in events:
                    altered_events.append(event._replace(tags={"my_tag": "my_value"}))
                return altered_events

            defs = build_defs_from_airflow_instance(
                airflow_instance=airflow_instance,  # same as above
                event_transformer_fn=add_tags_to_events,
            )

        Filtering which DAGs assets are created for.

        .. code-block:: python

            from dagster import Definitions
            from dagster_airlift.core import (
                AirflowInstance,
                AirflowBasicAuthBackend,
                AssetEvent,
                assets_with_task_mappings,
                build_defs_from_airflow_instance,
                DagInfo,
            )

            ...

            def only_include_dag(dag_info: DagInfo) -> bool:
                return dag_info.dag_id == "my_dag_id"

            defs = build_defs_from_airflow_instance(
                airflow_instance=airflow_instance,  # same as above
                dag_selector_fn=only_include_dag,
            )
    """
    defs = defs or Definitions()
    mapped_assets = _type_narrow_defs_assets(defs)
    # Fetch serialized Airflow state (or reuse previously cached state).
    serialized_airflow_data = AirflowInstanceDefsLoader(
        airflow_instance=airflow_instance,
        mapped_assets=mapped_assets,
        dag_selector_fn=dag_selector_fn,
    ).get_or_fetch_state()
    # Enriched user-provided assets, plus one asset definition per Airflow DAG.
    mapped_and_constructed_assets = [
        *_apply_airflow_data_to_specs(mapped_assets, serialized_airflow_data),
        *construct_dag_assets_defs(serialized_airflow_data),
    ]
    defs_with_airflow_assets = replace_assets_in_defs(
        defs=defs, assets=mapped_and_constructed_assets
    )
    return Definitions.merge(
        defs_with_airflow_assets,
        Definitions(
            sensors=[
                build_airflow_polling_sensor(
                    mapped_assets=mapped_and_constructed_assets,
                    airflow_instance=airflow_instance,
                    minimum_interval_seconds=sensor_minimum_interval_seconds,
                    event_transformer_fn=event_transformer_fn,
                )
            ]
        ),
    )
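
# The loader and builder below are the "full automapped" variants: instead of
# creating only one asset per DAG, they build automapped DAG asset definitions
# via construct_automapped_dag_assets_defs, which (as the name suggests)
# automatically maps the Airflow task structure to assets. A minimal sketch of
# the entry point (hypothetical `airflow_instance`; `defs` is optional):
#
#     defs = build_full_automapped_dags_from_airflow_instance(
#         airflow_instance=airflow_instance,
#     )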

@dataclass
class FullAutomappedDagsLoader(StateBackedDefinitionsLoader[SerializedAirflowDefinitionsData]):
    """Variant of AirflowInstanceDefsLoader that constructs automapped DAG asset
    definitions (via construct_automapped_dag_assets_defs) and does not support
    DAG filtering.
    """

    airflow_instance: AirflowInstance
    mapped_assets: Sequence[MappedAsset]
    sensor_minimum_interval_seconds: int

    @property
    def defs_key(self) -> str:
        return get_metadata_key(self.airflow_instance.name)

    def fetch_state(self) -> SerializedAirflowDefinitionsData:
        return compute_serialized_data(
            airflow_instance=self.airflow_instance,
            mapped_assets=self.mapped_assets,
            dag_selector_fn=None,
        )

    def defs_from_state(
        self, serialized_airflow_data: SerializedAirflowDefinitionsData
    ) -> Definitions:
        return Definitions(
            assets=[
                *_apply_airflow_data_to_specs(self.mapped_assets, serialized_airflow_data),
                *construct_automapped_dag_assets_defs(serialized_airflow_data),
            ]
        )


def build_full_automapped_dags_from_airflow_instance(
    *,
    airflow_instance: AirflowInstance,
    sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
    defs: Optional[Definitions] = None,
) -> Definitions:
    """Builds a :py:class:`dagster.Definitions` object from an Airflow instance,
    automapping DAG assets and attaching a polling sensor.
    """
    defs = defs or Definitions()
    mapped_assets = _type_narrow_defs_assets(defs)
    serialized_data = FullAutomappedDagsLoader(
        airflow_instance=airflow_instance,
        sensor_minimum_interval_seconds=sensor_minimum_interval_seconds,
        mapped_assets=mapped_assets,
    ).get_or_fetch_state()
    airflow_assets = [
        *_apply_airflow_data_to_specs(mapped_assets, serialized_data),
        *construct_automapped_dag_assets_defs(serialized_data),
    ]
    resolved_defs = replace_assets_in_defs(defs=defs, assets=airflow_assets)
    return Definitions.merge(
        resolved_defs,
        Definitions(
            sensors=[
                build_airflow_polling_sensor(
                    minimum_interval_seconds=sensor_minimum_interval_seconds,
                    mapped_assets=airflow_assets,
                    airflow_instance=airflow_instance,
                )
            ]
        ),
    )


def _type_check_asset(asset: Any) -> MappedAsset:
    return check.inst(
        asset,
        (AssetSpec, AssetsDefinition),
        "Expected passed assets to all be AssetsDefinitions or AssetSpecs.",
    )


def _type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
    return [_type_check_asset(asset) for asset in defs.assets or []]


def _apply_airflow_data_to_specs(
    assets: Sequence[MappedAsset],
    serialized_data: SerializedAirflowDefinitionsData,
) -> Iterator[AssetsDefinition]:
    """Apply asset spec transformations to the asset definitions."""
    for asset in assets:
        narrowed_asset = _type_check_asset(asset)
        # Plain AssetSpecs are wrapped as external assets so the spec mapper can
        # be applied uniformly to AssetsDefinitions.
        assets_def = (
            narrowed_asset
            if isinstance(narrowed_asset, AssetsDefinition)
            else external_asset_from_spec(narrowed_asset)
        )
        yield assets_def.map_asset_specs(get_airflow_data_to_spec_mapper(serialized_data))


def replace_assets_in_defs(
    defs: Definitions, assets: Iterable[Union[AssetSpec, AssetsDefinition]]
) -> Definitions:
    """Return a copy of ``defs`` with its assets swapped out for ``assets``."""
    return Definitions(
        assets=list(assets),
        asset_checks=defs.asset_checks,
        sensors=defs.sensors,
        schedules=defs.schedules,
        jobs=defs.jobs,
        executor=defs.executor,
        loggers=defs.loggers,
        resources=defs.resources,
    )


def enrich_airflow_mapped_assets(
    mapped_assets: Sequence[MappedAsset],
    airflow_instance: AirflowInstance,
) -> Sequence[AssetsDefinition]:
    """Enrich Airflow-mapped assets with metadata from the provided :py:class:`AirflowInstance`."""
    serialized_data = AirflowInstanceDefsLoader(
        airflow_instance=airflow_instance, mapped_assets=mapped_assets
    ).get_or_fetch_state()
    return list(_apply_airflow_data_to_specs(mapped_assets, serialized_data))


def load_airflow_dag_asset_specs(
    airflow_instance: AirflowInstance,
    mapped_assets: Optional[Sequence[MappedAsset]] = None,
    dag_selector_fn: Optional[DagSelectorFn] = None,
) -> Sequence[AssetSpec]:
    """Load asset specs for Airflow DAGs from the provided
    :py:class:`AirflowInstance`, and link upstreams from mapped assets.
    """
    serialized_data = AirflowInstanceDefsLoader(
        airflow_instance=airflow_instance,
        mapped_assets=mapped_assets or [],
        dag_selector_fn=dag_selector_fn,
    ).get_or_fetch_state()
    return list(spec_iterator(construct_dag_assets_defs(serialized_data)))
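
# Hypothetical usage sketches for the two standalone helpers above (asset, task,
# and DAG ids are placeholders; `airflow_instance` is assumed to be configured):
#
#     # Load DAG asset specs without building a full Definitions object:
#     dag_specs = load_airflow_dag_asset_specs(
#         airflow_instance=airflow_instance,
#         dag_selector_fn=lambda dag_info: dag_info.dag_id == "my_dag_id",
#     )
#
#     # Enrich task-mapped assets (e.g. from assets_with_task_mappings) with
#     # metadata from the Airflow instance:
#     enriched = enrich_airflow_mapped_assets(
#         mapped_assets=assets_with_task_mappings(
#             dag_id="my_dag_id",
#             task_mappings={"my_task": [AssetSpec("my_asset")]},
#         ),
#         airflow_instance=airflow_instance,
#     )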