Source code for dagster_embedded_elt.dlt.translator
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional, Sequence, Set
from dagster import AssetKey, AutoMaterializePolicy, AutomationCondition
from dagster._annotations import public
from dlt.common.destination import Destination
from dlt.extract.resource import DltResource
[docs]
@dataclass
class DagsterDltTranslator:
[docs]
@public
def get_asset_key(self, resource: DltResource) -> AssetKey:
"""Defines asset key for a given dlt resource key and dataset name.
This method can be overriden to provide custom asset key for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
AssetKey of Dagster asset derived from dlt resource
"""
return AssetKey(f"dlt_{resource.source_name}_{resource.name}")
[docs]
@public
def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMaterializePolicy]:
"""Defines resource specific auto materialize policy.
This method can be overriden to provide custom auto materialize policy for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Optional[AutoMaterializePolicy]: The automaterialize policy for a resource
"""
return None
[docs]
@public
def get_automation_condition(self, resource: DltResource) -> Optional[AutomationCondition]:
"""Defines resource specific automation condition.
This method can be overridden to provide custom automation condition for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Optional[AutomationCondition]: The automation condition for a resource
"""
auto_materialize_policy = self.get_auto_materialize_policy(resource)
return (
auto_materialize_policy.to_automation_condition() if auto_materialize_policy else None
)
[docs]
@public
def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
"""Defines upstream asset dependencies given a dlt resource.
Defaults to a concatenation of `resource.source_name` and `resource.name`.
Args:
resource (DltResource): dlt resource
Returns:
Iterable[AssetKey]: The Dagster asset keys upstream of `dlt_resource_key`.
"""
if resource.is_transformer:
pipe = resource._pipe # noqa: SLF001
while pipe.has_parent:
pipe = pipe.parent
return [AssetKey(f"{resource.source_name}_{pipe.name}")]
return [AssetKey(f"{resource.source_name}_{resource.name}")]
[docs]
@public
def get_description(self, resource: DltResource) -> Optional[str]:
"""A method that takes in a dlt resource returns the Dagster description of the resource.
This method can be overriden to provide a custom description for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Optional[str]: The Dagster description for the dlt resource.
"""
pipe = resource._pipe # noqa: SLF001
# If the function underlying the resource is a single callable,
# return the docstring of the callable.
if len(pipe.steps) == 1 and callable(pipe.steps[0]):
return pipe.steps[0].__doc__
return None
[docs]
@public
def get_group_name(self, resource: DltResource) -> Optional[str]:
"""A method that takes in a dlt resource and returns the Dagster group name of the resource.
This method can be overriden to provide a custom group name for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Optional[str]: A Dagster group name for the dlt resource.
"""
return None
[docs]
@public
def get_owners(self, resource: DltResource) -> Optional[Sequence[str]]:
"""A method that takes in a dlt resource and returns the Dagster owners of the resource.
This method can be overriden to provide custom owners for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Optional[Sequence[str]]: A sequence of Dagster owners for the dlt resource.
"""
return None
[docs]
@public
def get_kinds(self, resource: DltResource, destination: Destination) -> Set[str]:
"""A method that takes in a dlt resource and returns the kinds which should be
attached. Defaults to the destination type and "dlt".
This method can be overriden to provide custom kinds for a dlt resource.
Args:
resource (DltResource): dlt resource
Returns:
Set[str]: The kinds of the asset.
"""
return {"dlt", destination.destination_name}