Source code for dagster_dbt.dbt_manifest_asset_selection
from typing import AbstractSet, Any, Mapping, Optional
from dagster import (
AssetKey,
AssetSelection,
_check as check,
)
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from .asset_utils import get_asset_check_key_for_test, is_non_asset_node
from .dagster_dbt_translator import DagsterDbtTranslator
from .dbt_manifest import DbtManifestParam, validate_manifest
from .utils import (
ASSET_RESOURCE_TYPES,
get_dbt_resource_props_by_dbt_unique_id_from_manifest,
select_unique_ids_from_manifest,
)
[docs]class DbtManifestAssetSelection(AssetSelection, arbitrary_types_allowed=True):
"""Defines a selection of assets from a dbt manifest wrapper and a dbt selection string.
Args:
manifest (Mapping[str, Any]): The dbt manifest blob.
select (str): A dbt selection string to specify a set of dbt resources.
exclude (Optional[str]): A dbt selection string to exclude a set of dbt resources.
Examples:
.. code-block:: python
import json
from pathlib import Path
from dagster_dbt import DbtManifestAssetSelection
manifest = json.loads(Path("path/to/manifest.json").read_text())
# select the dbt assets that have the tag "foo".
my_selection = DbtManifestAssetSelection(manifest=manifest, select="tag:foo")
"""
manifest: Mapping[str, Any]
select: str
dagster_dbt_translator: DagsterDbtTranslator
exclude: str
@classmethod
def build(
cls,
manifest: DbtManifestParam,
select: str = "fqn:*",
*,
dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
exclude: Optional[str] = None,
):
return cls(
manifest=validate_manifest(manifest),
select=check.str_param(select, "select"),
dagster_dbt_translator=check.opt_inst_param(
dagster_dbt_translator,
"dagster_dbt_translator",
DagsterDbtTranslator,
DagsterDbtTranslator(),
),
exclude=check.opt_str_param(exclude, "exclude", default=""),
)
def resolve_inner(
self, asset_graph: BaseAssetGraph, allow_missing: bool = False
) -> AbstractSet[AssetKey]:
dbt_nodes = get_dbt_resource_props_by_dbt_unique_id_from_manifest(self.manifest)
keys = set()
for unique_id in select_unique_ids_from_manifest(
select=self.select,
exclude=self.exclude,
manifest_json=self.manifest,
):
dbt_resource_props = dbt_nodes[unique_id]
is_dbt_asset = dbt_resource_props["resource_type"] in ASSET_RESOURCE_TYPES
if is_dbt_asset and not is_non_asset_node(dbt_resource_props):
asset_key = self.dagster_dbt_translator.get_asset_key(dbt_resource_props)
keys.add(asset_key)
return keys
def resolve_checks_inner(
self, asset_graph: BaseAssetGraph, allow_missing: bool
) -> AbstractSet[AssetCheckKey]:
if not self.dagster_dbt_translator.settings.enable_asset_checks:
return set()
keys = set()
for unique_id in select_unique_ids_from_manifest(
select=self.select,
exclude=self.exclude,
manifest_json=self.manifest,
):
asset_check_key = get_asset_check_key_for_test(
self.manifest, self.dagster_dbt_translator, test_unique_id=unique_id
)
if asset_check_key:
keys.add(asset_check_key)
return keys