Source code for dagster_airlift.core.serialization.serialized_data
from functools import cached_property
from typing import AbstractSet, Any, Dict, List, Mapping, NamedTuple, Set
from dagster import (
AssetKey,
_check as check,
)
from dagster._annotations import PublicAttr
from dagster._record import record
from dagster._serdes import whitelist_for_serdes
@whitelist_for_serdes
@record
class TaskInfo:
    """A record describing a single task of a dag.

    Attributes:
        webserver_url (str): Base URL that :py:attr:`dag_url` is built from.
        dag_id (str): Id of the dag used when building :py:attr:`dag_url`.
        task_id (str): Id of the task.
        metadata (Dict[str, Any]): Task metadata. Must contain a ``downstream_task_ids``
            entry for :py:attr:`downstream_task_ids` to be readable.
    """

    webserver_url: str
    dag_id: str
    task_id: str
    metadata: Dict[str, Any]

    @property
    def dag_url(self) -> str:
        """URL formed by appending ``/dags/<dag_id>`` to the webserver URL."""
        return f"{self.webserver_url}/dags/{self.dag_id}"

    @cached_property
    def downstream_task_ids(self) -> List[str]:
        """The ``downstream_task_ids`` metadata entry, checked to be a list of strings.

        Raises a ``KeyError`` if the entry is absent from ``metadata``.
        """
        raw_ids = self.metadata["downstream_task_ids"]
        return check.is_list(raw_ids, str)
[docs]
@whitelist_for_serdes
@record
class DagInfo:
    """A record containing information about a given airflow dag.

    Users should not instantiate this class directly. It is provided when customizing which
    DAGs are included in the generated definitions using the `dag_selector_fn` argument of
    :py:func:`build_defs_from_airflow_instance`.

    Attributes:
        metadata (Dict[str, Any]): The metadata associated with the dag, retrieved by the Airflow REST API:
            https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dags
    """

    webserver_url: str
    dag_id: str
    metadata: PublicAttr[Dict[str, Any]]

    @property
    def url(self) -> str:
        """URL formed by appending ``/dags/<dag_id>`` to the webserver URL."""
        return f"{self.webserver_url}/dags/{self.dag_id}"

    @property
    def file_token(self) -> str:
        """The ``file_token`` entry of the dag metadata.

        Raises a ``KeyError`` if the metadata has no such entry.
        """
        token = self.metadata["file_token"]
        return token
@whitelist_for_serdes
class TaskHandle(NamedTuple):
    """Named tuple identifying a task by the pair (dag id, task id).

    Registered with ``whitelist_for_serdes``; field names and order are kept as-is.
    """

    dag_id: str
    task_id: str
@whitelist_for_serdes
class DagHandle(NamedTuple):
    """Named tuple identifying a dag by its dag id.

    Registered with ``whitelist_for_serdes``.
    """

    dag_id: str
###################################################################################################
# Serialized data scoped to Airflow DAGs and tasks.
###################################################################################################
# History:
# - created
@whitelist_for_serdes
@record
class SerializedDagData:
    """A record containing pre-computed data about a given airflow dag."""

    # Id of the dag this record describes.
    dag_id: str
    # The DagInfo record for the dag.
    dag_info: DagInfo
    # NOTE(review): presumably the source code of the dag file -- confirm against the producer.
    source_code: str
    # NOTE(review): presumably the asset keys with no downstream assets within this dag -- confirm.
    leaf_asset_keys: Set[AssetKey]
    # TaskInfo records by string key (presumably the task id -- confirm against the producer).
    task_infos: Mapping[str, TaskInfo]
@whitelist_for_serdes
@record
class KeyScopedTaskHandles:
    """A record pairing one asset key with the set of task handles mapped to it."""

    # The asset key the handles are scoped to.
    asset_key: AssetKey
    # The task handles associated with ``asset_key``.
    mapped_tasks: AbstractSet[TaskHandle]
@whitelist_for_serdes
@record
class KeyScopedDagHandles:
    """A record pairing one asset key with the set of dag handles mapped to it."""

    # The asset key the handles are scoped to.
    asset_key: AssetKey
    # The dag handles associated with ``asset_key``.
    mapped_dags: AbstractSet[DagHandle]
###################################################################################################
# Serializable data that will be cached to avoid repeated calls to the Airflow API, and to avoid
# repeated scans of passed-in Definitions objects.
###################################################################################################
# History:
# - created
# - removed existing_asset_data
# - added key_scope_data_items
# - added instance_name
@whitelist_for_serdes
@record
class SerializedAirflowDefinitionsData:
    """A serializable record bundling the key-scoped handles and per-dag data of one instance.

    The two cached properties expose the key-scoped handle lists as dictionaries
    keyed by asset key.
    """

    # Name of the instance this data belongs to.
    instance_name: str
    # One entry per asset key, each carrying the task handles mapped to that key.
    key_scoped_task_handles: List[KeyScopedTaskHandles]
    # One entry per asset key, each carrying the dag handles mapped to that key.
    key_scoped_dag_handles: List[KeyScopedDagHandles]
    # SerializedDagData by string key (presumably the dag id -- confirm against the producer).
    dag_datas: Mapping[str, SerializedDagData]

    @cached_property
    def all_mapped_tasks(self) -> Dict[AssetKey, AbstractSet[TaskHandle]]:
        """Mapping from asset key to its mapped task handles.

        If an asset key appears in more than one list entry, the last entry wins.
        """
        tasks_by_key: Dict[AssetKey, AbstractSet[TaskHandle]] = {}
        for scoped in self.key_scoped_task_handles:
            tasks_by_key[scoped.asset_key] = scoped.mapped_tasks
        return tasks_by_key

    @cached_property
    def all_mapped_dags(self) -> Dict[AssetKey, AbstractSet[DagHandle]]:
        """Mapping from asset key to its mapped dag handles.

        If an asset key appears in more than one list entry, the last entry wins.
        """
        dags_by_key: Dict[AssetKey, AbstractSet[DagHandle]] = {}
        for scoped in self.key_scoped_dag_handles:
            dags_by_key[scoped.asset_key] = scoped.mapped_dags
        return dags_by_key