
Source code for dagster_dbt.core.resource

import os
import shutil
import uuid
from argparse import ArgumentParser, Namespace
from contextlib import suppress
from pathlib import Path
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast

import yaml
from dagster import (
    AssetCheckKey,
    AssetExecutionContext,
    AssetsDefinition,
    ConfigurableResource,
    OpExecutionContext,
    get_dagster_logger,
)
from dagster._annotations import public
from dagster._core.errors import DagsterInvalidPropertyError
from dagster._core.execution.context.init import InitResourceContext
from dagster._utils import pushd
from dbt.adapters.base.impl import BaseAdapter
from dbt.adapters.factory import get_adapter, register_adapter, reset_adapters
from dbt.config import RuntimeConfig
from dbt.config.runtime import load_profile, load_project
from dbt.config.utils import parse_cli_vars
from dbt.flags import get_flags, set_from_args
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import Field, model_validator, validator
from typing_extensions import Final

from dagster_dbt.asset_utils import (
    DAGSTER_DBT_EXCLUDE_METADATA_KEY,
    DAGSTER_DBT_SELECT_METADATA_KEY,
    dagster_name_fn,
    get_manifest_and_translator_from_dbt_assets,
)
from dagster_dbt.core.dbt_cli_invocation import DbtCliInvocation, _get_dbt_target_path
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator, validate_opt_translator
from dagster_dbt.dbt_manifest import DbtManifestParam, validate_manifest
from dagster_dbt.dbt_project import DbtProject
from dagster_dbt.utils import (
    ASSET_RESOURCE_TYPES,
    get_dbt_resource_props_by_dbt_unique_id_from_manifest,
)

IS_DBT_CORE_VERSION_LESS_THAN_1_8_0 = version.parse(dbt_version) < version.parse("1.8.0")
if IS_DBT_CORE_VERSION_LESS_THAN_1_8_0:
    from dbt.events.functions import cleanup_event_logger  # type: ignore
else:
    from dbt_common.events.event_manager_client import cleanup_event_logger

logger = get_dagster_logger()


DBT_EXECUTABLE = "dbt"
DBT_PROJECT_YML_NAME = "dbt_project.yml"
DBT_PROFILES_YML_NAME = "profiles.yml"

DBT_INDIRECT_SELECTION_ENV: Final[str] = "DBT_INDIRECT_SELECTION"
DBT_EMPTY_INDIRECT_SELECTION: Final[str] = "empty"


DAGSTER_GITHUB_REPO_DBT_PACKAGE = "https://github.com/dagster-io/dagster.git"


def _dbt_packages_has_dagster_dbt(packages_file: Path) -> bool:
    """Checks whether any package in the passed yaml file is the Dagster dbt package."""
    packages = cast(
        List[Dict[str, Any]], yaml.safe_load(packages_file.read_text()).get("packages", [])
    )
    return any(package.get("git") == DAGSTER_GITHUB_REPO_DBT_PACKAGE for package in packages)
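
# Illustrative sketch (not part of the shipped source): `_dbt_packages_has_dagster_dbt`
# matches a `packages.yml` (or `dependencies.yml`) entry whose `git` field points at the
# Dagster repository, e.g. the following hypothetical file:
#
#   packages:
#     - git: "https://github.com/dagster-io/dagster.git"
#       subdirectory: "my/hypothetical/subdirectory"
#
# Only the `git` URL is compared against DAGSTER_GITHUB_REPO_DBT_PACKAGE; other keys such as
# `revision` or `subdirectory` are ignored.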


class DbtCliResource(ConfigurableResource):
    """A resource used to execute dbt CLI commands.

    Attributes:
        project_dir (str): The path to the dbt project directory. This directory should contain a
            `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more
            information.
        global_config_flags (List[str]): A list of global flags to pass to the dbt CLI
            invocation. Invoke `dbt --help` to see a full list of global flags.
        profiles_dir (Optional[str]): The path to the directory containing your dbt `profiles.yml`.
            By default, the current working directory is used, which is the dbt project directory.
            See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for
            more information.
        profile (Optional[str]): The profile from your dbt `profiles.yml` to use for execution. See
            https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.
        target (Optional[str]): The target from your dbt `profiles.yml` to use for execution. See
            https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more
            information.
        dbt_executable (str): The path to the dbt executable. By default, this is `dbt`.
        state_path (Optional[str]): The path, relative to the project directory, to a directory of
            dbt artifacts to be used with `--state` / `--defer-state`.

    Examples:
        Creating a dbt resource with only a reference to ``project_dir``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(project_dir="/path/to/dbt/project")

        Creating a dbt resource with a custom ``profiles_dir``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                profiles_dir="/path/to/dbt/project/profiles",
            )

        Creating a dbt resource with a custom ``profile`` and ``target``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                profiles_dir="/path/to/dbt/project/profiles",
                profile="jaffle_shop",
                target="dev",
            )

        Creating a dbt resource with global configs, e.g. disabling colored logs with ``--no-use-color``:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                global_config_flags=["--no-use-color"],
            )

        Creating a dbt resource with a custom dbt executable path:

        .. code-block:: python

            from dagster_dbt import DbtCliResource

            dbt = DbtCliResource(
                project_dir="/path/to/dbt/project",
                dbt_executable="/path/to/dbt/executable",
            )
    """

    project_dir: str = Field(
        description=(
            "The path to your dbt project directory. This directory should contain a"
            " `dbt_project.yml`. See https://docs.getdbt.com/reference/dbt_project.yml for more"
            " information."
        ),
    )
    global_config_flags: List[str] = Field(
        default=[],
        description=(
            "A list of global flags to pass to the dbt CLI invocation. See"
            " https://docs.getdbt.com/reference/global-configs for a full list of global configs."
        ),
    )
    profiles_dir: Optional[str] = Field(
        default=None,
        description=(
            "The path to the directory containing your dbt `profiles.yml`. By default, the current"
            " working directory is used, which is the dbt project directory."
            " See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for"
            " more information."
        ),
    )
    profile: Optional[str] = Field(
        default=None,
        description=(
            "The profile from your dbt `profiles.yml` to use for execution. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )
    target: Optional[str] = Field(
        default=None,
        description=(
            "The target from your dbt `profiles.yml` to use for execution. See"
            " https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more"
            " information."
        ),
    )
    dbt_executable: str = Field(
        default=DBT_EXECUTABLE,
        description="The path to the dbt executable.",
    )
    state_path: Optional[str] = Field(
        description=(
            "The path, relative to the project directory, to a directory of dbt artifacts to be"
            " used with `--state` / `--defer-state`. This can be used with methods such as"
            " `get_defer_args` to allow a `@dbt_assets` definition to use deferral in the"
            " appropriate environments."
        )
    )

    def __init__(
        self,
        project_dir: Union[str, Path, DbtProject],
        global_config_flags: Optional[List[str]] = None,
        profiles_dir: Optional[Union[str, Path]] = None,
        profile: Optional[str] = None,
        target: Optional[str] = None,
        dbt_executable: Union[str, Path] = DBT_EXECUTABLE,
        state_path: Optional[Union[str, Path]] = None,
        **kwargs,  # allow custom subclasses to add fields
    ):
        if isinstance(project_dir, DbtProject):
            if not state_path and project_dir.state_path:
                state_path = project_dir.state_path

            if not target and project_dir.target:
                target = project_dir.target

            project_dir = project_dir.project_dir

        project_dir = os.fspath(project_dir)
        state_path = state_path and os.fspath(state_path)

        # Static typing doesn't understand what's going on here; it thinks these fields don't exist.
        super().__init__(
            project_dir=project_dir,  # type: ignore
            global_config_flags=global_config_flags or [],  # type: ignore
            profiles_dir=profiles_dir,  # type: ignore
            profile=profile,  # type: ignore
            target=target,  # type: ignore
            dbt_executable=dbt_executable,  # type: ignore
            state_path=state_path,  # type: ignore
            **kwargs,
        )

    @classmethod
    def _validate_absolute_path_exists(cls, path: Union[str, Path]) -> Path:
        absolute_path = Path(path).absolute()
        try:
            resolved_path = absolute_path.resolve(strict=True)
        except FileNotFoundError:
            raise ValueError(f"The absolute path of '{path}' ('{absolute_path}') does not exist")

        return resolved_path

    @classmethod
    def _validate_path_contains_file(cls, path: Path, file_name: str, error_message: str):
        if not path.joinpath(file_name).exists():
            raise ValueError(error_message)

    @validator("project_dir", "profiles_dir", "dbt_executable", pre=True)
    def convert_path_to_str(cls, v: Any) -> Any:
        """Validate that the path is converted to a string."""
        if isinstance(v, Path):
            resolved_path = cls._validate_absolute_path_exists(v)

            absolute_path = Path(v).absolute()
            try:
                resolved_path = absolute_path.resolve(strict=True)
            except FileNotFoundError:
                raise ValueError(f"The absolute path of '{v}' ('{absolute_path}') does not exist")

            return os.fspath(resolved_path)

        return v

    @validator("project_dir")
    def validate_project_dir(cls, project_dir: str) -> str:
        resolved_project_dir = cls._validate_absolute_path_exists(project_dir)

        cls._validate_path_contains_file(
            path=resolved_project_dir,
            file_name=DBT_PROJECT_YML_NAME,
            error_message=(
                f"{resolved_project_dir} does not contain a {DBT_PROJECT_YML_NAME} file. Please"
                " specify a valid path to a dbt project."
            ),
        )

        return os.fspath(resolved_project_dir)

    @validator("profiles_dir")
    def validate_profiles_dir(cls, profiles_dir: Optional[str]) -> Optional[str]:
        if profiles_dir is None:
            return None

        resolved_profiles_dir = cls._validate_absolute_path_exists(profiles_dir)

        cls._validate_path_contains_file(
            path=resolved_profiles_dir,
            file_name=DBT_PROFILES_YML_NAME,
            error_message=(
                f"{resolved_profiles_dir} does not contain a {DBT_PROFILES_YML_NAME} file. Please"
                " specify a valid path to a dbt profile directory."
            ),
        )

        return os.fspath(resolved_profiles_dir)

    @validator("dbt_executable")
    def validate_dbt_executable(cls, dbt_executable: str) -> str:
        resolved_dbt_executable = shutil.which(dbt_executable)
        if not resolved_dbt_executable:
            raise ValueError(
                f"The dbt executable '{dbt_executable}' does not exist. Please specify a valid"
                " path to a dbt executable."
            )

        return dbt_executable

    @model_validator(mode="before")
    def validate_dbt_version(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that the dbt version is supported."""
        if version.parse(dbt_version) < version.parse("1.7.0"):
            raise ValueError(
                "To use `dagster_dbt.DbtCliResource`, you must use `dbt-core>=1.7.0`. Currently,"
                f" you are using `dbt-core=={dbt_version}`. Please install a compatible dbt-core"
                " version."
            )

        return values

    @validator("state_path")
    def validate_state_path(cls, state_path: Optional[str]) -> Optional[str]:
        if state_path is None:
            return None

        return os.fspath(Path(state_path).absolute().resolve())

    def _get_unique_target_path(
        self, *, context: Optional[Union[OpExecutionContext, AssetExecutionContext]]
    ) -> Path:
        """Get a unique target path for the dbt CLI invocation.

        Args:
            context (Optional[Union[OpExecutionContext, AssetExecutionContext]]): The execution context.

        Returns:
            str: A unique target path for the dbt CLI invocation.
        """
        unique_id = str(uuid.uuid4())[:7]
        path = unique_id
        if context:
            path = f"{context.op_execution_context.op.name}-{context.run.run_id[:7]}-{unique_id}"

        current_target_path = _get_dbt_target_path()

        return current_target_path.joinpath(path)

    def _initialize_adapter(self, cli_vars) -> BaseAdapter:
        if not IS_DBT_CORE_VERSION_LESS_THAN_1_8_0:
            from dbt_common.context import set_invocation_context

            set_invocation_context(os.environ.copy())

        # Construct a dummy set of flags, using the `run` command to ensure that profile/project
        # requirements get loaded.
        profiles_dir = self.profiles_dir if self.profiles_dir else self.project_dir
        set_from_args(Namespace(profiles_dir=profiles_dir), None)
        flags = get_flags()

        profile = load_profile(self.project_dir, cli_vars, self.profile, self.target)
        project = load_project(self.project_dir, False, profile, cli_vars)
        config = RuntimeConfig.from_parts(project, profile, flags)

        # These flags are required for the adapter to be able to look up relations correctly.
        new_flags = Namespace()
        for key, val in config.args.__dict__.items():
            setattr(new_flags, key, val)

        setattr(new_flags, "profile", profile.profile_name)
        setattr(new_flags, "target", profile.target_name)
        config.args = new_flags

        # If the dbt adapter is DuckDB, set the access mode to READ_ONLY, since DuckDB only allows
        # simultaneous connections for read-only access.
        if config.credentials and config.credentials.__class__.__name__ == "DuckDBCredentials":
            from dbt.adapters.duckdb.credentials import DuckDBCredentials

            if isinstance(config.credentials, DuckDBCredentials):
                if not config.credentials.config_options:
                    config.credentials.config_options = {}
                config.credentials.config_options["access_mode"] = "READ_ONLY"
                # Convert the adapter's DuckDB filepath to an absolute path, since the Python
                # working directory may not be the same as the dbt project directory.
                with pushd(self.project_dir):
                    config.credentials.path = os.fspath(Path(config.credentials.path).absolute())

        cleanup_event_logger()

        # Reset the adapters list in case we have instantiated an adapter before in this process.
        reset_adapters()

        if IS_DBT_CORE_VERSION_LESS_THAN_1_8_0:
            register_adapter(config)  # type: ignore
        else:
            from dbt.adapters.protocol import MacroContextGeneratorCallable
            from dbt.context.providers import generate_runtime_macro_context
            from dbt.mp_context import get_mp_context
            from dbt.parser.manifest import ManifestLoader

            register_adapter(config, get_mp_context())
            adapter = cast(BaseAdapter, get_adapter(config))
            manifest = ManifestLoader.load_macros(
                config,
                adapter.connections.set_query_header,  # type: ignore
                base_macros_only=True,
            )
            adapter.set_macro_resolver(manifest)
            adapter.set_macro_context_generator(
                cast(MacroContextGeneratorCallable, generate_runtime_macro_context)
            )

        adapter = cast(BaseAdapter, get_adapter(config))

        return adapter

    @public
    def get_defer_args(self) -> Sequence[str]:
        """Build the defer arguments for the dbt CLI command, using the supplied state directory.

        If no state directory is supplied, or the state directory does not have a manifest for
        comparison, an empty list of arguments is returned.

        Returns:
            Sequence[str]: The defer arguments for the dbt CLI command.
        """
        if not (self.state_path and Path(self.state_path).joinpath("manifest.json").exists()):
            return []

        return ["--defer", "--defer-state", self.state_path]

    @public
    def get_state_args(self) -> Sequence[str]:
        """Build the state arguments for the dbt CLI command, using the supplied state directory.

        If no state directory is supplied, or the state directory does not have a manifest for
        comparison, an empty list of arguments is returned.

        Returns:
            Sequence[str]: The state arguments for the dbt CLI command.
        """
        if not (self.state_path and Path(self.state_path).joinpath("manifest.json").exists()):
            return []

        return ["--state", self.state_path]
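
    # Illustrative sketch (not part of the shipped source): `get_defer_args` and
    # `get_state_args` are typically splatted into a `dbt.cli(...)` call inside a
    # `@dbt_assets`-decorated function, e.g.
    #
    #   @dbt_assets(manifest=my_project.manifest_path)
    #   def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    #       yield from dbt.cli(["build", *dbt.get_defer_args()], context=context).stream()
    #
    # where `my_project` is a hypothetical `DbtProject` whose `state_path` points at a
    # directory containing a previous `manifest.json`. If that manifest is absent, both
    # methods return `[]` and the command runs without deferral.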

    @public
    def cli(
        self,
        args: Sequence[str],
        *,
        raise_on_error: bool = True,
        manifest: Optional[DbtManifestParam] = None,
        dagster_dbt_translator: Optional[DagsterDbtTranslator] = None,
        context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None,
        target_path: Optional[Path] = None,
    ) -> DbtCliInvocation:
        """Create a subprocess to execute a dbt CLI command.

        Args:
            args (Sequence[str]): The dbt CLI command to execute.
            raise_on_error (bool): Whether to raise an exception if the dbt CLI command fails.
            manifest (Optional[Union[Mapping[str, Any], str, Path]]): The dbt manifest blob. If an
                execution context from within `@dbt_assets` is provided to the context argument,
                then the manifest provided to `@dbt_assets` will be used.
            dagster_dbt_translator (Optional[DagsterDbtTranslator]): The translator to link dbt
                nodes to Dagster assets. If an execution context from within `@dbt_assets` is
                provided to the context argument, then the dagster_dbt_translator provided to
                `@dbt_assets` will be used.
            context (Optional[Union[OpExecutionContext, AssetExecutionContext]]): The execution
                context from within `@dbt_assets`. If an AssetExecutionContext is passed, its
                underlying OpExecutionContext will be used.
            target_path (Optional[Path]): An explicit path to a target folder to use to store and
                retrieve dbt artifacts when running a dbt CLI command. If not provided, a unique
                target path will be generated.

        Returns:
            DbtCliInvocation: An invocation instance that can be used to retrieve the output of the
                dbt CLI command.

        Examples:
            Streaming Dagster events for dbt asset materializations and observations:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    yield from dbt.cli(["run"], context=context).stream()

            Retrieving a dbt artifact after streaming the Dagster events:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    dbt_run_invocation = dbt.cli(["run"], context=context)

                    yield from dbt_run_invocation.stream()

                    # Retrieve the `run_results.json` dbt artifact as a dictionary:
                    run_results_json = dbt_run_invocation.get_artifact("run_results.json")

                    # Retrieve the `run_results.json` dbt artifact as a file path:
                    run_results_path = dbt_run_invocation.target_path.joinpath("run_results.json")

            Customizing the asset materialization metadata when streaming the Dagster events:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext, Output
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    dbt_cli_invocation = dbt.cli(["run"], context=context)

                    for dagster_event in dbt_cli_invocation.stream():
                        if isinstance(dagster_event, Output):
                            context.add_output_metadata(
                                metadata={
                                    "my_custom_metadata": "my_custom_metadata_value",
                                },
                                output_name=dagster_event.output_name,
                            )

                        yield dagster_event

            Suppressing exceptions from a dbt CLI command when a non-zero exit code is returned:

            .. code-block:: python

                from pathlib import Path

                from dagster import AssetExecutionContext
                from dagster_dbt import DbtCliResource, dbt_assets


                @dbt_assets(manifest=Path("target", "manifest.json"))
                def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
                    dbt_run_invocation = dbt.cli(["run"], context=context, raise_on_error=False)

                    if dbt_run_invocation.is_successful():
                        yield from dbt_run_invocation.stream()
                    else:
                        ...

            Invoking a dbt CLI command in a custom asset or op:

            .. code-block:: python

                import json

                from dagster import Nothing, Out, asset, op
                from dagster_dbt import DbtCliResource


                @asset
                def my_dbt_asset(dbt: DbtCliResource):
                    dbt_macro_args = {"key": "value"}
                    dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait()


                @op(out=Out(Nothing))
                def my_dbt_op(dbt: DbtCliResource):
                    dbt_macro_args = {"key": "value"}
                    yield from dbt.cli(
                        ["run-operation", "my-macro", json.dumps(dbt_macro_args)]
                    ).stream()
        """
        dagster_dbt_translator = validate_opt_translator(dagster_dbt_translator)

        assets_def: Optional[AssetsDefinition] = None
        with suppress(DagsterInvalidPropertyError):
            assets_def = context.assets_def if context else None

        target_path = target_path or self._get_unique_target_path(context=context)
        env = {
            # Allow IO streaming when running in Windows.
            # Also, allow it to be overridden by the current environment.
            "PYTHONLEGACYWINDOWSSTDIO": "1",
            # Pass the current environment variables to the dbt CLI invocation.
            **os.environ.copy(),
            # An environment variable to indicate that the dbt CLI is being invoked from Dagster.
            "DAGSTER_DBT_CLI": "true",
            # Run dbt with unbuffered output.
            "PYTHONUNBUFFERED": "1",
            # Disable anonymous usage statistics for performance.
            "DBT_SEND_ANONYMOUS_USAGE_STATS": "false",
            # The DBT_LOG_FORMAT environment variable must be set to `json`. We use this
            # environment variable to ensure that the dbt CLI outputs structured logs.
            "DBT_LOG_FORMAT": "json",
            # The DBT_TARGET_PATH environment variable is set to a unique value for each dbt
            # invocation so that artifact paths are separated.
            # See https://discourse.getdbt.com/t/multiple-run-results-json-and-manifest-json-files/7555
            # for more information.
            "DBT_TARGET_PATH": os.fspath(target_path),
            # The DBT_LOG_PATH environment variable is set to the same value as DBT_TARGET_PATH
            # so that the logs for each dbt invocation have separate log files.
            "DBT_LOG_PATH": os.fspath(target_path),
            # The DBT_PROFILES_DIR environment variable is set to the path containing the dbt
            # profiles.yml file.
            # See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles#advanced-customizing-a-profile-directory
            # for more information.
            **({"DBT_PROFILES_DIR": self.profiles_dir} if self.profiles_dir else {}),
            # The DBT_PROJECT_DIR environment variable is set to the path containing the dbt project.
            # See https://docs.getdbt.com/reference/dbt_project.yml for more information.
            **({"DBT_PROJECT_DIR": self.project_dir} if self.project_dir else {}),
        }

        selection_args: List[str] = []
        dagster_dbt_translator = dagster_dbt_translator or DagsterDbtTranslator()
        if context and assets_def is not None:
            manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(
                [assets_def]
            )
            selection_args, indirect_selection = _get_subset_selection_for_context(
                context=context,
                manifest=manifest,
                select=context.op.tags.get(DAGSTER_DBT_SELECT_METADATA_KEY),
                exclude=context.op.tags.get(DAGSTER_DBT_EXCLUDE_METADATA_KEY),
                dagster_dbt_translator=dagster_dbt_translator,
                current_dbt_indirect_selection_env=env.get(DBT_INDIRECT_SELECTION_ENV, None),
            )

            # Set dbt indirect selection if needed to execute specific dbt tests due to asset
            # check selection.
            if indirect_selection:
                env[DBT_INDIRECT_SELECTION_ENV] = indirect_selection
        else:
            manifest = validate_manifest(manifest) if manifest else {}

        # TODO: verify that args does not have any selection flags if the context and manifest
        # are passed to this function.
        profile_args: List[str] = []
        if self.profile:
            profile_args = ["--profile", self.profile]

        if self.target:
            profile_args += ["--target", self.target]

        full_dbt_args = [
            self.dbt_executable,
            *self.global_config_flags,
            *args,
            *profile_args,
            *selection_args,
        ]
        project_dir = Path(self.project_dir)

        if not target_path.is_absolute():
            target_path = project_dir.joinpath(target_path)

        adapter: Optional[BaseAdapter] = None
        with pushd(self.project_dir):
            try:
                cli_vars = parse_cli_vars_from_args(args)
                adapter = self._initialize_adapter(cli_vars)

            except:
                logger.warning(
                    "An error was encountered when creating a handle to the dbt adapter in Dagster.",
                    exc_info=True,
                )

            return DbtCliInvocation.run(
                args=full_dbt_args,
                env=env,
                manifest=manifest,
                dagster_dbt_translator=dagster_dbt_translator,
                project_dir=project_dir,
                target_path=target_path,
                raise_on_error=raise_on_error,
                context=context,
                adapter=adapter,
            )
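
    # Illustrative sketch (not part of the shipped source): for a resource configured with
    # `global_config_flags=["--no-use-color"]`, `profile="jaffle_shop"`, and `target="dev"`,
    # a call like `dbt.cli(["run"], context=context)` assembles `full_dbt_args` roughly as:
    #
    #   ["dbt", "--no-use-color", "run", "--profile", "jaffle_shop", "--target", "dev",
    #    "--select", "..."]
    #
    # where the trailing selection arguments are only present for a subsetted `@dbt_assets`
    # execution. The configuration values shown here are hypothetical.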

    def setup_for_execution(self, context: InitResourceContext) -> None:
        packages_yaml = Path(self.project_dir).joinpath("packages.yml")
        dependencies_yaml = Path(self.project_dir).joinpath("dependencies.yml")

        if context.log and (
            (packages_yaml.exists() and _dbt_packages_has_dagster_dbt(packages_yaml))
            or (dependencies_yaml.exists() and _dbt_packages_has_dagster_dbt(dependencies_yaml))
        ):
            context.log.warn(
                "Fetching column metadata using `log_column_level_metadata` macro is deprecated"
                " and will be removed in dagster-dbt 0.24.0. Use the `fetch_column_metadata`"
                " method in your asset definition to fetch column metadata instead."
            )
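

# Illustrative sketch (not part of the shipped source): `DbtCliResource` can also be constructed
# directly from a `DbtProject`, in which case `state_path` and `target` are inherited from the
# project when they are not passed explicitly. The paths below are hypothetical.
#
#   from dagster_dbt import DbtCliResource, DbtProject
#
#   my_project = DbtProject(project_dir="/path/to/dbt/project", state_path="state")
#   dbt = DbtCliResource(project_dir=my_project)
#   # `dbt.state_path` is resolved to an absolute path by `validate_state_path`.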


def parse_cli_vars_from_args(args: Sequence[str]) -> Dict[str, Any]:
    parser = ArgumentParser(description="Parse cli vars from dbt command")
    parser.add_argument("--vars")
    var_args, _ = parser.parse_known_args(args)
    if not var_args.vars:
        return {}

    return parse_cli_vars(var_args.vars)


def _get_subset_selection_for_context(
    context: Union[OpExecutionContext, AssetExecutionContext],
    manifest: Mapping[str, Any],
    select: Optional[str],
    exclude: Optional[str],
    dagster_dbt_translator: DagsterDbtTranslator,
    current_dbt_indirect_selection_env: Optional[str],
) -> Tuple[List[str], Optional[str]]:
    """Generate a dbt selection string and DBT_INDIRECT_SELECTION setting to execute the selected
    resources in a subsetted execution context.

    See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work.

    Args:
        context (Union[OpExecutionContext, AssetExecutionContext]): The execution context for the
            current execution step.
        manifest (Mapping[str, Any]): The dbt manifest blob.
        select (Optional[str]): A dbt selection string to select resources to materialize.
        exclude (Optional[str]): A dbt selection string to exclude resources from materializing.
        dagster_dbt_translator (DagsterDbtTranslator): The translator to link dbt nodes to Dagster
            assets.
        current_dbt_indirect_selection_env (Optional[str]): The user's value for the
            DBT_INDIRECT_SELECTION environment variable.

    Returns:
        List[str]: dbt CLI arguments to materialize the selected resources in a subsetted
            execution context. If the current execution context is not performing a subsetted
            execution, return CLI arguments composed of the provided selection and exclusion
            arguments.
        Optional[str]: A value for the DBT_INDIRECT_SELECTION environment variable. If None, the
            environment variable is not set and dbt will use either its default (eager) or the
            user's setting.
    """
    default_dbt_selection = []
    if select:
        default_dbt_selection += ["--select", select]
    if exclude:
        default_dbt_selection += ["--exclude", exclude]

    assets_def = context.assets_def
    is_asset_subset = assets_def.keys_by_output_name != assets_def.node_keys_by_output_name
    is_checks_subset = (
        assets_def.check_specs_by_output_name != assets_def.node_check_specs_by_output_name
    )

    # It's nice to use the default dbt selection arguments when not subsetting, for readability. We
    # also use dbt indirect selection to avoid hitting CLI argument length limits.
    # https://github.com/dagster-io/dagster/issues/16997#issuecomment-1832443279
    # A byproduct is that we'll run singular dbt tests (not currently modeled as asset checks) in
    # cases when we can use indirect selection, and not when we need to turn it off.
    if not (is_asset_subset or is_checks_subset):
        logger.info(
            "A dbt subsetted execution is not being performed. Using the default dbt selection"
            f" arguments `{default_dbt_selection}`."
        )
        # Default eager indirect selection. This means we'll also run any singular tests (which
        # aren't modeled as asset checks currently).
        return default_dbt_selection, None

    # Explicitly select a dbt resource by its path. Selecting a resource by path is more terse
    # than selecting it by its fully qualified name.
    # https://docs.getdbt.com/reference/node-selection/methods#the-path-method
    dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest)
    selected_dbt_non_test_resources = get_dbt_resource_names_for_output_names(
        output_names=context.op_execution_context.selected_output_names,
        dbt_resource_props_by_output_name=dbt_resource_props_by_output_name,
        dagster_dbt_translator=dagster_dbt_translator,
    )

    # If all asset checks for the subsetted assets are selected, then we can just select the
    # assets and use indirect selection for the tests. We verify that:
    # 1. all the selected checks are for selected assets
    # 2. no checks for selected assets are excluded
    # This also means we'll run any singular tests.
    checks_on_non_selected_assets = [
        check_key
        for check_key in context.selected_asset_check_keys
        if check_key.asset_key not in context.selected_asset_keys
    ]
    all_check_keys = {
        check_spec.key for check_spec in assets_def.node_check_specs_by_output_name.values()
    }
    excluded_checks = all_check_keys.difference(context.selected_asset_check_keys)
    excluded_checks_on_selected_assets = [
        check_key
        for check_key in excluded_checks
        if check_key.asset_key in context.selected_asset_keys
    ]

    # Note that this will always be false if checks are disabled (which means the assets_def has
    # no check specs).
    if excluded_checks_on_selected_assets:
        # Select all assets and tests explicitly, and turn off indirect selection. This risks
        # hitting the CLI argument length limit, but in the common scenarios that can be launched
        # from the UI (all checks disabled, or only one check and no assets) it's not a concern.
        # Since we're setting DBT_INDIRECT_SELECTION=empty, we won't run any singular tests.
        selected_dbt_resources = [
            *selected_dbt_non_test_resources,
            *get_dbt_test_names_for_asset_checks(
                check_keys=context.selected_asset_check_keys,
                dbt_resource_props_by_test_name=get_dbt_resource_props_by_test_name(manifest),
                dagster_dbt_translator=dagster_dbt_translator,
            ),
        ]
        indirect_selection_override = DBT_EMPTY_INDIRECT_SELECTION
        logger.info(
            "Overriding default `DBT_INDIRECT_SELECTION` "
            f"{current_dbt_indirect_selection_env or 'eager'} with "
            f"`{indirect_selection_override}` due to additional checks "
            f"{', '.join([c.to_user_string() for c in checks_on_non_selected_assets])} "
            f"and excluded checks {', '.join([c.to_user_string() for c in excluded_checks_on_selected_assets])}."
        )
    elif checks_on_non_selected_assets:
        # Explicitly select the tests that won't be run via indirect selection.
        selected_dbt_resources = [
            *selected_dbt_non_test_resources,
            *get_dbt_test_names_for_asset_checks(
                check_keys=checks_on_non_selected_assets,
                dbt_resource_props_by_test_name=get_dbt_resource_props_by_test_name(manifest),
                dagster_dbt_translator=dagster_dbt_translator,
            ),
        ]
        indirect_selection_override = None
    else:
        selected_dbt_resources = selected_dbt_non_test_resources
        indirect_selection_override = None

    logger.info(
        "A dbt subsetted execution is being performed. Overriding default dbt selection"
        f" arguments `{default_dbt_selection}` with arguments: `{selected_dbt_resources}`."
    )

    # Take the union of all the selected resources.
    # https://docs.getdbt.com/reference/node-selection/set-operators#unions
    union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)]

    return union_selected_dbt_resources, indirect_selection_override


def get_dbt_resource_props_by_output_name(
    manifest: Mapping[str, Any],
) -> Mapping[str, Mapping[str, Any]]:
    node_info_by_dbt_unique_id = get_dbt_resource_props_by_dbt_unique_id_from_manifest(manifest)

    return {
        dagster_name_fn(node): node
        for node in node_info_by_dbt_unique_id.values()
        if node["resource_type"] in ASSET_RESOURCE_TYPES
    }


def get_dbt_resource_props_by_test_name(
    manifest: Mapping[str, Any],
) -> Mapping[str, Mapping[str, Any]]:
    return {
        dbt_resource_props["name"]: dbt_resource_props
        for unique_id, dbt_resource_props in manifest["nodes"].items()
        if unique_id.startswith("test")
    }


def get_dbt_resource_names_for_output_names(
    output_names: Iterable[str],
    dbt_resource_props_by_output_name: Mapping[str, Any],
    dagster_dbt_translator: DagsterDbtTranslator,
) -> Sequence[str]:
    dbt_resource_props_gen = (
        dbt_resource_props_by_output_name[output_name]
        for output_name in output_names
        # Output names corresponding to asset checks won't be in this dict.
        if output_name in dbt_resource_props_by_output_name
    )

    # Explicitly select a dbt resource by its file name.
    # https://docs.getdbt.com/reference/node-selection/methods#the-file-method
    if dagster_dbt_translator.settings.enable_dbt_selection_by_name:
        return [
            Path(dbt_resource_props["original_file_path"]).stem
            for dbt_resource_props in dbt_resource_props_gen
        ]

    # Explicitly select a dbt resource by its fully qualified name (FQN).
    # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method
    return [".".join(dbt_resource_props["fqn"]) for dbt_resource_props in dbt_resource_props_gen]


def get_dbt_test_names_for_asset_checks(
    check_keys: Iterable[AssetCheckKey],
    dbt_resource_props_by_test_name: Mapping[str, Any],
    dagster_dbt_translator: DagsterDbtTranslator,
) -> Sequence[str]:
    # Explicitly select a dbt test by its test name.
    # https://docs.getdbt.com/reference/node-selection/test-selection-examples#more-complex-selection
    if dagster_dbt_translator.settings.enable_dbt_selection_by_name:
        return [asset_check_key.name for asset_check_key in check_keys]

    # Explicitly select a dbt test by its fully qualified name (FQN).
    # https://docs.getdbt.com/reference/node-selection/methods#the-file-or-fqn-method
    return [
        ".".join(dbt_resource_props_by_test_name[asset_check_key.name]["fqn"])
        for asset_check_key in check_keys
    ]
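

# Illustrative sketch (not part of the shipped source) of the helpers above, using
# hypothetical values:
#
#   parse_cli_vars_from_args(["run", "--vars", '{"start_date": "2024-01-01"}'])
#   # -> {"start_date": "2024-01-01"}
#
# For a subsetted execution, `_get_subset_selection_for_context` returns a single
# space-separated `--select` argument plus an optional DBT_INDIRECT_SELECTION override, e.g.:
#
#   (["--select", "fqn.of.model_a fqn.of.model_b"], None)
#
# which dbt interprets as the union of the selected resources.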