Using dbt with Dagster#

This guide explains how you can run a dbt project in your Dagster pipelines.

What is dbt?#

dbt (data build tool) helps engineers transform data in their warehouses by simply writing SELECT statements. dbt automatically builds a dependency graph for your transformations and turns these SELECT statements into tables and views in your data warehouse.
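
To make the dependency-graph idea concrete, here is a minimal sketch in plain Python. It is not how dbt is implemented; the model names and the `deps` mapping are hypothetical. In a real project, dbt infers these edges from the `ref()` calls inside each model's SQL.

```python
from graphlib import TopologicalSorter

# Hypothetical models: each key depends on the models listed in its set.
deps = {
    "stg_orders": set(),
    "stg_customers": set(),
    "orders_enriched": {"stg_orders", "stg_customers"},
    "daily_revenue": {"orders_enriched"},
}


def build_order(dependencies):
    """Return one valid build order: every model comes after its upstream models."""
    return list(TopologicalSorter(dependencies).static_order())


order = build_order(deps)
print(order)
```

Any order in which the staging models precede `orders_enriched`, and `orders_enriched` precedes `daily_revenue`, is a valid result.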

dbt not only runs your data transformations, but can also create data quality tests and generate documentation for your data, right out of the box. To learn more about dbt, visit the official dbt documentation website.

How does dbt work with Dagster?#

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single pipeline. Dagster also provides built-in operational and data observability capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.

dagster-dbt is an integration library that provides pre-built resources for using dbt together with Dagster. These are all designed to be configurable for any dbt project.

The resources that dagster_dbt provides are:

  • dbt_cli_resource (DbtCliResource): for running dbt CLI commands
  • dbt_rpc_resource (DbtRpcClient): for sending dbt commands to an RPC server

Both create resources that derive from the same base class, DbtResource.

This library also provides pre-built solids that can be integrated into your pipelines, but we recommend using the resources for most use cases, as they provide much more flexibility in how you interact with dbt.

Using the dbt CLI resource in a Dagster pipeline#

dagster-dbt provides a dbt_cli_resource to help make it easy to run commands through the dbt CLI.

When you supply this resource to a solid, you can call any of the many provided methods to invoke that particular CLI command. You can check out a full list of functions (as well as their signatures) in the DbtCliResource API Docs. All methods on the resource will return a DbtCliOutput object.

To run dbt CLI commands, your dbt project directory must be on your local filesystem and you must have a dbt profile already set up to connect to your data warehouse. Visit the official dbt CLI documentation for more details.
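
A quick pre-flight check for these prerequisites might look like the sketch below. The helper name `check_dbt_setup` is hypothetical and not part of dagster-dbt. It assumes the usual dbt layout: a `dbt_project.yml` at the project root, and a `profiles.yml` in `~/.dbt` unless another profiles directory is given.

```python
from pathlib import Path


def check_dbt_setup(project_dir, profiles_dir=None):
    """Return a list of problems found; an empty list means the basics are in place."""
    project_dir = Path(project_dir)
    # Assumption: dbt falls back to ~/.dbt when no profiles directory is given.
    profiles_dir = Path(profiles_dir) if profiles_dir else Path.home() / ".dbt"

    problems = []
    if not (project_dir / "dbt_project.yml").is_file():
        problems.append(f"no dbt_project.yml in {project_dir}")
    if not (profiles_dir / "profiles.yml").is_file():
        problems.append(f"no profiles.yml in {profiles_dir}")
    return problems


for problem in check_dbt_setup("path/to/dbt/project", "path/to/dbt/profiles"):
    print(problem)
```

This only checks that the files exist. It does not validate that the profile can actually connect to your warehouse.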

Configuration#

When configuring the dbt_cli_resource, you can specify any command line options that you want passed to all of your dbt commands.

Typically, you'll want to configure your project_dir here: in most cases you will only be working with a single dbt project in a given pipeline, and you won't want to pass this option to every function call. You might also want to configure your profiles_dir, or the specific profile you'll be using, for similar reasons.

from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project", "profiles_dir": "path/to/dbt/profiles"}
)

While the config schema doesn't have an option for every single dbt flag (as some flags only work with certain commands), if you configure a flag that is not in the schema, it will still get passed into every cli invocation, exactly the same as the pre-defined config options.

There are also a few options that are not associated with command line flags, which may be useful. These are:

  • warn_error: will raise an error for issues that dbt would normally just warn on
  • target_path: the dbt target path, if you set it to something other than the default
  • dbt_executable: the name of the specific dbt executable you're using, if it's not just dbt

Examples#

Below are some examples of using the dbt_cli_resource in different scenarios. Most dbt commands (such as run, seed, test, etc.) have corresponding functions, which work essentially identically, but these examples will focus on dbt run for simplicity.

Note that you can pass in any keyword to these functions that you wish, and they will get added as flags to the underlying dbt command (e.g. my_flag_name = 'foo' will get converted to --my-flag-name foo). If there is a dbt option that you would like to set, but is not reflected in the function signature, this is how you would do so.
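
The keyword-to-flag conversion described above can be sketched in plain Python. This is an illustration only, not the dagster-dbt implementation: the helper names `to_cli_flags` and `build_command` are hypothetical, and the handling of booleans, lists, and dicts, as well as per-call keywords taking precedence over resource config, are assumptions.

```python
import json


def to_cli_flags(**kwargs):
    """Convert keyword arguments into a flat list of dbt-style CLI flags."""
    flags = []
    for name, value in kwargs.items():
        flag = "--" + name.replace("_", "-")  # my_flag_name -> --my-flag-name
        if value is None or value is False:
            continue  # assumption: unset or False options are omitted
        if value is True:
            flags.append(flag)  # assumption: True becomes a bare switch
        elif isinstance(value, (list, tuple)):
            flags.extend([flag, *map(str, value)])  # e.g. --models a b
        elif isinstance(value, dict):
            flags.extend([flag, json.dumps(value)])  # e.g. --vars '{"key": "value"}'
        else:
            flags.extend([flag, str(value)])
    return flags


def build_command(command, resource_config, **call_kwargs):
    """Merge resource-level config with per-call keywords (assumption: per-call wins)."""
    merged = {**resource_config, **call_kwargs}
    return ["dbt", command, *to_cli_flags(**merged)]


print(build_command("run", {"project_dir": "path/to/dbt/project"}, my_flag_name="foo"))
# prints ['dbt', 'run', '--project-dir', 'path/to/dbt/project', '--my-flag-name', 'foo']
```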

Using dbt_cli_resource to run your entire dbt project#

One common way to use this integration is to have the resource run all of the models in a dbt project. To do this, just configure the resource so it knows where your dbt project is, and fire off a dbt.run() command!

from dagster import pipeline, solid, ModeDefinition
from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured({"project_dir": "path/to/dbt/project"})

@solid(required_resource_keys={"dbt"})
def run_all_models(context):
    context.resources.dbt.run()

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt": my_dbt_resource})])
def my_dbt_pipeline():
    run_all_models()

Using dbt_cli_resource to run a specific set of models#

Sometimes, you just want to run a select set of models in your dbt project, rather than the entire thing. The below examples show two ways of doing this, depending on your use case.

Note that in both cases, the models option takes in a list of strings. The string "tag:staging" uses dbt's node selection syntax to filter models with the tag "staging". For more details, visit the official dbt documentation on the node selection syntax.
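
As a rough illustration of how such selector strings are structured, the sketch below splits a selector into its parts. It is a simplified, hypothetical parser (`parse_selector` is not a dbt or dagster-dbt function) that only understands the `method:value` form and a leading or trailing `+` graph operator. dbt's real selection syntax supports considerably more.

```python
def parse_selector(selector):
    """Split a simple dbt node selector into method, value, and graph operators."""
    parents = selector.startswith("+")   # "+model" also selects the model's ancestors
    children = selector.endswith("+")    # "model+" also selects the model's descendants
    core = selector.strip("+")
    if ":" in core:
        method, value = core.split(":", 1)
    else:
        # Hypothetical label for selectors that name a model or path directly.
        method, value = "name_or_path", core
    return {"method": method, "value": value, "parents": parents, "children": children}


print(parse_selector("tag:staging"))
# prints {'method': 'tag', 'value': 'staging', 'parents': False, 'children': False}
```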

... with configuration#

If you know what models you want to select ahead of time, you might prefer specifying this while configuring your resource.

from dagster import pipeline, solid, ModeDefinition
from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project", "models": ["tag:staging"]}
)

@solid(required_resource_keys={"dbt"})
def run_models(context):
    context.resources.dbt.run()

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt": my_dbt_resource})])
def my_dbt_pipeline():
    run_models()

... supplying an argument#

If you want to change which models you select depending on what happens in your pipeline, you can supply this as an argument to the function call.

from dagster import solid

@solid(required_resource_keys={"dbt"})
def run_models(context, some_condition: bool):
    if some_condition:
        context.resources.dbt.run(models=["tag:staging"])
    else:
        context.resources.dbt.run(models=["tag:other"])

Using a different dbt profile for different Dagster modes#

Dagster supports running pipelines in different Modes. dbt has a similar concept, profiles. You might want to run a dev version of your pipeline that targets the development-specific dbt profile, and then have a prod version that runs using the prod dbt profile. This example shows how to accomplish this.

from dagster import pipeline, solid, ModeDefinition
from dagster_dbt import dbt_cli_resource

@solid(required_resource_keys={"dbt"})
def run_all_models(context):
    context.resources.dbt.run()

@pipeline(
    mode_defs=[
        ModeDefinition(
            "dev",
            resource_defs={
                "dbt": dbt_cli_resource.configured(
                    {"project_dir": "path/to/dbt/project", "profile": "dev"}
                )
            },
        ),
        ModeDefinition(
            "prod",
            resource_defs={
                "dbt": dbt_cli_resource.configured(
                    {"project_dir": "path/to/dbt/project", "profile": "prod"}
                )
            },
        ),
    ]
)
def my_dbt_pipeline():
    run_all_models()

Invoking multiple dbt commands in the same pipeline#

Sometimes, you'll want to run multiple different dbt commands in the same pipeline. The dbt_cli_resource makes this convenient, as you only need to configure your dbt resource once, and all of that configuration will already be set for any solids that are using this resource.

One common use case is to first run dbt run to update all of your models, and then run dbt test to check that they are all working as expected, as shown below.

from dagster import pipeline, solid, ModeDefinition
from dagster_dbt import dbt_cli_resource, DbtCliOutput

my_dbt_resource = dbt_cli_resource.configured({"project_dir": "path/to/dbt/project"})

@solid(required_resource_keys={"dbt"})
def run_models(context) -> DbtCliOutput:
    return context.resources.dbt.run()

@solid(required_resource_keys={"dbt"})
def test_models(context, run_result: DbtCliOutput):
    context.log.info(f"testing result of `{run_result.command}`!")
    context.resources.dbt.test()

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt": my_dbt_resource})])
def my_dbt_pipeline():
    run_result = run_models()
    test_models(run_result)
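
For comparison, the same run-then-test ordering can be sketched outside Dagster as two sequential CLI invocations. The function `run_then_test` and its injectable `runner` parameter are hypothetical, introduced so the logic can be exercised without dbt installed. By default it shells out to the `dbt` executable with the `--project-dir` flag.

```python
import subprocess


def run_then_test(project_dir, runner=subprocess.run):
    """Run `dbt run`, then `dbt test` only if the run succeeded.

    Returns the list of dbt commands that were actually executed.
    """
    executed = []
    for command in ("run", "test"):
        result = runner(["dbt", command, "--project-dir", project_dir])
        executed.append(command)
        if result.returncode != 0:
            break  # skip later commands when an earlier one fails
    return executed
```

In the Dagster version above, this ordering comes from passing the output of run_models into test_models, so the test solid only starts once the run solid has finished successfully.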

Using a dbt RPC server in a Dagster pipeline#

Alongside the dbt_rpc_resource (which functions similarly to the dbt_cli_resource), dagster-dbt provides solids for running commands through the dbt RPC server. By convention, these solids are named dagster_dbt.dbt_rpc_*.

Note: Executing an RPC command is a non-blocking operation, meaning it does not wait for the command to complete before returning. Instead, it will return a response from the RPC server indicating that the request was received. To check the status and results of your command, you will need to use the poll() method.
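
The request-then-poll pattern can be sketched generically as follows. This is not the dagster-dbt code: `wait_for_completion` and its `get_state` callable are hypothetical stand-ins for whatever you use to poll the server, and the state string `"running"` is an assumption about what an unfinished request reports.

```python
import time


def wait_for_completion(get_state, interval=3.0, timeout=60.0, sleep=time.sleep):
    """Call get_state() every `interval` seconds until it stops returning "running"."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state != "running":
            return state  # finished, successfully or not
        if time.monotonic() >= deadline:
            raise TimeoutError("request did not finish before the timeout")
        sleep(interval)


# Usage with a fake poller that finishes on the third check.
fake_states = iter(["running", "running", "success"])
print(wait_for_completion(lambda: next(fake_states), interval=0))
# prints success
```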

Configuration#

Your dbt RPC server can be running locally or remotely. To use the dbt RPC solids in your Dagster pipeline, you will need to create a resource for your dbt RPC server.

dagster_dbt.dbt_rpc_resource can be configured with your specific host and port.

from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

For convenience during local development, you may also use dagster_dbt.local_dbt_rpc_resource, which is preconfigured for a dbt RPC server that is running on http://localhost:8580.

Examples#

Using dbt_rpc_run to send a request to run your entire dbt project#

from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

# my_remote_rpc is the dbt_rpc_resource configured in the Configuration section above.
@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    dbt_rpc_run()

The code snippet above shows a Dagster pipeline with a single solid dbt_rpc_run. The solid dbt_rpc_run has a required resource key "dbt_rpc". So, any pipeline that uses dbt_rpc_run will need a ModeDefinition that defines a resource under the key "dbt_rpc".

Using dbt_rpc_run to run specific models in a dbt project#

This is similar to having "params": {"models": "tag:staging"} in your dbt RPC request body.

from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

run_staging_models = dbt_rpc_run.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models",
)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models()

Note that the solid above will NOT wait until the dbt RPC server has finished executing your request. Instead, it will return immediately with a request token from the dbt RPC server. If you want the solid to wait until execution is finished, use dagster_dbt.dbt_rpc_run_and_wait instead.
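
For reference, the kind of request body mentioned above can be sketched as a JSON-RPC 2.0 payload. The helper `build_rpc_request` is hypothetical, and the exact set of parameters the server accepts should be checked against the dbt RPC documentation.

```python
import json
import uuid


def build_rpc_request(method, request_id=None, **params):
    """Build a JSON-RPC 2.0 request body for a dbt RPC server."""
    return {
        "jsonrpc": "2.0",
        "method": method,
        # Any unique string works as an id; a random UUID is used if none is given.
        "id": request_id or str(uuid.uuid4()),
        "params": params,
    }


body = build_rpc_request("run", request_id="demo-request", models="tag:staging")
print(json.dumps(body, indent=2))
```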

Using dbt_rpc_run_and_wait to run specific models in a dbt project and poll the dbt RPC server until it has finished executing your request#

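from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run_and_wait

# Select the staging models, as in the dbt_rpc_run example above.
run_staging_models_and_wait = dbt_rpc_run_and_wait.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models_and_wait",
)

@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})])
def my_dbt_pipeline():
    run_staging_models_and_wait()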

Using dbt Cloud in a Dagster pipeline#

dagster_dbt currently does not provide solids or resources for invoking dbt commands via dbt Cloud. However, this use case is possible by writing your own solid to create and start Jobs via the dbt Cloud API. For more details about each HTTP endpoint, visit the official documentation for the dbt Cloud API.
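
As a starting point for such a solid, the sketch below only builds the HTTP request that would trigger a job run and does not send it. Everything here is an assumption to verify against the dbt Cloud API documentation: the v2 endpoint path, the `cloud.getdbt.com` host, the `Token` authorization scheme, and the `cause` field. The helper name `build_trigger_job_request` is hypothetical.

```python
import json
import urllib.request


def build_trigger_job_request(account_id, job_id, api_token, cause="Triggered by Dagster"):
    """Build (but do not send) a POST request that asks dbt Cloud to run a job."""
    # Assumption: v2 "trigger job run" endpoint on the default dbt Cloud host.
    url = f"https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
    return urllib.request.Request(
        url,
        data=json.dumps({"cause": cause}).encode("utf-8"),
        headers={
            "Authorization": f"Token {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_trigger_job_request(account_id=1, job_id=2, api_token="my-token")
print(request.get_method(), request.full_url)
```

Inside a solid, you would pass the request to `urllib.request.urlopen` (or use an HTTP client of your choice) and then poll the run's status until it completes.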

Advanced Configuration#

For full documentation on all available config, visit the API docs for dagster-dbt.

dbt CLI: Set the dbt profile and target to load

from dagster import pipeline, ModeDefinition
from dagster_dbt import dbt_cli_resource

config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt": dbt_cli_resource.configured(config)})]
)
def my_pipeline():
    ...  # invoke your solids here

dbt CLI: Set the path to the dbt executable

from dagster import pipeline, ModeDefinition
from dagster_dbt import dbt_cli_resource

config = {"dbt_executable": "path/to/dbt/executable"}

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt": dbt_cli_resource.configured(config)})]
)
def my_pipeline():
    ...  # invoke your solids here

dbt CLI: Select specific models to run

from dagster import pipeline, ModeDefinition
from dagster_dbt import dbt_cli_resource

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt": dbt_cli_resource.configured(config)})]
)
def my_pipeline():
    ...  # invoke your solids here

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Exclude specific models

from dagster import pipeline, ModeDefinition
from dagster_dbt import dbt_cli_resource

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt": dbt_cli_resource.configured(config)})]
)
def my_pipeline():
    ...  # invoke your solids here

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Set key-values for dbt vars

from dagster import pipeline, ModeDefinition
from dagster_dbt import dbt_cli_resource

config = {"vars": {"key": "value"}}

@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt": dbt_cli_resource.configured(config)})]
)
def my_pipeline():
    ...  # invoke your solids here

For more details, visit the official documentation on using variables in dbt.

dbt RPC: Configure a remote dbt RPC resource

from dagster_dbt import dbt_rpc_resource

custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})

dbt RPC: Select specific models to run

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")

For more details, visit the official documentation on dbt's node selection syntax.

dbt RPC: Exclude specific models

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")

For more details, visit the official documentation on dbt's node selection syntax.

dbt RPC: Configure polling interval when using a dbt_rpc_*_and_wait solid

config = {"interval": 3}  # Poll the dbt RPC server every 3 seconds.

from dagster_dbt import dbt_rpc_run_and_wait

custom_solid = dbt_rpc_run_and_wait.configured(config, name="custom_solid")

dbt RPC: Disable default asset materializations

config = {"yield_materializations": False}

from dagster_dbt import dbt_rpc_run

custom_solid = dbt_rpc_run.configured(config, name="custom_solid")

Conclusion#

If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions on using dbt with Dagster, we'd love to hear from you on Slack.