Ask AI

Source code for dagster_dbt.dagster_dbt_translator

from dataclasses import dataclass
from typing import Any, Mapping, Optional, Sequence

from dagster import (
    AssetKey,
    AutoMaterializePolicy,
    FreshnessPolicy,
    PartitionMapping,
    _check as check,
)
from dagster._annotations import public
from dagster._core.definitions.asset_key import (
    CoercibleToAssetKeyPrefix,
    check_opt_coercible_to_asset_key_prefix_param,
)
from dagster._core.definitions.utils import is_valid_definition_tag_key

from .asset_utils import (
    default_asset_key_fn,
    default_auto_materialize_policy_fn,
    default_description_fn,
    default_freshness_policy_fn,
    default_group_from_dbt_resource_props,
    default_metadata_from_dbt_resource_props,
    default_owners_from_dbt_resource_props,
)


[docs]@dataclass(frozen=True) class DagsterDbtTranslatorSettings: """Settings to enable Dagster features for your dbt project. Args: enable_asset_checks (bool): Whether to load dbt tests as Dagster asset checks. Defaults to True. enable_duplicate_source_asset_keys (bool): Whether to allow dbt sources with duplicate Dagster asset keys. Defaults to False. """ enable_asset_checks: bool = True enable_duplicate_source_asset_keys: bool = False
[docs]class DagsterDbtTranslator: """Holds a set of methods that derive Dagster asset definition metadata given a representation of a dbt resource (models, tests, sources, etc). This class is exposed so that methods can be overriden to customize how Dagster asset metadata is derived. """ def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None): """Initialize the translator. Args: settings (Optional[DagsterDbtTranslatorSettings]): Settings for the translator. """ self._settings = settings or DagsterDbtTranslatorSettings() @property def settings(self) -> DagsterDbtTranslatorSettings: if not hasattr(self, "_settings"): self._settings = DagsterDbtTranslatorSettings() return self._settings
[docs] @public def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster asset key that represents that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom asset key for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: AssetKey: The Dagster asset key for the dbt resource. Examples: Adding a prefix to the default asset key generated for each dbt resource: .. code-block:: python from typing import Any, Mapping from dagster import AssetKey from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: return super().get_asset_key(dbt_resource_props).with_prefix("prefix") Adding a prefix to the default asset key generated for each dbt resource, but only for dbt sources: .. code-block:: python from typing import Any, Mapping from dagster import AssetKey from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: asset_key = super().get_asset_key(dbt_resource_props) if dbt_resource_props["resource_type"] == "source": asset_key = asset_key.with_prefix("my_prefix") return asset_key """ return default_asset_key_fn(dbt_resource_props)
[docs] @public def get_partition_mapping( self, dbt_resource_props: Mapping[str, Any], dbt_parent_resource_props: Mapping[str, Any], ) -> Optional[PartitionMapping]: """A function that takes two dictionaries: the first, representing properties of a dbt resource; and the second, representing the properties of a parent dependency to the first dbt resource. The function returns the Dagster partition mapping for the dbt dependency. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom partition mapping for a dbt dependency. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt child resource. dbt_parent_resource_props (Mapping[str, Any]): A dictionary representing the dbt parent resource, in relationship to the child. Returns: Optional[PartitionMapping]: The Dagster partition mapping for the dbt resource. If None is returned, the default partition mapping will be used. """ return None
[docs] @public def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster description for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom description for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: str: The description for the dbt resource. Examples: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str: return "custom description" """ return default_description_fn(dbt_resource_props)
[docs] @public def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster metadata for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom metadata for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: Mapping[str, Any]: A dictionary representing the Dagster metadata for the dbt resource. Examples: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]: return {"custom": "metadata"} """ return default_metadata_from_dbt_resource_props(dbt_resource_props)
[docs] @public def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster tags for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details dbt tags are strings, but Dagster tags are key-value pairs. To bridge this divide, the dbt tag string is used as the Dagster tag key, and the Dagster tag value is set to the empty string, "". Any dbt tags that don't match Dagster's supported tag key format (e.g. they contain unsupported characters) will be ignored. This method can be overridden to provide custom tags for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: Mapping[str, str]: A dictionary representing the Dagster tags for the dbt resource. Examples: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]: return {"custom": "tag"} """ tags = dbt_resource_props.get("tags", []) return {tag: "" for tag in tags if is_valid_definition_tag_key(tag)}
[docs] @public def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster group name for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom group name for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: Optional[str]: A Dagster group name. Examples: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]: return "custom_group_prefix" + dbt_resource_props.get("config", {}).get("group") """ return default_group_from_dbt_resource_props(dbt_resource_props)
[docs] @public def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster owners for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide custom owners for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: Optional[Sequence[str]]: A set of Dagster owners. Examples: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Optional[Sequence[str]]: return ["user@owner.com", "team:team@owner.com"] """ return default_owners_from_dbt_resource_props(dbt_resource_props)
[docs] @public def get_freshness_policy( self, dbt_resource_props: Mapping[str, Any] ) -> Optional[FreshnessPolicy]: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster :py:class:`dagster.FreshnessPolicy` for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom freshness policy for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: Optional[FreshnessPolicy]: A Dagster freshness policy. Examples: Set a custom freshness policy for all dbt resources: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_freshness_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]: return FreshnessPolicy(maximum_lag_minutes=60) Set a custom freshness policy for dbt resources with a specific tag: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_freshness_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]: freshness_policy = None if "my_custom_tag" in dbt_resource_props.get("tags", []): freshness_policy = FreshnessPolicy(maximum_lag_minutes=60) return freshness_policy """ return default_freshness_policy_fn(dbt_resource_props)
[docs] @public def get_auto_materialize_policy( self, dbt_resource_props: Mapping[str, Any] ) -> Optional[AutoMaterializePolicy]: """A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster :py:class:`dagster.AutoMaterializePolicy` for that resource. Note that a dbt resource is unrelated to Dagster's resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details This method can be overridden to provide a custom auto-materialize policy for a dbt resource. Args: dbt_resource_props (Mapping[str, Any]): A dictionary representing the dbt resource. Returns: Optional[AutoMaterializePolicy]: A Dagster auto-materialize policy. Examples: Set a custom auto-materialize policy for all dbt resources: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]: return AutoMaterializePolicy.eager() Set a custom auto-materialize policy for dbt resources with a specific tag: .. code-block:: python from typing import Any, Mapping from dagster_dbt import DagsterDbtTranslator class CustomDagsterDbtTranslator(DagsterDbtTranslator): def get_auto_materialize_policy(self, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]: auto_materialize_policy = None if "my_custom_tag" in dbt_resource_props.get("tags", []): auto_materialize_policy = AutoMaterializePolicy.eager() return auto_materialize_policy """ return default_auto_materialize_policy_fn(dbt_resource_props)
class KeyPrefixDagsterDbtTranslator(DagsterDbtTranslator): """A DagsterDbtTranslator that applies prefixes to the asset keys generated from dbt resources. Attributes: asset_key_prefix (Optional[Union[str, Sequence[str]]]): A prefix to apply to all dbt models, seeds, snapshots, etc. This will *not* apply to dbt sources. source_asset_key_prefix (Optional[Union[str, Sequence[str]]]): A prefix to apply to all dbt sources. """ def __init__( self, asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, source_asset_key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, *args, **kwargs, ): self._asset_key_prefix = ( check_opt_coercible_to_asset_key_prefix_param(asset_key_prefix, "asset_key_prefix") or [] ) self._source_asset_key_prefix = ( check_opt_coercible_to_asset_key_prefix_param( source_asset_key_prefix, "source_asset_key_prefix" ) or [] ) super().__init__(*args, **kwargs) @public def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey: base_key = default_asset_key_fn(dbt_resource_props) if dbt_resource_props["resource_type"] == "source": return base_key.with_prefix(self._source_asset_key_prefix) else: return base_key.with_prefix(self._asset_key_prefix) @dataclass class DbtManifestWrapper: manifest: Mapping[str, Any] def validate_translator(dagster_dbt_translator: DagsterDbtTranslator) -> DagsterDbtTranslator: return check.inst_param( dagster_dbt_translator, "dagster_dbt_translator", DagsterDbtTranslator, additional_message=( "Ensure that the argument is an instantiated class that subclasses" " DagsterDbtTranslator." ), ) def validate_opt_translator( dagster_dbt_translator: Optional[DagsterDbtTranslator], ) -> Optional[DagsterDbtTranslator]: return check.opt_inst_param( dagster_dbt_translator, "dagster_dbt_translator", DagsterDbtTranslator, additional_message=( "Ensure that the argument is an instantiated class that subclasses" " DagsterDbtTranslator." ), )