
dbt Cloud & Dagster#

Using the local dbt Core CLI? Check out the dbt Core with Dagster guide!

Dagster allows you to run dbt Cloud alongside other technologies like Spark, Python, etc., and has built-in support for loading dbt Cloud models, seeds, and snapshots as Software-defined Assets.


Prerequisites#

To get started, you will need to install the dagster and dagster-dbt Python packages:

pip install dagster dagster-dbt

You'll also want to have a dbt Cloud instance with an existing project that is deployed with a dbt Cloud job. If you don't have one already, you can set up dbt Cloud with a sample project. Refer to the dbt documentation for a quickstart guide specific to your data warehouse.

To manage the dbt Cloud job from Dagster, you'll need three values:

  1. An auth_token for connecting to the dbt Cloud API, stored in the environment variable DBT_CLOUD_API_TOKEN. You can obtain one by generating a service account token in the dbt Cloud console.
  2. The account_id of your dbt Cloud account, stored in the environment variable DBT_CLOUD_ACCOUNT_ID.
  3. The job_id of the dbt Cloud job you want to manage in Dagster.

The account_id and job_id can be obtained by inspecting the URL of the dbt Cloud job in the dbt Cloud console. For example, in this screenshot, the account_id is 111111 and the job_id is 33333:

Screenshot of the dbt Cloud console on the job page.
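
If you would rather not read the IDs off the URL by eye, a small helper can extract them. This is a minimal sketch, not part of dagster-dbt: parse_job_url and DbtCloudJobRef are hypothetical names, and the two URL shapes it recognizes are assumptions about how the dbt Cloud console formats job URLs, so check them against your own address bar.

```python
import re
from typing import NamedTuple


class DbtCloudJobRef(NamedTuple):
    account_id: int
    job_id: int


# Assumed URL shapes (not confirmed by this guide):
#   https://cloud.getdbt.com/deploy/111111/projects/222222/jobs/33333
#   https://cloud.getdbt.com/#/accounts/111111/projects/222222/jobs/33333/
_ACCOUNT_RE = re.compile(r"/(?:accounts|deploy)/(\d+)")
_JOB_RE = re.compile(r"/jobs/(\d+)")


def parse_job_url(url: str) -> DbtCloudJobRef:
    """Extract the account ID and job ID from a dbt Cloud job URL."""
    account = _ACCOUNT_RE.search(url)
    job = _JOB_RE.search(url)
    if account is None or job is None:
        raise ValueError(f"Could not find account and job IDs in: {url}")
    return DbtCloudJobRef(int(account.group(1)), int(job.group(1)))


ref = parse_job_url(
    "https://cloud.getdbt.com/deploy/111111/projects/222222/jobs/33333"
)
print(ref.account_id, ref.job_id)
```

The job_id returned here is the value passed to load_assets_from_dbt_cloud_job in Step 2.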

Step 1: Connecting to dbt Cloud#

The first step in using dbt Cloud with Dagster is to tell Dagster how to connect to your dbt Cloud instance using a dbt Cloud resource. This resource contains information on where the dbt Cloud instance is located and any credentials sourced from environment variables that are needed to access it.

from dagster_dbt import DbtCloudClientResource
from dagster import EnvVar

dbt_cloud_instance = DbtCloudClientResource(
    auth_token=EnvVar("DBT_CLOUD_API_TOKEN"),
    account_id=EnvVar.int("DBT_CLOUD_ACCOUNT_ID"),
)

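In this example, EnvVar reads the API token from the DBT_CLOUD_API_TOKEN environment variable, and EnvVar.int reads the account ID from DBT_CLOUD_ACCOUNT_ID and converts it to an integer.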


Step 2: Loading dbt Cloud models as assets#

In this step, you'll load the dbt Cloud models managed by a dbt Cloud job into Dagster as Software-defined Assets.

For context, a dbt Cloud job defines a set of commands to run for a dbt Cloud project. The models managed by a dbt Cloud job are the models that the job runs once its filtering options have been applied.

Using the dbt Cloud resource, you can retrieve information about the models that the dbt Cloud job is managing:

from dagster_dbt import load_assets_from_dbt_cloud_job

# Use the dbt_cloud_instance resource we defined in Step 1, and the job_id from Prerequisites
dbt_cloud_assets = load_assets_from_dbt_cloud_job(
    dbt_cloud=dbt_cloud_instance,
    job_id=33333,
)

Note: Dagster supports dbt Cloud jobs with multiple commands, but one of the commands must be a materialization command. A materialization command is either dbt run or dbt build, along with any optional command line arguments.
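
To make the note concrete, here is a rough sketch of a check you could run against a job's command list before pointing Dagster at it. It is an illustration only and not how dagster-dbt validates jobs: the function names are hypothetical, and the check assumes the subcommand immediately follows dbt, so a command with global flags placed before the subcommand would not be recognized.

```python
import shlex
from typing import Iterable

# Per the note above, only these subcommands count as materialization commands.
MATERIALIZATION_SUBCOMMANDS = {"run", "build"}


def is_materialization_command(command: str) -> bool:
    """Return True for commands such as `dbt run` or `dbt build --select ...`.

    Simplification: assumes the subcommand is the token right after `dbt`.
    """
    tokens = shlex.split(command)
    return (
        len(tokens) >= 2
        and tokens[0] == "dbt"
        and tokens[1] in MATERIALIZATION_SUBCOMMANDS
    )


def has_materialization_command(commands: Iterable[str]) -> bool:
    """Return True if at least one command in the job materializes models."""
    return any(is_materialization_command(c) for c in commands)


print(has_materialization_command(["dbt seed", "dbt run --select tag:daily"]))
print(has_materialization_command(["dbt seed", "dbt test"]))
```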

The load_assets_from_dbt_cloud_job function loads the dbt Cloud models into Dagster as assets, creating one Dagster asset for each model.

When invoked, the function:

  1. Invokes your dbt Cloud job with command overrides to compile your dbt project,
  2. Parses the metadata provided by dbt Cloud, and
  3. Generates a set of Software-defined Assets reflecting the models in the project managed by the dbt Cloud job. Materializing these assets will run the dbt Cloud job that is represented by the loaded assets.
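
The parsing and asset-generation steps above can be pictured with a small standalone sketch. It is not the dagster-dbt implementation: the manifest below is a hypothetical, heavily trimmed stand-in for the metadata dbt produces (the jaffle_shop project and its node names are made up), and assets_from_manifest is an illustrative helper that maps each model, seed, or snapshot to its upstream dependencies.

```python
from typing import Dict, List

# Hypothetical, trimmed stand-in for dbt's compiled project metadata.
manifest = {
    "nodes": {
        "seed.jaffle_shop.raw_orders": {
            "resource_type": "seed",
            "name": "raw_orders",
            "depends_on": {"nodes": []},
        },
        "model.jaffle_shop.stg_orders": {
            "resource_type": "model",
            "name": "stg_orders",
            "depends_on": {"nodes": ["seed.jaffle_shop.raw_orders"]},
        },
        "model.jaffle_shop.orders": {
            "resource_type": "model",
            "name": "orders",
            "depends_on": {"nodes": ["model.jaffle_shop.stg_orders"]},
        },
        "test.jaffle_shop.not_null_orders_id": {
            "resource_type": "test",
            "name": "not_null_orders_id",
            "depends_on": {"nodes": ["model.jaffle_shop.orders"]},
        },
    }
}

# Resource types that this guide says are loaded as assets.
ASSET_RESOURCE_TYPES = {"model", "seed", "snapshot"}


def assets_from_manifest(manifest: dict) -> Dict[str, List[str]]:
    """Map each asset-like node name to the names of its upstream assets."""
    nodes = manifest["nodes"]
    asset_ids = {
        uid for uid, node in nodes.items()
        if node["resource_type"] in ASSET_RESOURCE_TYPES
    }
    return {
        nodes[uid]["name"]: [
            nodes[dep]["name"]
            for dep in nodes[uid]["depends_on"]["nodes"]
            if dep in asset_ids  # skip tests and anything not loaded as an asset
        ]
        for uid in sorted(asset_ids)
    }


for asset, upstream in assets_from_manifest(manifest).items():
    print(asset, "<-", upstream)
```

Tests are dropped from the result because they are not materialized as assets, while the dependency edges between the remaining nodes are what give the loaded assets their lineage.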

Step 3: Schedule dbt Cloud job runs#

Now that your dbt Cloud assets are loaded, you can define a Dagster job that materializes some or all of these assets, triggering the underlying dbt Cloud job.

You can explicitly define when assets should be materialized. For example, you can schedule assets based on their upstream or downstream dependencies, external events using a sensor, or a cron schedule.

from dagster import (
    ScheduleDefinition,
    define_asset_job,
    AssetSelection,
    Definitions,
)

# Materialize all assets
run_everything_job = define_asset_job("run_everything_job", AssetSelection.all())

defs = Definitions(
    # Use the dbt_cloud_assets defined in Step 2
    assets=[dbt_cloud_assets],
    schedules=[
        ScheduleDefinition(
            job=run_everything_job,
            cron_schedule="@daily",
        ),
    ],
)
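
The "@daily" value above is a cron macro. As a quick reference, and assuming the conventional cron shorthand meanings, the sketch below shows the five-field expression each common macro stands for. expand_cron is a hypothetical helper for illustration, not a Dagster API.

```python
# Conventional cron macros and their five-field equivalents.
CRON_MACROS = {
    "@hourly": "0 * * * *",   # minute 0 of every hour
    "@daily": "0 0 * * *",    # midnight every day
    "@weekly": "0 0 * * 0",   # midnight every Sunday
    "@monthly": "0 0 1 * *",  # midnight on the first day of each month
}


def expand_cron(expression: str) -> str:
    """Return the five-field form of a cron macro, or the input unchanged."""
    return CRON_MACROS.get(expression, expression)


print(expand_cron("@daily"))
```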

Step 4: Cache the dbt Cloud job compilation#

If developing locally, this step is optional.

In production, however, caching is recommended as the latency of Step 2 (loading dbt models as Dagster assets) will scale with the size of the dbt project in dbt Cloud.

You'll need the following secrets in your GitHub repository and local environment:

  • DBT_CLOUD_API_TOKEN - The API token for your dbt Cloud account, as described in the Prerequisites.
  • DBT_CLOUD_ACCOUNT_ID - The ID of your dbt Cloud account, as described in the Prerequisites.
  • DBT_CLOUD_PROJECT_ID - The ID of the dbt Cloud project that corresponds to the dbt project managed in git.
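
Before running the caching command locally or in CI, it can help to confirm that all three values are present. The following is a minimal preflight sketch: check_dbt_cloud_env is a hypothetical helper, and the rule that both IDs must be numeric is an assumption based on the numeric account ID shown in the Prerequisites.

```python
import os
from typing import List, Mapping, Optional

REQUIRED_VARS = (
    "DBT_CLOUD_API_TOKEN",
    "DBT_CLOUD_ACCOUNT_ID",
    "DBT_CLOUD_PROJECT_ID",
)
# Assumption: account and project IDs are plain integers.
NUMERIC_VARS = ("DBT_CLOUD_ACCOUNT_ID", "DBT_CLOUD_PROJECT_ID")


def check_dbt_cloud_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return a list of problems; an empty list means the environment looks usable."""
    env = os.environ if env is None else env
    problems = []
    for name in REQUIRED_VARS:
        value = env.get(name, "").strip()
        if not value:
            problems.append(f"{name} is not set")
        elif name in NUMERIC_VARS and not value.isdigit():
            problems.append(f"{name} must be a numeric ID")
    return problems


# Example with an explicit mapping; call check_dbt_cloud_env() to inspect os.environ.
example_env = {"DBT_CLOUD_API_TOKEN": "token", "DBT_CLOUD_ACCOUNT_ID": "111111"}
for problem in check_dbt_cloud_env(example_env):
    print(problem)
```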

Step 4.1: Add DBT_DAGSTER_COMPILE_RUN_ID environment variable to dbt Cloud#

To cache the compilation of your dbt project, you'll need to add an environment variable to your dbt Cloud project. Navigate to dbt Cloud's Environment Variables and set the DBT_DAGSTER_COMPILE_RUN_ID variable with a default value of -1 at both the project level and the environment level.

Step 4.2: Override DBT_DAGSTER_COMPILE_RUN_ID on dbt Cloud jobs managed by Dagster#

Next, override the DBT_DAGSTER_COMPILE_RUN_ID environment variable in each dbt Cloud job that you are managing with Dagster. The presence of this job-level override causes the dbt Cloud job to use the cached compilation of your dbt project. Set the job-level override to -1 as well.

Step 4.3: Test the cache locally#

To populate the cache for your dbt Cloud jobs, run:

dagster-dbt-cloud cache-compile-references

This command compiles your dbt project and caches the compilation for each dbt Cloud job that you are managing with Dagster. After it runs, the DBT_DAGSTER_COMPILE_RUN_ID environment variable on each job is updated to the run ID of the cached compilation.

When running dagster dev locally, your dbt Cloud assets will now be loaded from the cached compilation, rather than compiling when the Dagster definitions are loaded.
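
One way to think about the DBT_DAGSTER_COMPILE_RUN_ID value is as a sentinel. The sketch below assumes, based on the steps above, that -1 means no compilation has been cached yet and that any other integer is the run ID of a cached compilation. interpret_compile_run_id is a hypothetical helper; how you read the value from the dbt Cloud job is left out.

```python
from typing import Optional

# Assumption drawn from Steps 4.1 to 4.3: -1 marks "no cached compilation yet".
NO_CACHE_SENTINEL = -1


def interpret_compile_run_id(raw_value: str) -> Optional[int]:
    """Return the cached compile run ID, or None if the cache is not populated."""
    run_id = int(raw_value)
    return None if run_id == NO_CACHE_SENTINEL else run_id


for value in ("-1", "987654"):
    run_id = interpret_compile_run_id(value)
    if run_id is None:
        print(f"{value}: cache not populated; the project is compiled at load time")
    else:
        print(f"{value}: assets can be loaded from compile run {run_id}")
```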

Step 4.4: Set up a GitHub action to automate the cache#

You can keep the cached compilation up to date automatically with a GitHub Action in the repository that holds your dbt project. For example, the following workflow caches the compilation of your dbt project on every push to the main branch:

name: Compile dbt Cloud project
on:
  push:
    branches:
      - main
jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          python-version: "3.9"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dagster-dbt
      - name: Run commit script
        run: python -m dagster_dbt.cloud.cli cache-compile-references
        env:
          DBT_CLOUD_API_TOKEN: ${{ secrets.DBT_CLOUD_API_TOKEN }}
          DBT_CLOUD_ACCOUNT_ID: ${{ secrets.DBT_CLOUD_ACCOUNT_ID }}
          DBT_CLOUD_PROJECT_ID: ${{ secrets.DBT_CLOUD_PROJECT_ID }}

What's next?#

By now, you should have a working dbt Cloud and Dagster integration and a handful of materialized Dagster assets.

From here, you can: