from typing import Any, Iterator, Mapping, Optional, Union
from dagster import (
AssetExecutionContext,
AssetMaterialization,
ConfigurableResource,
MaterializeResult,
OpExecutionContext,
_check as check,
)
from dagster._annotations import experimental, public
from dlt.common.pipeline import LoadInfo
from dlt.extract.resource import DltResource
from dlt.extract.source import DltSource
from dlt.pipeline.pipeline import Pipeline
from .constants import META_KEY_PIPELINE, META_KEY_SOURCE, META_KEY_TRANSLATOR
from .translator import DagsterDltTranslator
@experimental
class DagsterDltResource(ConfigurableResource):
    """Dagster resource for running dlt pipelines from either ``@dlt_assets`` or ``@op``
    definitions.

    When invoked from an asset context, the dlt source, pipeline, and translator are read
    from the asset metadata placed there by the ``@dlt_assets`` decorator; in an op context
    they must be passed explicitly to :py:meth:`run`.
    """

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        # Flags this resource as first-party (Dagster-maintained) integration code.
        return True

    def _cast_load_info_metadata(self, mapping: Mapping[Any, Any]) -> Mapping[Any, Any]:
        """Converts pendulum DateTime and Timezone values in a mapping to strings.

        Workaround for dagster._core.errors.DagsterInvalidMetadata: Could not resolve the metadata
        value for "jobs" to a known type. Value is not JSON serializable.

        Args:
            mapping (Mapping): Dictionary possibly containing pendulum values

        Returns:
            Mapping[Any, Any]: Metadata with pendulum DateTime and Timezone values casted to strings

        """
        from pendulum import DateTime

        try:
            # ``Timezone`` is not importable from the package root in every pendulum
            # release, so degrade to casting only ``DateTime`` when it is unavailable.
            from pendulum import Timezone  # type: ignore

            casted_instance_types = (DateTime, Timezone)
        except ImportError:
            casted_instance_types = (DateTime,)

        def _recursive_cast(value: Any):
            # Walk nested dicts/lists, stringifying any pendulum instance encountered.
            if isinstance(value, dict):
                return {k: _recursive_cast(v) for k, v in value.items()}
            elif isinstance(value, list):
                return [_recursive_cast(item) for item in value]
            elif isinstance(value, casted_instance_types):
                return str(value)
            else:
                return value

        return {k: _recursive_cast(v) for k, v in mapping.items()}

    def extract_resource_metadata(
        self, resource: DltResource, load_info: LoadInfo
    ) -> Mapping[str, Any]:
        """Helper method to extract dlt resource metadata from load info dict.

        Args:
            resource (DltResource): The dlt resource being materialized
            load_info (LoadInfo): Run metadata from dlt `pipeline.run(...)`

        Returns:
            Mapping[str, Any]: Asset-specific metadata dictionary

        """
        # Keys from ``load_info`` that apply to every asset produced by this run.
        dlt_base_metadata_types = {
            "first_run",
            "started_at",
            "finished_at",
            "dataset_name",
            "destination_name",
            "destination_type",
        }

        # Stringify pendulum values first so the metadata is JSON serializable.
        load_info_dict = self._cast_load_info_metadata(load_info.asdict())

        # shared metadata that is displayed for all assets
        base_metadata = {k: v for k, v in load_info_dict.items() if k in dlt_base_metadata_types}

        # job metadata for specific target `resource.table_name`
        base_metadata["jobs"] = [
            job
            for load_package in load_info_dict.get("load_packages", [])
            for job in load_package.get("jobs", [])
            if job.get("table_name") == resource.table_name
        ]

        return base_metadata

    @public
    def run(
        self,
        context: Union[OpExecutionContext, AssetExecutionContext],
        dlt_source: Optional[DltSource] = None,
        dlt_pipeline: Optional[Pipeline] = None,
        dagster_dlt_translator: Optional[DagsterDltTranslator] = None,
        **kwargs,
    ) -> Iterator[Union[MaterializeResult, AssetMaterialization]]:
        """Runs the dlt pipeline with subset support.

        Args:
            context (Union[OpExecutionContext, AssetExecutionContext]): Asset or op execution context
            dlt_source (Optional[DltSource]): optional dlt source if resource is used from an `@op`
            dlt_pipeline (Optional[Pipeline]): optional dlt pipeline if resource is used from an `@op`
            dagster_dlt_translator (Optional[DagsterDltTranslator]): optional dlt translator if resource is used from an `@op`
            **kwargs (dict[str, Any]): Keyword args passed to pipeline `run` method

        Returns:
            Iterator[Union[MaterializeResult, AssetMaterialization]]: An iterator of MaterializeResult or AssetMaterialization

        """
        # This resource can be used in both `asset` and `op` definitions. In the context of an asset
        # execution, we retrieve the dlt source, pipeline, and translator from the asset metadata.
        # This allows us to provide the parameter _only_ in the `dlt_assets` decorator.
        if isinstance(context, AssetExecutionContext):
            metadata_by_key = context.assets_def.metadata_by_key
            first_asset_metadata = next(iter(metadata_by_key.values()))
            dlt_source = check.inst(first_asset_metadata.get(META_KEY_SOURCE), DltSource)
            dlt_pipeline = check.inst(first_asset_metadata.get(META_KEY_PIPELINE), Pipeline)
            dagster_dlt_translator = check.inst(
                first_asset_metadata.get(META_KEY_TRANSLATOR), DagsterDltTranslator
            )

        # In an op context the source and pipeline must have been passed explicitly.
        dlt_source = check.not_none(
            dlt_source, "dlt_source is a required parameter in an op context"
        )
        dlt_pipeline = check.not_none(
            dlt_pipeline, "dlt_pipeline is a required parameter in an op context"
        )

        # Default to base translator if undefined
        dagster_dlt_translator = dagster_dlt_translator or DagsterDltTranslator()

        asset_key_dlt_source_resource_mapping = {
            dagster_dlt_translator.get_asset_key(dlt_source_resource): dlt_source_resource
            for dlt_source_resource in dlt_source.resources.values()
        }

        # Filter sources by asset key sub-selection
        if context.is_subset:
            asset_key_dlt_source_resource_mapping = {
                asset_key: asset_dlt_source_resource
                for (
                    asset_key,
                    asset_dlt_source_resource,
                ) in asset_key_dlt_source_resource_mapping.items()
                if asset_key in context.selected_asset_keys
            }
            # Narrow the dlt source to only the selected resources so the pipeline
            # does not load data for assets outside the sub-selection.
            dlt_source = dlt_source.with_resources(
                *[
                    dlt_source_resource.name
                    for dlt_source_resource in asset_key_dlt_source_resource_mapping.values()
                    if dlt_source_resource
                ]
            )

        load_info = dlt_pipeline.run(dlt_source, **kwargs)

        # Assets yield MaterializeResult; ops yield AssetMaterialization events instead.
        has_asset_def: bool = bool(context and context.has_assets_def)

        for asset_key, dlt_source_resource in asset_key_dlt_source_resource_mapping.items():
            metadata = self.extract_resource_metadata(dlt_source_resource, load_info)

            if has_asset_def:
                yield MaterializeResult(asset_key=asset_key, metadata=metadata)
            else:
                yield AssetMaterialization(asset_key=asset_key, metadata=metadata)