dbt (dagster-dbt)

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single workflow. Dagster's software-defined asset abstractions make it simple to define data assets that depend on specific dbt models, or to define the computations that produce the sources your dbt models depend on.

Related documentation pages: dbt and dbt Cloud.

dagster-dbt

dagster-dbt project scaffold

This command will initialize a new Dagster project and create directories and files that load assets from an existing dbt project.

dagster-dbt project scaffold [OPTIONS]

Options

--project-name <project_name>

Required The name of the Dagster project to initialize for your dbt project.

--dbt-project-dir <dbt_project_dir>

The path of your dbt project directory. This path must contain a dbt_project.yml file. By default, this command will assume that the current working directory contains a dbt project, but you can set a different directory by setting this option.
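
For example, to scaffold a Dagster project named my_dagster_project from an existing dbt project (the project name and path below are illustrative):

dagster-dbt project scaffold --project-name my_dagster_project --dbt-project-dir /path/to/dbt/project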

dbt Core

Here, we provide interfaces to manage dbt projects invoked by the local dbt command line interface (dbt CLI).

Assets (dbt Core)

dagster_dbt.load_assets_from_dbt_project(project_dir, profiles_dir=None, *, select=None, exclude=None, dagster_dbt_translator=None, io_manager_key=None, target_dir=None, key_prefix=None, source_key_prefix=None, op_name=None, runtime_metadata_fn=None, node_info_to_asset_key=<function default_asset_key_fn>, use_build_command=True, partitions_def=None, partition_key_to_vars_fn=None, node_info_to_group_fn=<function default_group_from_dbt_resource_props>, node_info_to_freshness_policy_fn=<function default_freshness_policy_fn>, node_info_to_auto_materialize_policy_fn=<function default_auto_materialize_policy_fn>, node_info_to_definition_metadata_fn=<function default_metadata_from_dbt_resource_props>, display_raw_sql=None, dbt_resource_key='dbt')[source]

deprecated This API will be removed in version 0.24.0.

Use the @dbt_assets decorator, DbtCliResource, and DagsterDbtTranslator instead.

For examples on how to use @dbt_assets and DbtCliResource to execute commands like `dbt run` or `dbt build` on your dbt project, see our API docs: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.dbt_assets.

For examples on how to customize your dbt assets using DagsterDbtTranslator see the reference: https://docs.dagster.io/integrations/dbt/reference#understanding-asset-definition-attributes.

To generate a dbt manifest for @dbt_assets at run time using `dbt parse`, see the reference: https://docs.dagster.io/integrations/dbt/reference#loading-dbt-models-from-a-dbt-project.

Loads a set of dbt models from a dbt project into Dagster assets.

Creates one Dagster asset for each dbt model. All assets will be re-materialized using a single dbt run or dbt build command.

If you need more flexibility in defining the computations that materialize your dbt assets, we recommend using the @dbt_assets decorator.

Parameters:
  • project_dir (Optional[str]) – The directory containing the dbt project to load.

  • profiles_dir (Optional[str]) – The profiles directory to use for loading the dbt project. Defaults to a directory called "config" inside the project_dir.

  • target_dir (Optional[str]) – The target directory where dbt will place compiled artifacts. Defaults to “target” underneath the project_dir.

  • select (Optional[str]) – A dbt selection string for the models in a project that you want to include. Defaults to “fqn:*”.

  • exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude. Defaults to “”.

  • dagster_dbt_translator (Optional[DagsterDbtTranslator]) – Allows customizing how to map dbt models, seeds, etc. to asset keys and asset metadata.

  • key_prefix (Optional[Union[str, List[str]]]) – [Deprecated] A key prefix to apply to all assets loaded from the dbt project. Does not apply to input assets. Deprecated: use dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(key_prefix=…) instead.

  • source_key_prefix (Optional[Union[str, List[str]]]) – [Deprecated] A key prefix to apply to all input assets for the set of assets loaded from the dbt project. Deprecated: use dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(source_key_prefix=…) instead.

  • op_name (Optional[str]) – [Deprecated] Sets the name of the underlying Op that will generate the dbt assets. Deprecated: use the @dbt_assets decorator if you need to customize the op name.

  • dbt_resource_key (Optional[str]) – [Deprecated] The resource key that the dbt resource will be specified at. Defaults to “dbt”. Deprecated: use the @dbt_assets decorator if you need to customize the resource key.

  • runtime_metadata_fn (Optional[Callable[[OpExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]) – [Deprecated] A function that will be run after any of the assets are materialized and returns metadata entries for the asset, to be displayed in the asset catalog for that run. Deprecated: use the @dbt_assets decorator if you need to customize runtime metadata.

  • manifest_json (Optional[Mapping[str, Any]]) – [Deprecated] Use the manifest argument instead.

  • selected_unique_ids (Optional[Set[str]]) – [Deprecated] The set of dbt unique_ids that you want to load as assets. Deprecated: use the select argument instead.

  • node_info_to_asset_key (Mapping[str, Any] -> AssetKey) – [Deprecated] A function that takes a dictionary of dbt node info and returns the AssetKey that you want to represent that node. By default, the asset key will simply be the name of the dbt model. Deprecated: instead, provide a custom DagsterDbtTranslator that overrides get_asset_key.

  • use_build_command (bool) – Flag indicating if you want to use dbt build as the core computation for this asset. Defaults to True. If set to False, then dbt run will be used, and seeds and snapshots won’t be loaded as assets.

  • partitions_def (Optional[PartitionsDefinition]) – [Deprecated] Defines the set of partition keys that compose the dbt assets. Deprecated: use the @dbt_assets decorator to define partitioned dbt assets.

  • partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – [Deprecated] A function to translate a given partition key (e.g. ‘2022-01-01’) to a dictionary of vars to be passed into the dbt invocation (e.g. {“run_date”: “2022-01-01”}). Deprecated: use the @dbt_assets decorator to define partitioned dbt assets.

  • node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – [Deprecated] A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to. Deprecated: instead, configure dagster groups on a dbt resource’s meta field or assign dbt groups.

  • node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – [Deprecated] A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node. By default, freshness policies will be created from config applied to dbt models, i.e. dagster_freshness_policy={"maximum_lag_minutes": 60, "cron_schedule": "0 9 * * *"} will result in that model being assigned FreshnessPolicy(maximum_lag_minutes=60, cron_schedule="0 9 * * *"). Deprecated: instead, configure freshness policies on a dbt resource’s meta field.

  • node_info_to_auto_materialize_policy_fn (Dict[str, Any] -> Optional[AutoMaterializePolicy]) – [Deprecated] A function that takes a dictionary of dbt node info and optionally returns an AutoMaterializePolicy that should be applied to this node. By default, AutoMaterializePolicies will be created from config applied to dbt models, i.e. dagster_auto_materialize_policy={"type": "lazy"} will result in that model being assigned AutoMaterializePolicy.lazy(). Deprecated: instead, configure auto-materialize policies on a dbt resource’s meta field.

  • node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, RawMetadataMapping]]) – [Deprecated] A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition. This is added to the default metadata assigned to the node, which consists of the node’s schema (if present). Deprecated: instead, provide a custom DagsterDbtTranslator that overrides get_metadata.

  • display_raw_sql (Optional[bool]) – [Deprecated] A flag to indicate if the raw SQL associated with each model should be included in the asset description. For large projects, setting this flag to False is advised to reduce the size of the resulting snapshot. Deprecated: instead, provide a custom DagsterDbtTranslator that overrides get_description.
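
As a minimal sketch of this (deprecated) loader, assuming a dbt project at the illustrative path /path/to/dbt/project with its profiles in a config subdirectory:

from dagster_dbt import load_assets_from_dbt_project

# Loads one Dagster asset per dbt model found in the project.
my_dbt_assets = load_assets_from_dbt_project(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/dbt/project/config",
)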

dagster_dbt.load_assets_from_dbt_manifest(manifest=None, *, select=None, exclude=None, io_manager_key=None, dagster_dbt_translator=None, key_prefix=None, source_key_prefix=None, selected_unique_ids=None, display_raw_sql=None, dbt_resource_key='dbt', op_name=None, manifest_json=None, use_build_command=True, partitions_def=None, partition_key_to_vars_fn=None, runtime_metadata_fn=None, node_info_to_asset_key=<function default_asset_key_fn>, node_info_to_group_fn=<function default_group_from_dbt_resource_props>, node_info_to_freshness_policy_fn=<function default_freshness_policy_fn>, node_info_to_auto_materialize_policy_fn=<function default_auto_materialize_policy_fn>, node_info_to_definition_metadata_fn=<function default_metadata_from_dbt_resource_props>)[source]

deprecated This API will be removed in version 0.24.0.

Use the @dbt_assets decorator, DbtCliResource, and DagsterDbtTranslator instead.

For examples on how to use @dbt_assets and DbtCliResource to execute commands like `dbt run` or `dbt build` on your dbt project, see our API docs: https://docs.dagster.io/_apidocs/libraries/dagster-dbt#dagster_dbt.dbt_assets.

For examples on how to customize your dbt assets using DagsterDbtTranslator see the reference: https://docs.dagster.io/integrations/dbt/reference#understanding-asset-definition-attributes.

Loads a set of dbt models, described in a manifest.json, into Dagster assets.

Creates one Dagster asset for each dbt model. All assets will be re-materialized using a single dbt run command.

If you need more flexibility in defining the computations that materialize your dbt assets, we recommend using the @dbt_assets decorator.

Parameters:
  • manifest (Optional[Mapping[str, Any]]) – The contents of a dbt manifest.json, which contains a set of models to load into assets.

  • select (Optional[str]) – A dbt selection string for the models in a project that you want to include. Defaults to “fqn:*”.

  • exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude. Defaults to “”.

  • io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.

  • dagster_dbt_translator (Optional[DagsterDbtTranslator]) – Allows customizing how to map dbt models, seeds, etc. to asset keys and asset metadata.

  • key_prefix (Optional[Union[str, List[str]]]) – [Deprecated] A key prefix to apply to all assets loaded from the dbt project. Does not apply to input assets. Deprecated: use dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(key_prefix=…) instead.

  • source_key_prefix (Optional[Union[str, List[str]]]) – [Deprecated] A key prefix to apply to all input assets for the set of assets loaded from the dbt project. Deprecated: use dagster_dbt_translator=KeyPrefixDagsterDbtTranslator(source_key_prefix=…) instead.

  • op_name (Optional[str]) – [Deprecated] Sets the name of the underlying Op that will generate the dbt assets. Deprecated: use the @dbt_assets decorator if you need to customize the op name.

  • dbt_resource_key (Optional[str]) – [Deprecated] The resource key that the dbt resource will be specified at. Defaults to "dbt". This parameter will be removed in version 0.21; use the @dbt_assets decorator if you need to customize the resource key.

  • runtime_metadata_fn (Optional[Callable[[OpExecutionContext, Mapping[str, Any]], Mapping[str, Any]]]) – [Deprecated] A function that will be run after any of the assets are materialized and returns metadata entries for the asset, to be displayed in the asset catalog for that run. This parameter will be removed in version 0.21; use the @dbt_assets decorator if you need to customize runtime metadata.

  • selected_unique_ids (Optional[Set[str]]) – [Deprecated] The set of dbt unique_ids that you want to load as assets. This parameter will be removed in version 0.21; use the select argument instead.

  • node_info_to_asset_key (Mapping[str, Any] -> AssetKey) – [Deprecated] A function that takes a dictionary of dbt node info and returns the AssetKey that you want to represent that node. By default, the asset key will simply be the name of the dbt model.

  • use_build_command (bool) – [Deprecated] Flag indicating whether to use dbt build as the core computation for this asset. Defaults to True. If set to False, dbt run will be used, and seeds and snapshots won’t be loaded as assets. This parameter will be removed in version 0.21; use the @dbt_assets decorator if you need to customize the underlying dbt commands.

  • partitions_def (Optional[PartitionsDefinition]) – [Deprecated] Defines the set of partition keys that compose the dbt assets. This parameter will be removed in version 0.21; use the @dbt_assets decorator to define partitioned dbt assets.

  • partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – [Deprecated] A function to translate a given partition key (e.g. '2022-01-01') to a dictionary of vars to be passed into the dbt invocation (e.g. {"run_date": "2022-01-01"}). This parameter will be removed in version 0.21; use the @dbt_assets decorator to define partitioned dbt assets.

  • node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – [Deprecated] A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to. Deprecated: instead, configure dagster groups on a dbt resource’s meta field or assign dbt groups.

  • node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – [Deprecated] A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node. By default, freshness policies will be created from config applied to dbt models, i.e. dagster_freshness_policy={"maximum_lag_minutes": 60, "cron_schedule": "0 9 * * *"} will result in that model being assigned FreshnessPolicy(maximum_lag_minutes=60, cron_schedule="0 9 * * *"). Deprecated: instead, configure freshness policies on a dbt resource’s meta field.

  • node_info_to_auto_materialize_policy_fn (Dict[str, Any] -> Optional[AutoMaterializePolicy]) – [Deprecated] A function that takes a dictionary of dbt node info and optionally returns an AutoMaterializePolicy that should be applied to this node. By default, AutoMaterializePolicies will be created from config applied to dbt models, i.e. dagster_auto_materialize_policy={"type": "lazy"} will result in that model being assigned AutoMaterializePolicy.lazy(). Deprecated: instead, configure auto-materialize policies on a dbt resource’s meta field.

  • node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, RawMetadataMapping]]) – [Deprecated] A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition. This is added to the default metadata assigned to the node, which consists of the node’s schema (if present). Deprecated: instead, provide a custom DagsterDbtTranslator that overrides get_metadata.

  • display_raw_sql (Optional[bool]) – [Deprecated] A flag to indicate if the raw SQL associated with each model should be included in the asset description. For large projects, setting this flag to False is advised to reduce the size of the resulting snapshot. Deprecated: instead, provide a custom DagsterDbtTranslator that overrides get_description.
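
As a minimal sketch of this (deprecated) loader, assuming a manifest.json has already been generated at the illustrative path target/manifest.json:

import json
from pathlib import Path

from dagster_dbt import load_assets_from_dbt_manifest

# Parse the manifest and create one Dagster asset per dbt model it describes.
manifest = json.loads(Path("target", "manifest.json").read_text())

my_dbt_assets = load_assets_from_dbt_manifest(manifest=manifest)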

@dagster_dbt.dbt_assets(*, manifest, select='fqn:*', exclude=None, name=None, io_manager_key=None, partitions_def=None, dagster_dbt_translator=<dagster_dbt.dagster_dbt_translator.DagsterDbtTranslator object>, backfill_policy=None, op_tags=None, required_resource_keys=None)[source]

Create a definition for how to compute a set of dbt resources, described by a manifest.json. When invoking dbt commands using DbtCliResource’s cli() method, Dagster events are emitted by calling yield from on the event stream returned by stream().

Parameters:
  • manifest (Union[Mapping[str, Any], str, Path]) – The contents of a manifest.json file or the path to a manifest.json file. A manifest.json contains a representation of a dbt project (models, tests, macros, etc). We use this representation to create corresponding Dagster assets.

  • select (str) – A dbt selection string for the models in a project that you want to include. Defaults to fqn:*.

  • exclude (Optional[str]) – A dbt selection string for the models in a project that you want to exclude. Defaults to “”.

  • name (Optional[str]) – The name of the op.

  • io_manager_key (Optional[str]) – The IO manager key that will be set on each of the returned assets. When other ops are downstream of the loaded assets, the IOManager specified here determines how the inputs to those ops are loaded. Defaults to “io_manager”.

  • partitions_def (Optional[PartitionsDefinition]) – Defines the set of partition keys that compose the dbt assets.

  • dagster_dbt_translator (Optional[DagsterDbtTranslator]) – Allows customizing how to map dbt models, seeds, etc. to asset keys and asset metadata.

  • backfill_policy (Optional[BackfillPolicy]) – If a partitions_def is defined, this determines how to execute backfills that target multiple partitions. If a time window partition definition is used, this parameter defaults to a single-run policy.

  • op_tags (Optional[Dict[str, Any]]) – A dictionary of tags for the op that computes the assets. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

  • required_resource_keys (Optional[Set[str]]) – Set of required resource handles.

Examples

Running dbt build for a dbt project:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Running dbt commands with flags:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build", "--full-refresh"], context=context).stream()

Running dbt commands with --vars:

import json
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_vars = {"key": "value"}

    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()

Retrieving dbt artifacts after running a dbt command:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_build_invocation = dbt.cli(["build"], context=context)

    yield from dbt_build_invocation.stream()

    run_results_json = dbt_build_invocation.get_artifact("run_results.json")

Running multiple dbt commands for a dbt project:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
    yield from dbt.cli(["test"], context=context).stream()

Customizing the Dagster asset metadata inferred from a dbt project using DagsterDbtTranslator:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    ...


@dbt_assets(
    manifest=Path("target", "manifest.json"),
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

Using a custom resource key for dbt:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, my_custom_dbt_resource_key: DbtCliResource):
    yield from my_custom_dbt_resource_key.cli(["build"], context=context).stream()

Using a dynamically generated resource key for dbt using required_resource_keys:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


dbt_resource_key = "my_custom_dbt_resource_key"

@dbt_assets(manifest=Path("target", "manifest.json"), required_resource_keys={dbt_resource_key})
def my_dbt_assets(context: AssetExecutionContext):
    dbt = getattr(context.resources, dbt_resource_key)
    yield from dbt.cli(["build"], context=context).stream()

Invoking another Dagster ResourceDefinition alongside dbt:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_slack import SlackResource


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, slack: SlackResource):
    yield from dbt.cli(["build"], context=context).stream()

    slack_client = slack.get_client()
    slack_client.chat_postMessage(channel="#my-channel", text="dbt build succeeded!")

Defining and accessing Dagster Config alongside dbt:

from pathlib import Path

from dagster import AssetExecutionContext, Config
from dagster_dbt import DbtCliResource, dbt_assets


class MyDbtConfig(Config):
    full_refresh: bool


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource, config: MyDbtConfig):
    dbt_build_args = ["build"]
    if config.full_refresh:
        dbt_build_args += ["--full-refresh"]

    yield from dbt.cli(dbt_build_args, context=context).stream()

Defining Dagster PartitionDefinition alongside dbt:

import json
from pathlib import Path

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(
    manifest=Path("target", "manifest.json"),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01")
)
def partitionshop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    time_window = context.partition_time_window

    dbt_vars = {
        "min_date": time_window.start.isoformat(),
        "max_date": time_window.end.isoformat()
    }
    dbt_build_args = ["build", "--vars", json.dumps(dbt_vars)]

    yield from dbt.cli(dbt_build_args, context=context).stream()
class dagster_dbt.DagsterDbtTranslator(settings=None)[source]

Holds a set of methods that derive Dagster asset definition metadata given a representation of a dbt resource (models, tests, sources, etc).

This class is exposed so that methods can be overridden to customize how Dagster asset metadata is derived.

classmethod get_asset_key(dbt_resource_props)[source]

A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster asset key that represents that resource.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide a custom asset key for a dbt resource.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Returns:

The Dagster asset key for the dbt resource.

Return type:

AssetKey

Examples

Adding a prefix to the default asset key generated for each dbt resource:

from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        return super().get_asset_key(dbt_resource_props).with_prefix("prefix")

Adding a prefix to the default asset key generated for each dbt resource, but only for dbt sources:

from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_asset_key(cls, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        asset_key = super().get_asset_key(dbt_resource_props)

        if dbt_resource_props["resource_type"] == "source":
            asset_key = asset_key.with_prefix("my_prefix")

        return asset_key
classmethod get_auto_materialize_policy(dbt_resource_props)[source]

A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.AutoMaterializePolicy for that resource.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide a custom auto-materialize policy for a dbt resource.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Returns:

A Dagster auto-materialize policy.

Return type:

Optional[AutoMaterializePolicy]

Examples

Set a custom auto-materialize policy for all dbt resources:

from typing import Any, Mapping, Optional

from dagster import AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_auto_materialize_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
        return AutoMaterializePolicy.eager()

Set a custom auto-materialize policy for dbt resources with a specific tag:

from typing import Any, Mapping, Optional

from dagster import AutoMaterializePolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_auto_materialize_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[AutoMaterializePolicy]:
        auto_materialize_policy = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            auto_materialize_policy = AutoMaterializePolicy.eager()

        return auto_materialize_policy
classmethod get_description(dbt_resource_props)[source]

A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster description for that resource.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide a custom description for a dbt resource.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Returns:

The description for the dbt resource.

Return type:

str

Examples

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_description(cls, dbt_resource_props: Mapping[str, Any]) -> str:
        return "custom description"
classmethod get_freshness_policy(dbt_resource_props)[source]

A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster dagster.FreshnessPolicy for that resource.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide a custom freshness policy for a dbt resource.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Returns:

A Dagster freshness policy.

Return type:

Optional[FreshnessPolicy]

Examples

Set a custom freshness policy for all dbt resources:

from typing import Any, Mapping, Optional

from dagster import FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_freshness_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
        return FreshnessPolicy(maximum_lag_minutes=60)

Set a custom freshness policy for dbt resources with a specific tag:

from typing import Any, Mapping, Optional

from dagster import FreshnessPolicy
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_freshness_policy(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
        freshness_policy = None
        if "my_custom_tag" in dbt_resource_props.get("tags", []):
            freshness_policy = FreshnessPolicy(maximum_lag_minutes=60)

        return freshness_policy
classmethod get_group_name(dbt_resource_props)[source]

A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster group name for that resource.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide a custom group name for a dbt resource.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Returns:

A Dagster group name.

Return type:

Optional[str]

Examples

from typing import Any, Mapping, Optional

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_group_name(cls, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        # Prefix the configured dbt group, or fall back to the default group when none is set.
        dbt_group = dbt_resource_props.get("config", {}).get("group")
        return f"custom_group_prefix_{dbt_group}" if dbt_group else None
classmethod get_metadata(dbt_resource_props)[source]

A function that takes a dictionary representing properties of a dbt resource, and returns the Dagster metadata for that resource.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide custom metadata for a dbt resource.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Returns:

A dictionary representing the Dagster metadata for the dbt resource.

Return type:

Mapping[str, Any]

Examples

from typing import Any, Mapping

from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_metadata(cls, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        return {"custom": "metadata"}
classmethod get_partition_mapping(dbt_resource_props, dbt_parent_resource_props)[source]

A function that takes two dictionaries: the first, representing properties of a dbt resource; and the second, representing the properties of a parent dependency to the first dbt resource. The function returns the Dagster partition mapping for the dbt dependency.

Note that a dbt resource is unrelated to Dagster’s resource concept, and simply represents a model, seed, snapshot or source in a given dbt project. You can learn more about dbt resources and the properties available in this dictionary here: https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details

This method can be overridden to provide a custom partition mapping for a dbt dependency.

Parameters:
  • dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt child resource.

  • dbt_parent_resource_props (Mapping[str, Any]) – A dictionary representing the dbt parent resource, in relationship to the child.

Returns:

The Dagster partition mapping for the dbt resource. If None is returned, the default partition mapping will be used.

Return type:

Optional[PartitionMapping]
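
Examples

A sketch of a custom partition mapping, assuming partitioned dbt assets whose upstream dbt sources should map to the previous day's partition (the use of TimeWindowPartitionMapping and the offset values are illustrative choices, not defaults):

from typing import Any, Mapping, Optional

from dagster import PartitionMapping, TimeWindowPartitionMapping
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    @classmethod
    def get_partition_mapping(
        cls,
        dbt_resource_props: Mapping[str, Any],
        dbt_parent_resource_props: Mapping[str, Any],
    ) -> Optional[PartitionMapping]:
        # Map dbt models to the previous day's partition of their upstream sources.
        if dbt_parent_resource_props["resource_type"] == "source":
            return TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)

        # Returning None falls back to the default partition mapping.
        return None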

class dagster_dbt.DagsterDbtTranslatorSettings(enable_asset_checks=False, enable_duplicate_source_asset_keys=False)[source]

Settings to enable Dagster features for your dbt project.

Parameters:
  • enable_asset_checks (bool) – Whether to load dbt tests as Dagster asset checks. Defaults to False.

  • enable_duplicate_source_asset_keys (bool) – Whether to allow dbt sources with duplicate Dagster asset keys. Defaults to False.
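
Examples

A sketch of enabling dbt tests as Dagster asset checks via these settings and passing the resulting translator to @dbt_assets (assuming a manifest at the illustrative path target/manifest.json):

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import (
    DagsterDbtTranslator,
    DagsterDbtTranslatorSettings,
    DbtCliResource,
    dbt_assets,
)

# Load dbt tests as Dagster asset checks.
dagster_dbt_translator = DagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(enable_asset_checks=True)
)


@dbt_assets(
    manifest=Path("target", "manifest.json"),
    dagster_dbt_translator=dagster_dbt_translator,
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()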

class dagster_dbt.DbtManifestAssetSelection(*, manifest, select, dagster_dbt_translator, exclude)[source]

Defines a selection of assets from a dbt manifest wrapper and a dbt selection string.

Parameters:
  • manifest (Mapping[str, Any]) – The dbt manifest blob.

  • select (str) – A dbt selection string to specify a set of dbt resources.

  • exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.

Examples

import json
from pathlib import Path

from dagster_dbt import DbtManifestAssetSelection

manifest = json.loads(Path("path/to/manifest.json").read_text())

# select the dbt assets that have the tag "foo".
my_selection = DbtManifestAssetSelection(manifest=manifest, select="tag:foo")
dagster_dbt.build_dbt_asset_selection(dbt_assets, dbt_select='fqn:*', dbt_exclude=None)[source]

Build an asset selection for a dbt selection string.

See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for more information.

Parameters:
  • dbt_select (str) – A dbt selection string to specify a set of dbt resources.

  • dbt_exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.

Returns:

An asset selection for the selected dbt nodes.

Return type:

AssetSelection

Examples

from dagster_dbt import dbt_assets, build_dbt_asset_selection

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

# Select the dbt assets that have the tag "foo".
foo_selection = build_dbt_asset_selection([all_dbt_assets], dbt_select="tag:foo")

# Select the dbt assets that have the tag "foo" and all Dagster assets downstream
# of them (dbt-related or otherwise)
foo_and_downstream_selection = foo_selection.downstream()
dagster_dbt.build_schedule_from_dbt_selection(dbt_assets, job_name, cron_schedule, dbt_select='fqn:*', dbt_exclude=None, tags=None, config=None, execution_timezone=None, default_status=DefaultScheduleStatus.STOPPED)[source]

Build a schedule to materialize a specified set of dbt resources from a dbt selection string.

See https://docs.getdbt.com/reference/node-selection/syntax#how-does-selection-work for more information.

Parameters:
  • job_name (str) – The name of the job to materialize the dbt resources.

  • cron_schedule (str) – The cron schedule to define the schedule.

  • dbt_select (str) – A dbt selection string to specify a set of dbt resources.

  • dbt_exclude (Optional[str]) – A dbt selection string to exclude a set of dbt resources.

  • tags (Optional[Mapping[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.

  • config (Optional[RunConfig]) – The config that parameterizes the execution of this schedule.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database (https://www.iana.org/time-zones), e.g. "America/Los_Angeles".

Returns:

A definition to materialize the selected dbt resources on a cron schedule.

Return type:

ScheduleDefinition

Examples

from dagster_dbt import dbt_assets, build_schedule_from_dbt_selection

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

daily_dbt_assets_schedule = build_schedule_from_dbt_selection(
    [all_dbt_assets],
    job_name="all_dbt_assets",
    cron_schedule="0 0 * * *",
    dbt_select="fqn:*",
)
dagster_dbt.get_asset_key_for_model(dbt_assets, model_name)[source]

Return the corresponding Dagster asset key for a dbt model, seed, or snapshot.

Parameters:
  • dbt_assets (AssetsDefinition) – An AssetsDefinition object produced by load_assets_from_dbt_project, load_assets_from_dbt_manifest, or @dbt_assets.

  • model_name (str) – The name of the dbt model, seed, or snapshot.

Returns:

The corresponding Dagster asset key.

Return type:

AssetKey

Examples

from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_model

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...


@asset(deps={get_asset_key_for_model([all_dbt_assets], "customers")})
def cleaned_customers():
    ...
dagster_dbt.get_asset_key_for_source(dbt_assets, source_name)[source]

Returns the corresponding Dagster asset key for a dbt source with a singular table.

Parameters:

source_name (str) – The name of the dbt source.

Raises:

DagsterInvalidInvocationError – If the source has more than one table.

Returns:

The corresponding Dagster asset key.

Return type:

AssetKey

Examples

from dagster import asset
from dagster_dbt import dbt_assets, get_asset_key_for_source

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

@asset(key=get_asset_key_for_source([all_dbt_assets], "my_source"))
def upstream_python_asset():
    ...
dagster_dbt.get_asset_keys_by_output_name_for_source(dbt_assets, source_name)[source]

Returns the corresponding Dagster asset keys for all tables in a dbt source.

This is a convenience method that makes it easy to define a multi-asset that generates all the tables for a given dbt source.

Parameters:

source_name (str) – The name of the dbt source.

Returns:

A mapping of the table name to the corresponding Dagster asset key for all tables in the given dbt source.

Return type:

Mapping[str, AssetKey]

Examples

from dagster import AssetOut, multi_asset
from dagster_dbt import dbt_assets, get_asset_keys_by_output_name_for_source

@dbt_assets(manifest=...)
def all_dbt_assets():
    ...

@multi_asset(
    outs={
        name: AssetOut(key=asset_key)
        for name, asset_key in get_asset_keys_by_output_name_for_source(
            [all_dbt_assets], "raw_data"
        ).items()
    },
)
def upstream_python_asset():
    ...

Resources (dbt Core)

CLI Resource

class dagster_dbt.DbtCliResource(*, project_dir, global_config_flags=[], profiles_dir=None, profile=None, target=None, dbt_executable='dbt')[source]

A resource used to execute dbt CLI commands.

project_dir

The path to the dbt project directory. This directory should contain a dbt_project.yml. See https://docs.getdbt.com/reference/dbt_project.yml for more information.

Type:

str

global_config_flags

A list of global flags configuration to pass to the dbt CLI invocation. See https://docs.getdbt.com/reference/global-configs for a full list of configuration.

Type:

List[str]

profiles_dir

The path to the directory containing your dbt profiles.yml. By default, the current working directory is used, which is the dbt project directory. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.

Type:

Optional[str]

profile

The profile from your dbt profiles.yml to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.

Type:

Optional[str]

target

The target from your dbt profiles.yml to use for execution. See https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles for more information.

Type:

Optional[str]

dbt_executable

The path to the dbt executable. By default, this is dbt.

Type:

str

Examples

Creating a dbt resource with only a reference to project_dir:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

Creating a dbt resource with a custom profiles_dir:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/dbt/project/profiles",
)

Creating a dbt resource with a custom profile and target:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    profiles_dir="/path/to/dbt/project/profiles",
    profile="jaffle_shop",
    target="dev",
)

Creating a dbt resource with global configs, e.g. disabling colored logs with --no-use-color:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    global_config_flags=["--no-use-color"],
)

Creating a dbt resource with custom dbt executable path:

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(
    project_dir="/path/to/dbt/project",
    dbt_executable="/path/to/dbt/executable",
)
cli(args, *, raise_on_error=True, manifest=None, dagster_dbt_translator=None, context=None, target_path=None)[source]

Create a subprocess to execute a dbt CLI command.

Parameters:
  • args (List[str]) – The dbt CLI command to execute.

  • raise_on_error (bool) – Whether to raise an exception if the dbt CLI command fails.

  • manifest (Optional[Union[Mapping[str, Any], str, Path]]) – The dbt manifest blob. If an execution context from within @dbt_assets is provided to the context argument, then the manifest provided to @dbt_assets will be used.

  • dagster_dbt_translator (Optional[DagsterDbtTranslator]) – The translator to link dbt nodes to Dagster assets. If an execution context from within @dbt_assets is provided to the context argument, then the dagster_dbt_translator provided to @dbt_assets will be used.

  • context (Optional[Union[OpExecutionContext, AssetExecutionContext]]) – The execution context from within @dbt_assets. If an AssetExecutionContext is passed, its underlying OpExecutionContext will be used.

  • target_path (Optional[Path]) – An explicit path to a target folder to use to store and retrieve dbt artifacts when running a dbt CLI command. If not provided, a unique target path will be generated.

Returns:

An invocation instance that can be used to retrieve the output of the dbt CLI command.

Return type:

DbtCliInvocation

Examples

Streaming Dagster events for dbt asset materializations and observations:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()

Retrieving a dbt artifact after streaming the Dagster events:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_run_invocation = dbt.cli(["run"], context=context)

    yield from dbt_run_invocation.stream()

    # Retrieve the `run_results.json` dbt artifact as a dictionary:
    run_results_json = dbt_run_invocation.get_artifact("run_results.json")

    # Retrieve the `run_results.json` dbt artifact as a file path:
    run_results_path = dbt_run_invocation.target_path.joinpath("run_results.json")

Customizing the asset materialization metadata when streaming the Dagster events:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_cli_invocation = dbt.cli(["run"], context=context)

    for dagster_event in dbt_cli_invocation.stream():
        if isinstance(dagster_event, Output):
            context.add_output_metadata(
                metadata={
                    "my_custom_metadata": "my_custom_metadata_value",
                },
                output_name=dagster_event.output_name,
            )

        yield dagster_event

Suppressing exceptions from a dbt CLI command when a non-zero exit code is returned:

from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_run_invocation = dbt.cli(["run"], context=context, raise_on_error=False)

    if dbt_run_invocation.is_successful():
        yield from dbt_run_invocation.stream()
    else:
        ...

Invoking a dbt CLI command in a custom asset or op:

import json

from dagster import asset, op
from dagster_dbt import DbtCliResource


@asset
def my_dbt_asset(dbt: DbtCliResource):
    dbt_macro_args = {"key": "value"}
    dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait()


@op
def my_dbt_op(dbt: DbtCliResource):
    dbt_macro_args = {"key": "value"}
    dbt.cli(["run-operation", "my-macro", json.dumps(dbt_macro_args)]).wait()
class dagster_dbt.DbtCliInvocation(process, manifest, dagster_dbt_translator, project_dir, target_path, raise_on_error, log_level, context=None)[source]

The representation of an invoked dbt command.

Parameters:
  • process (subprocess.Popen) – The process running the dbt command.

  • manifest (Mapping[str, Any]) – The dbt manifest blob.

  • project_dir (Path) – The path to the dbt project.

  • target_path (Path) – The path to the dbt target folder.

  • raise_on_error (bool) – Whether to raise an exception if the dbt command fails.

get_artifact(artifact)[source]

Retrieve a dbt artifact from the target path.

See https://docs.getdbt.com/reference/artifacts/dbt-artifacts for more information.

Parameters:

artifact (Union[Literal["manifest.json"], Literal["catalog.json"], Literal["run_results.json"], Literal["sources.json"]]) – The name of the artifact to retrieve.

Returns:

The artifact as a dictionary.

Return type:

Dict[str, Any]

Examples

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

dbt_cli_invocation = dbt.cli(["run"]).wait()

# Retrieve the run_results.json artifact.
run_results = dbt_cli_invocation.get_artifact("run_results.json")
is_successful()[source]

Return whether the dbt CLI process completed successfully.

Returns:

True, if the dbt CLI process returns with a zero exit code, and False otherwise.

Return type:

bool

Examples

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

dbt_cli_invocation = dbt.cli(["run"], raise_on_error=False)

if dbt_cli_invocation.is_successful():
    ...
stream()[source]

Stream the events from the dbt CLI process and convert them to Dagster events.

Returns:

A set of corresponding Dagster events.

In a Dagster asset definition, the following are yielded:
  • Output for refables (e.g. models, seeds, snapshots).
  • AssetCheckResult for dbt test results that are enabled as asset checks.
  • AssetObservation for dbt test results that are not enabled as asset checks.

In a Dagster op definition, the following are yielded:
  • AssetMaterialization for refables (e.g. models, seeds, snapshots).
  • AssetObservation for dbt test results.

Return type:

Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]

Examples

from pathlib import Path
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=Path("target", "manifest.json"))
def my_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["run"], context=context).stream()
stream_raw_events()[source]

Stream the events from the dbt CLI process.

Returns:

An iterator of events from the dbt CLI process.

Return type:

Iterator[DbtCliEventMessage]

wait()[source]

Wait for the dbt CLI process to complete.

Returns:

The current representation of the dbt CLI invocation.

Return type:

DbtCliInvocation

Examples

from dagster_dbt import DbtCliResource

dbt = DbtCliResource(project_dir="/path/to/dbt/project")

dbt_cli_invocation = dbt.cli(["run"]).wait()
class dagster_dbt.DbtCliEventMessage(raw_event, event_history_metadata)[source]

The representation of a dbt CLI event.

to_default_asset_events(manifest, dagster_dbt_translator=<dagster_dbt.dagster_dbt_translator.DagsterDbtTranslator object>, context=None)[source]

Convert a dbt CLI event to a set of corresponding Dagster events.

Parameters:
  • manifest (Union[Mapping[str, Any], str, Path]) – The dbt manifest blob.

  • dagster_dbt_translator (DagsterDbtTranslator) – Optionally, a custom translator for linking dbt nodes to Dagster assets.

Returns:

A set of corresponding Dagster events.

In a Dagster asset definition, the following are yielded:
  • Output for refables (e.g. models, seeds, snapshots).
  • AssetCheckResult for dbt test results that are enabled as asset checks.
  • AssetObservation for dbt test results that are not enabled as asset checks.

In a Dagster op definition, the following are yielded:
  • AssetMaterialization for refables (e.g. models, seeds, snapshots).
  • AssetObservation for dbt test results.

Return type:

Iterator[Union[Output, AssetMaterialization, AssetObservation, AssetCheckResult]]
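
Examples

A sketch that pairs DbtCliInvocation.stream_raw_events() with this method to convert each raw dbt CLI event into Dagster events inside @dbt_assets (the manifest path is illustrative; this is roughly what stream() does for you):

import json
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

manifest = json.loads(Path("target", "manifest.json").read_text())


@dbt_assets(manifest=manifest)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    for dbt_event in dbt.cli(["build"], context=context).stream_raw_events():
        # Convert each raw dbt CLI event into the corresponding Dagster events.
        yield from dbt_event.to_default_asset_events(manifest=manifest, context=context)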

Deprecated (dbt Core)

class dagster_dbt.DbtCliOutput(command, return_code, raw_output, logs, result, docs_url=None)[source]

deprecated This API will be removed in version 0.24.0.

The results of executing a dbt command, along with additional metadata about the dbt CLI process that was run.

This class is deprecated, because it’s only produced by methods of the DbtCliClientResource class, which is deprecated in favor of DbtCliResource.

Note that users should not construct instances of this class directly. This class is intended to be constructed from the JSON output of dbt commands.

command

The full shell command that was executed.

Type:

str

return_code

The return code of the dbt CLI process.

Type:

int

raw_output

The raw output (stdout) of the dbt CLI process.

Type:

str

logs

List of parsed JSON logs produced by the dbt command.

Type:

List[Dict[str, Any]]

result

Dictionary containing dbt-reported result information contained in run_results.json. Some dbt commands do not produce results, and will therefore have result = None.

Type:

Optional[Dict[str, Any]]

docs_url

Hostname where dbt docs are being served for this project.

Type:

Optional[str]

dagster_dbt.dbt_cli_resource ResourceDefinition[source]

Config Schema:
project_dir (Union[dagster.StringSource, None], optional):

Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.

Default Value: ‘.’

profiles_dir (Union[dagster.StringSource, None], optional):

Which directory to look in for the profiles.yml file. Default = $DBT_PROFILES_DIR or $HOME/.dbt

profile (Union[dagster.StringSource, None], optional):

Which profile to load. Overrides setting in dbt_project.yml.

target (Union[dagster.StringSource, None], optional):

Which target to load for the given profile.

vars (Union[dict, None], optional):

Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a dictionary, e.g. {'my_variable': 'my_value'}.

bypass_cache (Union[dagster.BoolSource, None], optional):

If set, bypass the adapter-level cache of database state

Default Value: False

warn_error (Union[dagster.BoolSource, None], optional):

If dbt would normally warn, instead raise an exception. Examples include --models that selects nothing, deprecations, configurations with no associated models, invalid test configurations, and missing sources/refs in tests.

Default Value: False

dbt_executable (Union[dagster.StringSource, None], optional):

Path to the dbt executable. Default is dbt

Default Value: ‘dbt’

ignore_handled_error (Union[dagster.BoolSource, None], optional):

When True, will not raise an exception when the dbt CLI returns error code 1. Default is False.

Default Value: False

target_path (Union[dagster.StringSource, None], optional):

The directory path for target if different from the default target-path in your dbt project configuration file.

Default Value: ‘target’

docs_url (Union[dagster.StringSource, None], optional):

The url for where dbt docs are being served for this project.

json_log_format (Union[dagster.BoolSource, None], optional):

When True, dbt will be invoked with the --log-format json flag, allowing Dagster to parse the log messages and emit simpler log messages to the event log.

Default Value: True

capture_logs (Union[dagster.BoolSource, None], optional):

When True, dbt will be invoked with the --capture-output flag, allowing Dagster to capture the logs and emit them to the event log.

Default Value: True

debug (Union[dagster.BoolSource, None], optional):

When True, dbt will be invoked with the --debug flag, which will print additional debug information to the console.

Default Value: False

deprecated This API will be removed in version 0.24.0.

Use DbtCliResource instead.

This resource issues dbt CLI commands against a configured dbt project. It is deprecated in favor of DbtCliResource.
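
A minimal sketch of configuring this legacy resource and using it with one of the pre-built ops documented below (the project path is illustrative; prefer DbtCliResource for new code):

from dagster import job
from dagster_dbt import dbt_cli_resource, dbt_run_op

# Bind the resource to a specific dbt project directory.
my_dbt_cli_resource = dbt_cli_resource.configured(
    {"project_dir": "/path/to/dbt/project"}
)


@job(resource_defs={"dbt": my_dbt_cli_resource})
def my_dbt_cli_job():
    dbt_run_op()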

Ops (dbt Core)

If you’re using asset-based dbt APIs like load_assets_from_dbt_project, you typically won’t also need the op-based APIs below.

dagster_dbt provides a set of pre-built ops that work with the CLI. For more advanced use cases, we suggest building your own ops which directly interact with these resources.

dagster_dbt.dbt_run_op = <dagster._core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
yield_materializations (Union[dagster.BoolSource, None], optional):

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes. Default: True

Default Value: True

asset_key_prefix (Union[List[dagster.StringSource], None], optional):

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: [‘dbt’]

This op executes a dbt run command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_run_op, dbt_cli_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_run_op()
dagster_dbt.dbt_compile_op(context)[source]

This op executes a dbt compile command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_compile_op, dbt_cli_resource

@job(resource_defs={"dbt":dbt_cli_resource})
def my_dbt_cli_job():
    dbt_compile_op()
dagster_dbt.dbt_ls_op(context)[source]

This op executes a dbt ls command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_ls_op, dbt_cli_resource

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    dbt_ls_op()

dagster_dbt.dbt_test_op(context)[source]

This op executes a dbt test command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_test_op, dbt_cli_resource

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    dbt_test_op()

dagster_dbt.dbt_snapshot_op(context)[source]

This op executes a dbt snapshot command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_snapshot_op, dbt_cli_resource

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    dbt_snapshot_op()

dagster_dbt.dbt_seed_op(context)[source]

This op executes a dbt seed command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_seed_op, dbt_cli_resource

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    dbt_seed_op()

dagster_dbt.dbt_docs_generate_op(context)[source]

This op executes a dbt docs generate command. It requires the use of a dbt resource, which can be set to execute this command through the CLI (using the dbt_cli_resource).

Examples:

from dagster import job
from dagster_dbt import dbt_docs_generate_op, dbt_cli_resource

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    dbt_docs_generate_op()

dbt Cloud

Here, we provide interfaces to manage dbt projects invoked by the hosted dbt Cloud service.

Assets (dbt Cloud)

dagster_dbt.load_assets_from_dbt_cloud_job(dbt_cloud, job_id, node_info_to_asset_key=<function default_asset_key_fn>, node_info_to_group_fn=<function default_group_from_dbt_resource_props>, node_info_to_freshness_policy_fn=<function default_freshness_policy_fn>, node_info_to_auto_materialize_policy_fn=<function default_auto_materialize_policy_fn>, partitions_def=None, partition_key_to_vars_fn=None)[source]

experimental This API may break in future versions, even between dot releases.

Loads a set of dbt models, managed by a dbt Cloud job, into Dagster assets. In order to determine the set of dbt models, the project is compiled to generate the necessary artifacts that define the dbt models and their dependencies.

One Dagster asset is created for each dbt model.

Parameters:
  • dbt_cloud (ResourceDefinition) – The dbt Cloud resource to use to connect to the dbt Cloud API.

  • job_id (int) – The ID of the dbt Cloud job to load assets from.

  • node_info_to_asset_key (Mapping[str, Any] -> AssetKey) – A function that takes a dictionary of dbt metadata and returns the AssetKey that you want to represent a given model or source. By default: dbt model -> AssetKey([model_name]) and dbt source -> AssetKey([source_name, table_name])

  • node_info_to_group_fn (Dict[str, Any] -> Optional[str]) – A function that takes a dictionary of dbt node info and returns the group that this node should be assigned to.

  • node_info_to_freshness_policy_fn (Dict[str, Any] -> Optional[FreshnessPolicy]) – A function that takes a dictionary of dbt node info and optionally returns a FreshnessPolicy that should be applied to this node. By default, freshness policies will be created from config applied to dbt models, i.e.: dagster_freshness_policy={“maximum_lag_minutes”: 60, “cron_schedule”: “0 9 * * *”} will result in that model being assigned FreshnessPolicy(maximum_lag_minutes=60, cron_schedule=”0 9 * * *”)

  • node_info_to_auto_materialize_policy_fn (Dict[str, Any] -> Optional[AutoMaterializePolicy]) – A function that takes a dictionary of dbt node info and optionally returns an AutoMaterializePolicy that should be applied to this node. By default, AutoMaterializePolicies will be created from config applied to dbt models, i.e.: dagster_auto_materialize_policy={“type”: “lazy”} will result in that model being assigned AutoMaterializePolicy.lazy()

  • node_info_to_definition_metadata_fn (Dict[str, Any] -> Optional[Dict[str, RawMetadataMapping]]) – A function that takes a dictionary of dbt node info and optionally returns a dictionary of metadata to be attached to the corresponding definition. This is added to the default metadata assigned to the node, which consists of the node’s schema (if present).

  • partitions_def (Optional[PartitionsDefinition]) – experimental (This parameter may break in future versions, even between dot releases.) Defines the set of partition keys that compose the dbt assets.

  • partition_key_to_vars_fn (Optional[str -> Dict[str, Any]]) – experimental (This parameter may break in future versions, even between dot releases.) A function to translate a given partition key (e.g. ‘2022-01-01’) to a dictionary of vars to be passed into the dbt invocation (e.g. {“run_date”: “2022-01-01”})

Returns:

A definition for the loaded assets.

Return type:

CacheableAssetsDefinition

Examples

from dagster import repository
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_job

DBT_CLOUD_JOB_ID = 1234

dbt_cloud = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud, job_id=DBT_CLOUD_JOB_ID
)


@repository
def dbt_cloud_sandbox():
    return [dbt_cloud_assets]
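
Building on the example above, a hedged sketch of customizing the loaded assets: the node_info_to_group_fn parameter is passed group_from_dbt_resource_props_fallback_to_directory (documented under Utils below) so that models without an explicit group fall back to their model subdirectory. The job ID is a placeholder.

from dagster_dbt import (
    dbt_cloud_resource,
    group_from_dbt_resource_props_fallback_to_directory,
    load_assets_from_dbt_cloud_job,
)

dbt_cloud = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_API_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

# Group assets by model subdirectory when no group is configured in dbt.
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud,
    job_id=1234,  # placeholder job ID
    node_info_to_group_fn=group_from_dbt_resource_props_fallback_to_directory,
)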

Ops (dbt Cloud)

dagster_dbt.dbt_cloud_run_op = <dagster._core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
job_id (dagster.IntSource):

The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

poll_interval (Union[Float, None], optional):

The time (in seconds) that will be waited between successive polls.

Default Value: 10

poll_timeout (Union[Float, None], optional):

The maximum time (in seconds) that will be waited before this operation is timed out. By default, this will never time out.

yield_materializations (Union[dagster.BoolSource, None], optional):

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes.

Default Value: True

asset_key_prefix (Union[List[dagster.StringSource], None], optional):

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys.

Default Value: [‘dbt’]

Initiates a run for a dbt Cloud job, then polls until the run completes. If the job fails or is otherwise stopped before succeeding, a dagster.Failure exception will be raised, and this op will fail.

It requires the use of a ‘dbt_cloud’ resource, which is used to connect to the dbt Cloud API.

Config Options:

job_id (int)

The integer ID of the relevant dbt Cloud job. You can find this value by going to the details page of your job in the dbt Cloud UI. It will be the final number in the url, e.g.: https://cloud.getdbt.com/#/accounts/{account_id}/projects/{project_id}/jobs/{job_id}/

poll_interval (float)

The time (in seconds) that will be waited between successive polls. Defaults to 10.

poll_timeout (float)

The maximum time (in seconds) that will be waited before this operation is timed out. By default, this will never time out.

yield_materializations (bool)

If True, materializations corresponding to the results of the dbt operation will be yielded when the op executes. Defaults to True.

asset_key_prefix (List[str])

If provided and yield_materializations is True, these components will be used to prefix the generated asset keys. Defaults to [“dbt”].

Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {"auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"}, "account_id": 77777}
)
run_dbt_nightly_sync = dbt_cloud_run_op.configured(
    {"job_id": 54321}, name="run_dbt_nightly_sync"
)

@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def dbt_cloud():
    run_dbt_nightly_sync()

Resources (dbt Cloud)

class dagster_dbt.DbtCloudClientResource(*, auth_token, account_id, disable_schedule_on_trigger=True, request_max_retries=3, request_retry_delay=0.25, dbt_cloud_host='https://cloud.getdbt.com/')[source]

This resource helps interact with dbt Cloud connectors.
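
A minimal sketch of using this Pythonic resource together with load_assets_from_dbt_cloud_job; the environment variable names and job ID are placeholders.

from dagster import Definitions, EnvVar
from dagster_dbt import DbtCloudClientResource, load_assets_from_dbt_cloud_job

# Placeholder env var names and job ID.
dbt_cloud = DbtCloudClientResource(
    auth_token=EnvVar("DBT_CLOUD_API_TOKEN"),
    account_id=EnvVar.int("DBT_CLOUD_ACCOUNT_ID"),
)

dbt_cloud_assets = load_assets_from_dbt_cloud_job(dbt_cloud=dbt_cloud, job_id=1234)

defs = Definitions(assets=[dbt_cloud_assets])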

Deprecated (dbt Cloud)

dagster_dbt.dbt_cloud_resource ResourceDefinition[source]

Config Schema:
auth_token (dagster.StringSource):

dbt Cloud API Token. User tokens can be found in the [dbt Cloud UI](https://cloud.getdbt.com/#/profile/api/), or see the [dbt Cloud Docs](https://docs.getdbt.com/docs/dbt-cloud/dbt-cloud-api/service-tokens) for instructions on creating a Service Account token.

account_id (dagster.IntSource):

dbt Cloud Account ID. This value can be found in the url of a variety of views in the dbt Cloud UI, e.g. https://cloud.getdbt.com/#/accounts/{account_id}/settings/.

disable_schedule_on_trigger (Union[dagster.BoolSource, None], optional):

Specifies if you would like any job that is triggered using this resource to automatically disable its schedule.

Default Value: True

request_max_retries (Union[dagster.IntSource, None], optional):

The maximum number of times requests to the dbt Cloud API should be retried before failing.

Default Value: 3

request_retry_delay (Union[Float, None], optional):

Time (in seconds) to wait between each request retry.

Default Value: 0.25

dbt_cloud_host (Union[dagster.StringSource, None], optional):

The hostname where dbt Cloud is being hosted (e.g. https://my_org.cloud.getdbt.com/).

Default Value: ‘https://cloud.getdbt.com/’

This resource allows users to programmatically interface with the dbt Cloud Administrative REST API (v2) to launch jobs and monitor their progress. This currently implements only a subset of the functionality exposed by the API.

For a complete set of documentation on the dbt Cloud Administrative REST API, including expected response JSON schemas, see the dbt Cloud API Docs.

To configure this resource, we recommend using the configured method.

Examples:

from dagster import job
from dagster_dbt import dbt_cloud_resource

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": {"env": "DBT_CLOUD_ACCOUNT_ID"},
    }
)

@job(resource_defs={"dbt_cloud": my_dbt_cloud_resource})
def my_dbt_cloud_job():
    ...

Types

class dagster_dbt.DbtOutput(result)[source]

deprecated This API will be removed in version 0.24.0.

Base class for both DbtCliOutput and DbtRPCOutput. Contains a single field, result, which represents the dbt-formatted result of the command that was run (if any).

Used internally, should not be instantiated directly by the user.
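
For illustration, a sketch of reading the parsed result off a DbtOutput returned by the legacy CLI resource. It assumes a configured “dbt” resource and the standard run_results.json layout (a "results" list whose entries carry "unique_id" and "status").

from dagster import op

@op(required_resource_keys={"dbt"})
def log_dbt_results(context):
    dbt_output = context.resources.dbt.run()
    # Iterate over the dbt-formatted run results carried on the output object.
    for node_result in dbt_output.result.get("results", []):
        context.log.info(f"{node_result.get('unique_id')}: {node_result.get('status')}")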

class dagster_dbt.DbtResource(logger=None)[source]

deprecated This API will be removed in version 0.24.0.

Use DbtCliResource instead.

Errors

exception dagster_dbt.DagsterDbtError(description=None, metadata=None, allow_retries=None)[source]

The base exception of the dagster-dbt library.

exception dagster_dbt.DagsterDbtCliRuntimeError(description, logs=None, raw_output=None, messages=None)[source]

Represents an error while executing a dbt CLI command.

exception dagster_dbt.DagsterDbtCliFatalRuntimeError(logs=None, raw_output=None, messages=None)[source]

deprecated This API will be removed in version 0.24.0.

Represents a fatal error in the dbt CLI (return code 2).

exception dagster_dbt.DagsterDbtCliHandledRuntimeError(logs=None, raw_output=None, messages=None)[source]

deprecated This API will be removed in version 0.24.0.

Represents a model error reported by the dbt CLI at runtime (return code 1).

exception dagster_dbt.DagsterDbtCliOutputsNotFoundError(path)[source]

deprecated This API will be removed in version 0.24.0.

Represents a problem in finding the target/run_results.json artifact when executing a dbt CLI command.

For more details on target/run_results.json, see https://docs.getdbt.com/reference/dbt-artifacts#run_resultsjson.

exception dagster_dbt.DagsterDbtCliUnexpectedOutputError(invalid_line_nos)[source]

Represents an error when parsing the output of a dbt CLI command.

invalid_line_nos (List[int]) – The line numbers of the dbt CLI output lines that could not be parsed.
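
These exceptions can be caught around direct calls to the legacy CLI resource. A minimal sketch, assuming a configured “dbt” resource:

from dagster import op
from dagster_dbt import DagsterDbtCliRuntimeError

@op(required_resource_keys={"dbt"})
def run_dbt_or_log(context):
    try:
        context.resources.dbt.run()
    except DagsterDbtCliRuntimeError as err:
        # Surface the failure in the Dagster event log, then re-raise so the run fails.
        context.log.error(f"dbt CLI invocation failed: {err}")
        raise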

Utils

dagster_dbt.default_group_from_dbt_resource_props(dbt_resource_props)[source]

Get the group name for a dbt node.

If a Dagster group is configured in the metadata for the node, use that.

Otherwise, if a dbt group is configured for the node, use that.

dagster_dbt.group_from_dbt_resource_props_fallback_to_directory(dbt_resource_props)[source]

Get the group name for a dbt node.

Has the same behavior as default_group_from_dbt_resource_props, except that, if no group can be determined from config or metadata, it falls back to using the subdirectory of the models directory that the source file is in.

Parameters:

dbt_resource_props (Mapping[str, Any]) – A dictionary representing the dbt resource.

Examples

from dagster_dbt import (
    group_from_dbt_resource_props_fallback_to_directory,
    load_assets_from_dbt_manifest,
)

# `manifest` is a dbt manifest (a path to manifest.json or a parsed dict) loaded elsewhere.
dbt_assets = load_assets_from_dbt_manifest(
    manifest=manifest,
    node_info_to_group_fn=group_from_dbt_resource_props_fallback_to_directory,
)

dagster_dbt.default_metadata_from_dbt_resource_props(dbt_resource_props)[source]

dagster_dbt.utils.generate_materializations(dbt_output, asset_key_prefix=None)[source]

This function yields dagster.AssetMaterialization events for each model updated by a dbt command, using information parsed from a DbtOutput object.

Examples

from dagster import job, op, Output
from dagster_dbt.utils import generate_materializations
from dagster_dbt import dbt_cli_resource

@op(required_resource_keys={"dbt"})
def my_custom_dbt_run(context):
    dbt_output = context.resources.dbt.run()
    for materialization in generate_materializations(dbt_output):
        # you can modify the materialization object to add extra metadata, if desired
        yield materialization
    yield Output(dbt_output)

@job(resource_defs={"dbt": dbt_cli_resource})
def my_dbt_cli_job():
    my_custom_dbt_run()