
Using dbt with Dagster #

This guide explains how you can run a dbt project as part of a Dagster job.

What is dbt? #

dbt (data build tool) helps engineers transform data in their warehouses by simply writing SELECT statements. dbt automatically builds a dependency graph for your transformations and turns these SELECT statements into tables and views in your data warehouse.

dbt not only runs your data transformations, but also can create data quality tests and generate documentation for your data, right out of the box. To learn more about dbt, visit the official dbt documentation website.

How does dbt work with Dagster? #

Dagster orchestrates dbt alongside other technologies, so you can combine dbt with Spark, Python, etc. in a single job. Dagster also provides built-in operational and data observability capabilities, like storing dbt run results longitudinally and sending alerts when a dbt run fails.

dagster-dbt is an integration library that provides pre-built resources for using dbt together with Dagster. These are all designed to be configurable for any dbt project.

The resources that dagster_dbt provides are:

  • dbt_cli_resource (DbtCliResource): for running dbt CLI commands
  • dbt_rpc_resource (DbtRpcClient): for sending dbt commands to an RPC server

Both create resources that derive from the same base class, DbtResource.

This library also provides pre-built ops that can be integrated into your jobs, but we recommend using the resources for most use cases, as they provide much more flexibility in how you interact with dbt. To view example jobs that take advantage of this dbt integration, check out either the Hacker News example repo or the dbt example repo.

Using the dbt CLI resource in a Dagster job #

dagster-dbt provides a dbt_cli_resource that makes it easy to run commands through the dbt CLI.

When you supply this resource to an op, you can call any of its methods to invoke the corresponding dbt CLI command. For a full list of methods and their signatures, see the DbtCliResource API docs. Every method on the resource returns a DbtCliOutput object.

To run dbt CLI commands, your dbt project directory must be on your local filesystem and you must have a dbt profile already set up to connect to your data warehouse. Visit the official dbt CLI documentation for more details.
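
Because a missing project or profile only surfaces when dbt itself fails, it can help to check both prerequisites up front. The helper below is a hypothetical pre-flight check (check_dbt_setup is not part of dagster-dbt). It looks for the dbt_project.yml file that marks a dbt project root and for a profiles.yml file, assuming ~/.dbt as the fallback profiles directory, which is where dbt has traditionally looked for profiles.

```python
from pathlib import Path
from typing import List, Optional


def check_dbt_setup(project_dir: str, profiles_dir: Optional[str] = None) -> List[str]:
    """Return human-readable problems; an empty list means both files were found.

    Hypothetical helper for illustration, not part of dagster-dbt.
    """
    problems = []
    # A dbt project root is identified by its dbt_project.yml file.
    if not (Path(project_dir) / "dbt_project.yml").is_file():
        problems.append(f"no dbt_project.yml found in {project_dir}")
    # Assumption: fall back to ~/.dbt when no profiles directory is given.
    profiles_root = Path(profiles_dir) if profiles_dir else Path.home() / ".dbt"
    if not (profiles_root / "profiles.yml").is_file():
        problems.append(f"no profiles.yml found in {profiles_root}")
    return problems
```

For example, raising an error when check_dbt_setup("path/to/dbt/project") returns a non-empty list gives a clearer message than a failed dbt invocation.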

Configuration #

When configuring the dbt_cli_resource, you can specify any command-line options that you want passed to all of your dbt commands.

Typically, you'll want to configure your project_dir here: in most cases you will only be working with a single dbt project in a given job, and you won't want to pass this option to every function call. For similar reasons, you might also configure your profiles_dir or the specific profile you'll be using.

from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project", "profiles_dir": "path/to/dbt/profiles"}
)

The config schema doesn't include an option for every dbt flag, since some flags only work with certain commands. However, if you configure a flag that is not in the schema, it will still be passed to every CLI invocation, exactly like the predefined config options.

There are also a few useful options that are not associated with command-line flags:

  • warn_error: will raise an error for issues that dbt would normally just warn on
  • target_path: the dbt target path, if you set it to something other than the default
  • dbt_executable: the name of the specific dbt executable you're using, if it's not just dbt
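
To make the difference from ordinary flags concrete, here is a minimal sketch of how these three options could shape an invocation. It is illustrative only: base_command and run_results_path are hypothetical names, not the dagster-dbt implementation. It assumes that target_path is relative to the project directory, as dbt's default target/ folder is, and that dbt writes its run_results.json artifact there.

```python
import os
from typing import List


def base_command(dbt_executable: str = "dbt", warn_error: bool = False) -> List[str]:
    """Prefix shared by every invocation: the executable plus global switches."""
    args = [dbt_executable]
    if warn_error:
        # --warn-error is a global dbt flag, so it goes before the subcommand
        # (as in `dbt --warn-error run`).
        args.append("--warn-error")
    return args


def run_results_path(project_dir: str, target_path: str = "target") -> str:
    """Where dbt writes run_results.json, assuming target_path is project-relative."""
    return os.path.join(project_dir, target_path, "run_results.json")
```

A non-default executable and target path would then give, for instance, base_command("dbt-custom", warn_error=True) + ["run"] and run_results_path("path/to/dbt/project", "custom_target").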

Examples #

Below are some examples of using the dbt_cli_resource in different scenarios. Most dbt commands (such as run, seed, and test) have corresponding methods that work in essentially the same way, but for simplicity these examples focus on dbt run.

Note that you can pass any keyword argument to these methods, and it will be added as a flag to the underlying dbt command (e.g. my_flag_name='foo' is converted to --my-flag-name foo). This is how you set a dbt option that is not reflected in the function signature.
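
The conversion rule can be sketched in plain Python. This illustrates the behavior described above and is not dagster-dbt's source: build_dbt_command and to_flag_args are hypothetical names, and the handling of booleans, lists, and dicts, as well as per-call arguments overriding configured values, are assumptions to verify against the DbtCliResource API docs.

```python
import json
from typing import Any, Dict, List


def to_flag_args(name: str, value: Any) -> List[str]:
    """Convert one keyword argument into dbt CLI arguments (illustrative rules)."""
    flag = "--" + name.replace("_", "-")  # my_flag_name -> --my-flag-name
    if value is None or value is False:
        return []  # assumption: unset or False options are omitted
    if value is True:
        return [flag]  # assumption: True becomes a bare switch
    if isinstance(value, (list, tuple)):
        return [flag, *map(str, value)]  # models=["a", "b"] -> --models a b
    if isinstance(value, dict):
        return [flag, json.dumps(value)]  # vars={"k": "v"} -> --vars '{"k": "v"}'
    return [flag, str(value)]


def build_dbt_command(
    command: str, configured: Dict[str, Any], dbt_executable: str = "dbt", **kwargs: Any
) -> List[str]:
    """Merge resource-level config with per-call kwargs and render a dbt command."""
    flags = {**configured, **kwargs}  # assumption: per-call values win
    args = [dbt_executable, command]
    for name, value in flags.items():
        args.extend(to_flag_args(name, value))
    return args
```

Configured options such as project_dir end up on every command, while a per-call argument like models=["tag:other"] replaces the configured value for that one call.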

Using dbt_cli_resource to run your entire dbt project #

One common way to use this integration is to have the resource run all of the models in a dbt project. To do this, just configure the resource so it knows where your dbt project is, and fire off a dbt.run() command!

from dagster import job, op
from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured({"project_dir": "path/to/dbt/project"})

@op(required_resource_keys={"dbt"})
def run_all_models(context):
    context.resources.dbt.run()

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    run_all_models()

Using dbt_cli_resource to run a specific set of models #

Sometimes, you just want to run a select set of models in your dbt project, rather than the entire thing. The below examples show two ways of doing this, depending on your use case.

Note that in both cases, the models option takes in a list of strings. The string "tag:staging" uses dbt's node selection syntax to filter models with the tag "staging". For more details, visit the official dbt documentation on the node selection syntax.

... with configuration #

If you know what models you want to select ahead of time, you might prefer specifying this while configuring your resource.

from dagster import job, op
from dagster_dbt import dbt_cli_resource

my_dbt_resource = dbt_cli_resource.configured(
    {"project_dir": "path/to/dbt/project", "models": ["tag:staging"]}
)

@op(required_resource_keys={"dbt"})
def run_models(context):
    context.resources.dbt.run()

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    run_models()

... supplying an argument #

If you want to change which models you select depending on what happens during execution, you can supply this as an argument to the function call.

from dagster import op

@op(required_resource_keys={"dbt"})
def run_models(context, some_condition: bool):
    if some_condition:
        context.resources.dbt.run(models=["tag:staging"])
    else:
        context.resources.dbt.run(models=["tag:other"])

Using a different dbt profile for different Dagster jobs #

Dagster lets you build multiple jobs from the same graph, each with its own resources. dbt has a similar concept: profiles. You might want a dev job that runs your graph against a development-specific dbt profile, and a prod job that runs the same graph against the prod dbt profile. This example shows how to accomplish this.

from dagster import graph, op
from dagster_dbt import dbt_cli_resource

@op(required_resource_keys={"dbt"})
def run_all_models(context):
    context.resources.dbt.run()

@graph
def my_dbt():
    run_all_models()

my_dbt_graph_dev = my_dbt.to_job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt/project", "profile": "dev"}
        )
    }
)

my_dbt_graph_prod = my_dbt.to_job(
    resource_defs={
        "dbt": dbt_cli_resource.configured(
            {"project_dir": "path/to/dbt/project", "profile": "prod"}
        )
    }
)
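
If the dev and prod configurations differ only in the profile, a small helper keeps the shared settings in one place. The sketch below is hypothetical: dbt_config_for is an illustrative name, and it assumes each environment's dbt profile has the same name as the environment. The returned dict is what you would pass to dbt_cli_resource.configured.

```python
from typing import Dict

DBT_PROJECT_DIR = "path/to/dbt/project"
KNOWN_ENVIRONMENTS = {"dev", "prod"}


def dbt_config_for(environment: str) -> Dict[str, str]:
    """Build dbt_cli_resource config for one environment (hypothetical helper)."""
    if environment not in KNOWN_ENVIRONMENTS:
        raise ValueError(f"unknown environment {environment!r}")
    # Assumption: the dbt profile shares its name with the environment.
    return {"project_dir": DBT_PROJECT_DIR, "profile": environment}
```

For example, dbt_cli_resource.configured(dbt_config_for("dev")) produces the same resource as the dev job's definition above.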

Invoking multiple dbt commands in the same job #

Sometimes, you'll want to run multiple different dbt commands in the same job. The dbt_cli_resource makes this convenient, as you only need to configure your dbt resource once, and all of that configuration will already be set for any ops that are using this resource.

One common use case is to first run dbt run to update all of your models, and then run dbt test to check that they are all working as expected, as shown below.

from dagster import job, op
from dagster_dbt import dbt_cli_resource, DbtCliOutput

my_dbt_resource = dbt_cli_resource.configured({"project_dir": "path/to/dbt/project"})

@op(required_resource_keys={"dbt"})
def run_models(context) -> DbtCliOutput:
    return context.resources.dbt.run()

@op(required_resource_keys={"dbt"})
def test_models(context, run_result: DbtCliOutput):
    context.log.info(f"testing result of `{run_result.command}`!")
    context.resources.dbt.test()

@job(resource_defs={"dbt": my_dbt_resource})
def my_dbt_job():
    run_result = run_models()
    test_models(run_result)

Using a dbt RPC server in a Dagster job #

Alongside the dbt_rpc_resource (which functions similarly to the dbt_cli_resource), dagster-dbt provides ops for running commands through the dbt RPC server. By convention, these ops are named dagster_dbt.dbt_rpc_*.

Note: Executing an RPC command is a non-blocking operation, meaning it does not wait for the command to complete before returning. Instead, it returns a response from the RPC server indicating that the request was received. To check the status and results of your command, you will need to use the poll() method.
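
Conceptually, waiting for an RPC request means polling with the request token until the server reports that the task is no longer running. The loop below is a minimal sketch of that pattern, not the dagster-dbt implementation. wait_for_rpc_result is a hypothetical name, poll stands in for whatever issues the poll request (for example, the resource's poll() method), and the loop assumes the response carries a state field that reads "running" while the task is in progress.

```python
import time
from typing import Any, Callable, Dict, Optional


def wait_for_rpc_result(
    poll: Callable[[str], Dict[str, Any]],
    request_token: str,
    interval: float = 3.0,
    max_polls: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the task leaves the "running" state, then return the last result."""
    polls = 0
    while True:
        result = poll(request_token)
        polls += 1
        # Assumption: "running" is the only non-terminal state to wait on.
        if result.get("state") != "running":
            return result
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"request {request_token} still running after {polls} polls")
        sleep(interval)
```

Injecting sleep keeps the loop easy to test, and max_polls bounds how long a job can hang on a stuck request.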

Configuration #

Your dbt RPC server can be running locally or remotely. To use the dbt RPC ops in your Dagster job, you will need to create a resource for your dbt RPC server.

dagster_dbt.dbt_rpc_resource can be configured with your specific host and port.

from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

For convenience during local development, you may also use dagster_dbt.local_dbt_rpc_resource, which is preconfigured for a dbt RPC server that is running on http://localhost:8580.
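
Under the hood, talking to the dbt RPC server means POSTing JSON-RPC 2.0 messages to its /jsonrpc endpoint. The sketch below shows how a host and port become an endpoint URL and what a request body looks like. rpc_url, rpc_request, and to_http_request are hypothetical helpers for illustration; the dagster-dbt resource builds these requests for you.

```python
import json
import urllib.request
import uuid
from typing import Any, Dict, Optional


def rpc_url(host: str = "localhost", port: int = 8580) -> str:
    """The dbt RPC server accepts JSON-RPC requests on the /jsonrpc path."""
    return f"http://{host}:{port}/jsonrpc"


def rpc_request(
    method: str, params: Dict[str, Any], request_id: Optional[str] = None
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request body, e.g. method="run" or method="poll"."""
    return {
        "jsonrpc": "2.0",
        "method": method,
        "id": request_id or str(uuid.uuid4()),  # any unique id identifies the call
        "params": params,
    }


def to_http_request(url: str, body: Dict[str, Any]) -> urllib.request.Request:
    """Wrap a body in a POST request; pass it to urllib.request.urlopen to send it."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

With the remote resource configured above, the endpoint would be rpc_url("80.80.80.80", 8080).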

Examples #

Using dbt_rpc_run to send a request to run your entire dbt project #

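from dagster import job
from dagster_dbt import dbt_rpc_resource, dbt_rpc_run

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

@job(resource_defs={"dbt_rpc": my_remote_rpc})
def my_dbt_job():
    dbt_rpc_run()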

The code snippet above shows a Dagster job with a single op, dbt_rpc_run. Because dbt_rpc_run requires the resource key "dbt_rpc", any job that uses it must provide a resource under that key.

Using dbt_rpc_run to run specific models in a dbt project #

This is similar to having "params": {"models": "tag:staging"} in your dbt RPC request body.

from dagster import job
from dagster_dbt import dbt_rpc_resource, dbt_rpc_run

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})

run_staging_models = dbt_rpc_run.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models",
)

@job(resource_defs={"dbt_rpc": my_remote_rpc})
def my_dbt_job():
    run_staging_models()

Note that the op above will NOT wait until the dbt RPC server has finished executing your request. Instead, it returns immediately with a request token from the dbt RPC server. If you want the op to wait until execution is finished, use dagster_dbt.dbt_rpc_run_and_wait instead.

Using dbt_rpc_run_and_wait to run specific models in a dbt project and poll the dbt RPC server until it has finished executing your request #

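from dagster import job
from dagster_dbt import dbt_rpc_resource, dbt_rpc_run_and_wait

my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080})
run_staging_models_and_wait = dbt_rpc_run_and_wait.configured(
    {"models": ["tag:staging"]}, name="run_staging_models_and_wait"
)

@job(resource_defs={"dbt_rpc": my_remote_rpc})
def my_dbt_job():
    run_staging_models_and_wait()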

Use dbt Cloud in a Dagster job #

dagster_dbt currently does not provide ops or resources for invoking dbt commands via dbt Cloud. However, this use case is possible by writing your own op to create and start Jobs via the dbt Cloud API. For more details about each HTTP endpoint, visit the official documentation for the dbt Cloud API.
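
As a starting point, the sketch below triggers a run of an existing dbt Cloud job using only Python's standard library; you would call it from the body of your own op. It assumes the v2 endpoint POST /api/v2/accounts/{account_id}/jobs/{job_id}/run/ with a required "cause" field, token authentication via an "Authorization: Token ..." header, the multi-tenant host cloud.getdbt.com, and a response whose "data" key holds the new run. The helper names are hypothetical; confirm the host, endpoint, and response shape against the dbt Cloud API documentation for your account.

```python
import json
import urllib.request
from typing import Any, Dict

# Assumption: multi-tenant dbt Cloud host; single-tenant or regional accounts differ.
DBT_CLOUD_BASE_URL = "https://cloud.getdbt.com/api/v2"


def build_trigger_request(
    account_id: int, job_id: int, api_token: str, cause: str
) -> urllib.request.Request:
    """Build the POST request that asks dbt Cloud to start a run of a job."""
    url = f"{DBT_CLOUD_BASE_URL}/accounts/{account_id}/jobs/{job_id}/run/"
    body = json.dumps({"cause": cause}).encode("utf-8")  # "cause" describes the trigger
    return urllib.request.Request(
        url,
        data=body,
        headers={"Authorization": f"Token {api_token}", "Content-Type": "application/json"},
        method="POST",
    )


def trigger_dbt_cloud_job(
    account_id: int, job_id: int, api_token: str, cause: str = "Triggered by Dagster"
) -> Dict[str, Any]:
    """Send the request and return the run details (assumed to live under "data")."""
    request = build_trigger_request(account_id, job_id, api_token, cause)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())["data"]
```

Keeping request construction separate from sending makes the request easy to inspect in tests; inside an op, you would typically read the API token from the environment rather than hard-coding it.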

Advanced Configuration #

For full documentation on all available config, visit the API docs for dagster-dbt.

dbt CLI: Set the dbt profile and target to load

from dagster import job
from dagster_dbt import dbt_cli_resource

config = {"profile": PROFILE_NAME, "target": TARGET_NAME}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...  # add your ops here

dbt CLI: Set the path to the dbt executable

from dagster import job
from dagster_dbt import dbt_cli_resource

config = {"dbt_executable": "path/to/dbt/executable"}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...  # add your ops here

dbt CLI: Select specific models to run

from dagster import job
from dagster_dbt import dbt_cli_resource

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...  # add your ops here

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Exclude specific models

from dagster import job
from dagster_dbt import dbt_cli_resource

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...  # add your ops here

For more details, visit the official documentation on dbt's node selection syntax.

dbt CLI: Set key-values for dbt vars

from dagster import job
from dagster_dbt import dbt_cli_resource

config = {"vars": {"key": "value"}}

@job(resource_defs={"dbt": dbt_cli_resource.configured(config)})
def my_job():
    ...  # add your ops here

For more details, visit the official documentation on using variables in dbt.

dbt RPC: Configure a remote dbt RPC resource

from dagster_dbt import dbt_rpc_resource

custom_resource = dbt_rpc_resource.configured({"host": HOST, "port": PORT})

dbt RPC: Select specific models to run

from dagster_dbt import dbt_rpc_run

config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

custom_op = dbt_rpc_run.configured(config, name="custom_op")

For more details, visit the official documentation on dbt's node selection syntax.

dbt RPC: Exclude specific models

from dagster_dbt import dbt_rpc_run

config = {"exclude": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

custom_op = dbt_rpc_run.configured(config, name="custom_op")

For more details, visit the official documentation on dbt's node selection syntax.

dbt RPC: Configure polling interval when using a dbt_rpc_*_and_wait op

from dagster_dbt import dbt_rpc_run_and_wait

config = {"interval": 3}  # Poll the dbt RPC server every 3 seconds.

custom_op = dbt_rpc_run_and_wait.configured(config, name="custom_op")

dbt RPC: Disable default asset materializations

from dagster_dbt import dbt_rpc_run

config = {"yield_materializations": False}

custom_op = dbt_rpc_run.configured(config, name="custom_op")

Conclusion #

If you find a bug or want to add a feature to the dagster-dbt library, we invite you to contribute.

If you have questions about using dbt with Dagster, we'd love to hear from you: join us on the Dagster Slack.