import json
import shlex
from argparse import ArgumentParser, Namespace
from contextlib import suppress
from typing import (
Any,
Callable,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
import dagster._check as check
from dagster import (
AssetExecutionContext,
AssetKey,
AssetOut,
AssetsDefinition,
AutoMaterializePolicy,
FreshnessPolicy,
MetadataValue,
PartitionsDefinition,
ResourceDefinition,
multi_asset,
with_resources,
)
from dagster._annotations import experimental, experimental_param
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
)
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.execution.context.init import build_init_resource_context
from dagster_dbt.asset_utils import (
default_asset_key_fn,
default_auto_materialize_policy_fn,
default_description_fn,
default_freshness_policy_fn,
default_group_from_dbt_resource_props,
get_asset_deps,
get_deps,
)
from dagster_dbt.cloud.resources import DbtCloudClient, DbtCloudClientResource, DbtCloudRunStatus
from dagster_dbt.cloud.utils import result_to_events
from dagster_dbt.dagster_dbt_translator import DagsterDbtTranslator
from dagster_dbt.errors import DagsterDbtCloudJobInvariantViolationError
from dagster_dbt.utils import ASSET_RESOURCE_TYPES
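# dbt Cloud job environment variable that stores the run ID of a cached compile run. When it is
# set (e.g. by a CI workflow via the `dagster-dbt cache-compile-references` CLI), that run's
# artifacts are reused instead of triggering an ad-hoc compile run.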
DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR = "DBT_DAGSTER_COMPILE_RUN_ID"
class DbtCloudCacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(
self,
dbt_cloud_resource_def: Union[DbtCloudClientResource, ResourceDefinition],
job_id: int,
node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey],
node_info_to_group_fn: Callable[[Mapping[str, Any]], Optional[str]],
node_info_to_freshness_policy_fn: Callable[[Mapping[str, Any]], Optional[FreshnessPolicy]],
node_info_to_auto_materialize_policy_fn: Callable[
[Mapping[str, Any]], Optional[AutoMaterializePolicy]
],
partitions_def: Optional[PartitionsDefinition] = None,
partition_key_to_vars_fn: Optional[Callable[[str], Mapping[str, Any]]] = None,
):
self._dbt_cloud_resource_def: ResourceDefinition = (
dbt_cloud_resource_def.get_resource_definition()
if isinstance(dbt_cloud_resource_def, DbtCloudClientResource)
else dbt_cloud_resource_def
)
self._dbt_cloud: DbtCloudClient = (
dbt_cloud_resource_def.process_config_and_initialize().get_dbt_client()
if isinstance(dbt_cloud_resource_def, DbtCloudClientResource)
else dbt_cloud_resource_def(build_init_resource_context())
)
self._job_id = job_id
self._project_id: int
self._has_generate_docs: bool
self._job_commands: List[str]
self._job_materialization_command_step: int
self._node_info_to_asset_key = node_info_to_asset_key
self._node_info_to_group_fn = node_info_to_group_fn
self._node_info_to_freshness_policy_fn = node_info_to_freshness_policy_fn
self._node_info_to_auto_materialize_policy_fn = node_info_to_auto_materialize_policy_fn
self._partitions_def = partitions_def
self._partition_key_to_vars_fn = partition_key_to_vars_fn
super().__init__(unique_id=f"dbt-cloud-{job_id}")
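    # `compute_cacheable_data` is invoked when Dagster loads this code location: it queries dbt
    # Cloud for the job's node and dependency structure. `build_definitions` then turns that
    # (possibly cached) data into concrete `AssetsDefinition` objects.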
def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
dbt_nodes, dbt_dependencies = self._get_dbt_nodes_and_dependencies()
return [self._build_dbt_cloud_assets_cacheable_data(dbt_nodes, dbt_dependencies)]
def build_definitions(
self, data: Sequence[AssetsDefinitionCacheableData]
) -> Sequence[AssetsDefinition]:
return with_resources(
[
self._build_dbt_cloud_assets_from_cacheable_data(assets_definition_metadata)
for assets_definition_metadata in data
],
{"dbt_cloud": self._dbt_cloud_resource_def},
)
@staticmethod
def parse_dbt_command(dbt_command: str) -> Namespace:
from dbt.cli.flags import Flags, args_to_context
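        # Drop the leading `dbt` token; `Flags` only expects the subcommand and its arguments.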
args = shlex.split(dbt_command)[1:]
        # Hack: dbt's argument parsing requires `--profiles-dir` to point at an existing
        # directory, so pass the current directory to satisfy it.
return Namespace(**vars(Flags(args_to_context(args + ["--profiles-dir", "."]))))
@staticmethod
def get_job_materialization_command_step(execute_steps: List[str]) -> int:
materialization_command_filter = [
DbtCloudCacheableAssetsDefinition.parse_dbt_command(command).which in ["run", "build"]
for command in execute_steps
]
if sum(materialization_command_filter) != 1:
raise DagsterDbtCloudJobInvariantViolationError(
"The dbt Cloud job must have a single `dbt run` or `dbt build` in its commands. "
f"Received commands: {execute_steps}."
)
return materialization_command_filter.index(True)
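    # The compile run (see `_compile_dbt_cloud_job`) reuses the job's selection filters so that
    # it compiles the same set of nodes that the materialization command would run.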
@staticmethod
def get_compile_filters(parsed_args: Namespace) -> List[str]:
dbt_compile_options: List[str] = []
selected_models = parsed_args.select or []
if selected_models:
dbt_compile_options.append(f"--select {' '.join(selected_models)}")
excluded_models = parsed_args.exclude or []
if excluded_models:
dbt_compile_options.append(f"--exclude {' '.join(excluded_models)}")
selector = getattr(parsed_args, "selector_name", None) or getattr(
parsed_args, "selector", None
)
if selector:
dbt_compile_options.append(f"--selector {selector}")
return dbt_compile_options
def _get_cached_compile_dbt_cloud_job_run(self, compile_run_id: int) -> Tuple[int, int]:
# If the compile run is ongoing, allow it a grace period of 10 minutes to finish.
with suppress(Exception):
self._dbt_cloud.poll_run(run_id=compile_run_id, poll_timeout=600)
compile_run = self._dbt_cloud.get_run(
run_id=compile_run_id, include_related=["trigger", "run_steps"]
)
compile_run_status: str = compile_run["status_humanized"]
if compile_run_status != DbtCloudRunStatus.SUCCESS:
raise DagsterDbtCloudJobInvariantViolationError(
f"The cached dbt Cloud job run `{compile_run_id}` must have a status of"
f" `{DbtCloudRunStatus.SUCCESS}`. Received status: `{compile_run_status}. You can"
f" view the full status of your dbt Cloud run at {compile_run['href']}. Once it has"
" successfully completed, reload your Dagster definitions. If your run has failed,"
" you must manually refresh the cache using the `dagster-dbt"
" cache-compile-references` CLI."
)
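        # If the cached compile run also generated docs, its final step is `dbt docs generate`,
        # so the compile artifacts we need are one step earlier.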
compile_run_has_generate_docs = compile_run["trigger"]["generate_docs_override"]
compile_job_materialization_command_step = len(compile_run["run_steps"])
if compile_run_has_generate_docs:
compile_job_materialization_command_step -= 1
return compile_run_id, compile_job_materialization_command_step
def _compile_dbt_cloud_job(self, dbt_cloud_job: Mapping[str, Any]) -> Tuple[int, int]:
        # Retrieve the filter options from the dbt Cloud job's materialization command.
#
# There are three filters: `--select`, `--exclude`, and `--selector`.
materialization_command = self._job_commands[self._job_materialization_command_step]
parsed_args = DbtCloudCacheableAssetsDefinition.parse_dbt_command(materialization_command)
dbt_compile_options = DbtCloudCacheableAssetsDefinition.get_compile_filters(
parsed_args=parsed_args
)
# Add the partition variable as a variable to the dbt Cloud job command.
#
        # If variables are already passed through the dbt Cloud job's command, an error will
        # be raised. Since those variables are static anyway, they can be moved to
        # `dbt_project.yml` without loss of functionality.
#
# Since we're only doing this to generate the dependency structure, just use an arbitrary
# partition key (e.g. the last one) to retrieve the partition variable.
if parsed_args.vars and parsed_args.vars != "{}":
raise DagsterDbtCloudJobInvariantViolationError(
f"The dbt Cloud job '{dbt_cloud_job['name']}' ({dbt_cloud_job['id']}) must not have"
" variables defined from `--vars` in its `dbt run` or `dbt build` command."
" Instead, declare the variables in the `dbt_project.yml` file. Received commands:"
f" {self._job_commands}."
)
if self._partitions_def and self._partition_key_to_vars_fn:
last_partition_key = self._partitions_def.get_last_partition_key()
if last_partition_key is None:
check.failed("PartitionsDefinition has no partitions")
partition_var = self._partition_key_to_vars_fn(last_partition_key)
dbt_compile_options.append(f"--vars '{json.dumps(partition_var)}'")
# We need to retrieve the dependency structure for the assets in the dbt Cloud project.
# However, we can't just use the dependency structure from the latest run, because
# this historical structure may not be up-to-date with the current state of the project.
#
# By always doing a compile step, we can always get the latest dependency structure.
# This incurs some latency, but at least it doesn't run through the entire materialization
# process.
dbt_compile_command = f"dbt compile {' '.join(dbt_compile_options)}"
compile_run_dbt_output = self._dbt_cloud.run_job_and_poll(
job_id=self._job_id,
cause="Generating software-defined assets for Dagster.",
steps_override=[dbt_compile_command],
)
# Target the compile execution step when retrieving run artifacts, rather than assuming
# that the last step is the correct target.
#
# Here, we ignore the `dbt docs generate` step.
compile_job_materialization_command_step = len(
compile_run_dbt_output.run_details.get("run_steps", [])
)
if self._has_generate_docs:
compile_job_materialization_command_step -= 1
return compile_run_dbt_output.run_id, compile_job_materialization_command_step
def _get_dbt_nodes_and_dependencies(
self,
) -> Tuple[Mapping[str, Any], Mapping[str, FrozenSet[str]]]:
"""For a given dbt Cloud job, fetch the latest run's dependency structure of executed nodes."""
# Fetch information about the job.
job = self._dbt_cloud.get_job(job_id=self._job_id)
self._project_id = job["project_id"]
self._has_generate_docs = job["generate_docs"]
        # We constrain the kinds of dbt Cloud jobs that we support running.
        #
        # A job may run multiple steps, but exactly one of them must be a `dbt run` or
        # `dbt build` command.
#
# As a reminder, `dbt deps` is automatically run before the job's configured commands.
# And if the settings are enabled, `dbt docs generate` and `dbt source freshness` can
# automatically run after the job's configured commands.
#
# These commands that execute before and after the job's configured commands do not count
# towards the single command constraint.
self._job_commands = job["execute_steps"]
self._job_materialization_command_step = (
DbtCloudCacheableAssetsDefinition.get_job_materialization_command_step(
execute_steps=self._job_commands
)
)
# Determine whether to use a cached compile run. This should only be set up if the user is
# using a GitHub action along with their dbt project.
dbt_cloud_job_env_vars = self._dbt_cloud.get_job_environment_variables(
project_id=self._project_id, job_id=self._job_id
)
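        # The API returns each environment variable with its per-scope values; the cached
        # compile run ID, if present, is stored as a job-level override.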
compile_run_id = (
dbt_cloud_job_env_vars.get(DAGSTER_DBT_COMPILE_RUN_ID_ENV_VAR, {})
.get("job", {})
.get("value")
)
compile_run_id, compile_job_materialization_command_step = (
# If a compile run is cached, then use it.
self._get_cached_compile_dbt_cloud_job_run(compile_run_id=int(compile_run_id))
if compile_run_id
# Otherwise, compile the dbt Cloud project in an ad-hoc manner.
else self._compile_dbt_cloud_job(dbt_cloud_job=job)
)
manifest_json = self._dbt_cloud.get_manifest(
run_id=compile_run_id, step=compile_job_materialization_command_step
)
run_results_json = self._dbt_cloud.get_run_results(
run_id=compile_run_id, step=compile_job_materialization_command_step
)
# Filter the manifest to only include the nodes that were executed.
dbt_nodes: Dict[str, Any] = {
**manifest_json.get("nodes", {}),
**manifest_json.get("sources", {}),
**manifest_json.get("metrics", {}),
}
executed_node_ids: Set[str] = set(
result["unique_id"] for result in run_results_json["results"]
)
# If there are no executed nodes, then there are no assets to generate.
# Inform the user to inspect their dbt Cloud job's command.
if not executed_node_ids:
raise DagsterDbtCloudJobInvariantViolationError(
f"The dbt Cloud job '{job['name']}' ({job['id']}) does not generate any "
"software-defined assets. Ensure that your dbt project has nodes to execute, "
"and that your dbt Cloud job's materialization command has the proper filter "
f"options applied. Received commands: {self._job_commands}."
)
# Generate the dependency structure for the executed nodes.
dbt_dependencies = get_deps(
dbt_nodes=dbt_nodes,
selected_unique_ids=executed_node_ids,
asset_resource_types=ASSET_RESOURCE_TYPES,
)
return dbt_nodes, dbt_dependencies
def _build_dbt_cloud_assets_cacheable_data(
self, dbt_nodes: Mapping[str, Any], dbt_dependencies: Mapping[str, FrozenSet[str]]
) -> AssetsDefinitionCacheableData:
"""Given all of the nodes and dependencies for a dbt Cloud job, build the cacheable
representation that generate the asset definition for the job.
"""
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
@classmethod
def get_asset_key(cls, dbt_resource_props):
return self._node_info_to_asset_key(dbt_resource_props)
@classmethod
def get_description(cls, dbt_resource_props):
# We shouldn't display the raw sql. Instead, inspect if dbt docs were generated,
# and attach metadata to link to the docs.
return default_description_fn(dbt_resource_props, display_raw_sql=False)
@classmethod
def get_group_name(cls, dbt_resource_props):
return self._node_info_to_group_fn(dbt_resource_props)
@classmethod
def get_freshness_policy(cls, dbt_resource_props):
return self._node_info_to_freshness_policy_fn(dbt_resource_props)
@classmethod
def get_auto_materialize_policy(cls, dbt_resource_props):
return self._node_info_to_auto_materialize_policy_fn(dbt_resource_props)
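        # Derive the asset graph for the job (dependencies, inputs/outputs, and per-key group,
        # freshness, and auto-materialize settings) from the executed dbt nodes, using the
        # translator defined above.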
(
asset_deps,
asset_ins,
asset_outs,
group_names_by_key,
freshness_policies_by_key,
automation_conditions_by_key,
_,
fqns_by_output_name,
metadata_by_output_name,
) = get_asset_deps(
dbt_nodes=dbt_nodes,
deps=dbt_dependencies,
# TODO: In the future, allow the IO manager to be specified.
io_manager_key=None,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
manifest=None,
)
return AssetsDefinitionCacheableData(
# TODO: In the future, we should allow additional upstream assets to be specified.
keys_by_input_name={
input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()
},
keys_by_output_name={
output_name: asset_key for asset_key, (output_name, _) in asset_outs.items()
},
internal_asset_deps={
asset_outs[asset_key][0]: asset_deps for asset_key, asset_deps in asset_deps.items()
},
# We don't rely on a static group name. Instead, we map over the dbt metadata to
# determine the group name for each asset.
group_name=None,
metadata_by_output_name={
output_name: self._build_dbt_cloud_assets_metadata(dbt_metadata)
for output_name, dbt_metadata in metadata_by_output_name.items()
},
# TODO: In the future, we should allow the key prefix to be specified.
key_prefix=None,
can_subset=True,
extra_metadata={
"job_id": self._job_id,
"job_commands": self._job_commands,
"job_materialization_command_step": self._job_materialization_command_step,
"group_names_by_output_name": {
asset_outs[asset_key][0]: group_name
for asset_key, group_name in group_names_by_key.items()
},
"fqns_by_output_name": fqns_by_output_name,
},
freshness_policies_by_output_name={
asset_outs[asset_key][0]: freshness_policy
for asset_key, freshness_policy in freshness_policies_by_key.items()
},
auto_materialize_policies_by_output_name={
asset_outs[asset_key][0]: automation_condition.as_auto_materialize_policy()
for asset_key, automation_condition in automation_conditions_by_key.items()
},
)
def _build_dbt_cloud_assets_metadata(self, dbt_metadata: Dict[str, Any]) -> RawMetadataMapping:
metadata = {
"dbt Cloud Job": MetadataValue.url(
self._dbt_cloud.build_url_for_job(
project_id=self._project_id,
job_id=self._job_id,
)
),
}
if self._has_generate_docs:
metadata["dbt Cloud Documentation"] = MetadataValue.url(
self._dbt_cloud.build_url_for_cloud_docs(
job_id=self._job_id,
resource_type=dbt_metadata["resource_type"],
unique_id=dbt_metadata["unique_id"],
)
)
return metadata
def _build_dbt_cloud_assets_from_cacheable_data(
self, assets_definition_cacheable_data: AssetsDefinitionCacheableData
) -> AssetsDefinition:
metadata = cast(Mapping[str, Any], assets_definition_cacheable_data.extra_metadata)
job_id = cast(int, metadata["job_id"])
job_commands = cast(List[str], list(metadata["job_commands"]))
job_materialization_command_step = cast(int, metadata["job_materialization_command_step"])
group_names_by_output_name = cast(Mapping[str, str], metadata["group_names_by_output_name"])
fqns_by_output_name = cast(Mapping[str, List[str]], metadata["fqns_by_output_name"])
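        # Reconstruct the multi-asset from the cached data: one output per dbt node, carrying
        # the group, freshness policy, auto-materialize policy, and metadata resolved when the
        # cacheable data was built.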
@multi_asset(
name=f"dbt_cloud_job_{job_id}",
deps=list((assets_definition_cacheable_data.keys_by_input_name or {}).values()),
outs={
output_name: AssetOut(
key=asset_key,
group_name=group_names_by_output_name.get(output_name),
freshness_policy=(
assets_definition_cacheable_data.freshness_policies_by_output_name or {}
).get(
output_name,
),
auto_materialize_policy=(
assets_definition_cacheable_data.auto_materialize_policies_by_output_name
or {}
).get(
output_name,
),
metadata=(assets_definition_cacheable_data.metadata_by_output_name or {}).get(
output_name
),
is_required=False,
)
for output_name, asset_key in (
assets_definition_cacheable_data.keys_by_output_name or {}
).items()
},
internal_asset_deps={
output_name: set(asset_deps)
for output_name, asset_deps in (
assets_definition_cacheable_data.internal_asset_deps or {}
).items()
},
partitions_def=self._partitions_def,
can_subset=assets_definition_cacheable_data.can_subset,
required_resource_keys={"dbt_cloud"},
compute_kind="dbt",
)
def _assets(context: AssetExecutionContext):
dbt_cloud = cast(DbtCloudClient, context.resources.dbt_cloud)
# Add the partition variable as a variable to the dbt Cloud job command.
dbt_options: List[str] = []
if context.has_partition_key and self._partition_key_to_vars_fn:
partition_var = self._partition_key_to_vars_fn(context.partition_key)
dbt_options.append(f"--vars '{json.dumps(partition_var)}'")
            # Prepare the materialization step to be overridden with the selection filter.
materialization_command = job_commands[job_materialization_command_step]
# Map the selected outputs to dbt models that should be materialized.
#
            # As of dbt 1.5.0, a command may contain multiple `--select` arguments, so we
            # can't simply append ours as the last one. Instead, we strip the command's own
            # `--select` and `--selector` arguments so that only the Dagster-generated
            # subset of models is selected.
#
# See https://docs.getdbt.com/reference/node-selection/syntax for details.
if context.is_subset:
selected_models = [
".".join(fqns_by_output_name[output_name])
for output_name in context.op_execution_context.selected_output_names
# outputs corresponding to asset checks from dbt tests won't be in this dict
if output_name in fqns_by_output_name
]
dbt_options.append(f"--select {' '.join(sorted(selected_models))}")
parser = ArgumentParser(description="Parse selection args from dbt command")
                # `--select` should arguably use nargs="+", but we leave validating the
                # selection syntax to dbt itself.
parser.add_argument("-s", "--select", nargs="*", action="append")
parser.add_argument("--selector", nargs="*")
split_materialization_command = shlex.split(materialization_command)
_, non_selection_command_parts = parser.parse_known_args(
split_materialization_command
)
materialization_command = " ".join(non_selection_command_parts)
job_commands[job_materialization_command_step] = (
f"{materialization_command} {' '.join(dbt_options)}".strip()
)
# Run the dbt Cloud job to rematerialize the assets.
dbt_cloud_output = dbt_cloud.run_job_and_poll(
job_id=job_id,
cause=f"Materializing software-defined assets in Dagster run {context.run.run_id[:8]}",
steps_override=job_commands,
)
# Target the materialization step when retrieving run artifacts, rather than assuming
# that the last step is the correct target.
#
# We ignore the commands in front of the materialization command. And again, we ignore
# the `dbt docs generate` step.
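            # `len(job_commands) - job_materialization_command_step - 1` counts the configured
            # commands that run after the materialization command; walking back over those (and
            # over a trailing `dbt docs generate` step, if enabled) lands on the step whose
            # artifacts belong to the materialization command.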
materialization_command_step = len(dbt_cloud_output.run_details.get("run_steps", []))
materialization_command_step -= len(job_commands) - job_materialization_command_step - 1
if dbt_cloud_output.run_details.get("job", {}).get("generate_docs"):
materialization_command_step -= 1
# TODO: Assume the run completely fails or completely succeeds.
# In the future, we can relax this assumption.
manifest_json = dbt_cloud.get_manifest(
run_id=dbt_cloud_output.run_id, step=materialization_command_step
)
run_results_json = self._dbt_cloud.get_run_results(
run_id=dbt_cloud_output.run_id, step=materialization_command_step
)
for result in run_results_json.get("results", []):
yield from result_to_events(
result=result,
docs_url=dbt_cloud_output.docs_url,
node_info_to_asset_key=self._node_info_to_asset_key,
manifest_json=manifest_json,
# TODO: In the future, allow arbitrary mappings to Dagster output metadata from
# the dbt metadata.
extra_metadata=None,
generate_asset_outputs=True,
)
return _assets
@experimental
@experimental_param(param="partitions_def")
@experimental_param(param="partition_key_to_vars_fn")
def load_assets_from_dbt_cloud_job(
dbt_cloud: Union[DbtCloudClientResource, ResourceDefinition],
job_id: int,
node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = default_asset_key_fn,
node_info_to_group_fn: Callable[
[Mapping[str, Any]], Optional[str]
] = default_group_from_dbt_resource_props,
node_info_to_freshness_policy_fn: Callable[
[Mapping[str, Any]], Optional[FreshnessPolicy]
] = default_freshness_policy_fn,
node_info_to_auto_materialize_policy_fn: Callable[
[Mapping[str, Any]], Optional[AutoMaterializePolicy]
] = default_auto_materialize_policy_fn,
partitions_def: Optional[PartitionsDefinition] = None,
partition_key_to_vars_fn: Optional[Callable[[str], Mapping[str, Any]]] = None,
) -> CacheableAssetsDefinition:
"""Loads a set of dbt models, managed by a dbt Cloud job, into Dagster assets. In order to
determine the set of dbt models, the project is compiled to generate the necessary artifacts
that define the dbt models and their dependencies.
One Dagster asset is created for each dbt model.
Args:
dbt_cloud (ResourceDefinition): The dbt Cloud resource to use to connect to the dbt Cloud API.
job_id (int): The ID of the dbt Cloud job to load assets from.
        node_info_to_asset_key (Mapping[str, Any] -> AssetKey): A function that takes a dictionary
of dbt metadata and returns the AssetKey that you want to represent a given model or
source. By default: dbt model -> AssetKey([model_name]) and
dbt source -> AssetKey([source_name, table_name])
node_info_to_group_fn (Dict[str, Any] -> Optional[str]): A function that takes a
dictionary of dbt node info and returns the group that this node should be assigned to.
node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]): A function
that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that
should be applied to this node. By default, freshness policies will be created from
config applied to dbt models, i.e.:
`dagster_freshness_policy={"maximum_lag_minutes": 60, "cron_schedule": "0 9 * * *"}`
will result in that model being assigned
`FreshnessPolicy(maximum_lag_minutes=60, cron_schedule="0 9 * * *")`
node_info_to_auto_materialize_policy_fn (Dict[str, Any] -> Optional[AutoMaterializePolicy]):
            A function that takes a dictionary of dbt node info and optionally returns an AutoMaterializePolicy
that should be applied to this node. By default, AutoMaterializePolicies will be created from
config applied to dbt models, i.e.:
`dagster_auto_materialize_policy={"type": "lazy"}` will result in that model being assigned
`AutoMaterializePolicy.lazy()`
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the dbt assets.
partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]): A function to translate a given
partition key (e.g. '2022-01-01') to a dictionary of vars to be passed into the dbt
invocation (e.g. {"run_date": "2022-01-01"})
Returns:
CacheableAssetsDefinition: A definition for the loaded assets.
Examples:
        .. code-block:: python

from dagster import repository
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_job
DBT_CLOUD_JOB_ID = 1234
dbt_cloud = dbt_cloud_resource.configured(
{
"auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
"account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
}
)
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
dbt_cloud=dbt_cloud, job_id=DBT_CLOUD_JOB_ID
)
@repository
def dbt_cloud_sandbox():
return [dbt_cloud_assets]
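
        A partitioned variant might look like the following sketch. It assumes the dbt models
        read a `run_date` var; the var name and the start date are illustrative.

        .. code-block:: python

            from dagster import DailyPartitionsDefinition, repository
            from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_job

            dbt_cloud = dbt_cloud_resource.configured(
                {
                    "auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
                    "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
                }
            )

            dbt_cloud_assets = load_assets_from_dbt_cloud_job(
                dbt_cloud=dbt_cloud,
                job_id=1234,
                # Partition the assets by day and pass the partition key to dbt as a var.
                partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
                partition_key_to_vars_fn=lambda partition_key: {"run_date": partition_key},
            )

            @repository
            def dbt_cloud_sandbox_partitioned():
                return [dbt_cloud_assets]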
"""
if partition_key_to_vars_fn:
check.invariant(
partitions_def is not None,
"Cannot supply a `partition_key_to_vars_fn` without a `partitions_def`.",
)
return DbtCloudCacheableAssetsDefinition(
dbt_cloud_resource_def=dbt_cloud,
job_id=job_id,
node_info_to_asset_key=node_info_to_asset_key,
node_info_to_group_fn=node_info_to_group_fn,
node_info_to_freshness_policy_fn=node_info_to_freshness_policy_fn,
node_info_to_auto_materialize_policy_fn=node_info_to_auto_materialize_policy_fn,
partitions_def=partitions_def,
partition_key_to_vars_fn=partition_key_to_vars_fn,
)