dbt Examples

You can find the code for this example on GitHub.

This page provides examples of how to use dagster-dbt to integrate your existing dbt project with Dagster.

This example illustrates how custom solids can be combined with dbt commands in a Dagster pipeline. It uses the dbt_example_project provided by fishtown-analytics/dbt-starter-project.

Getting Started

Before you start, let's make sure you have the environment ready.

  1. Make sure you have the necessary Python libraries installed. Running inside a Python virtualenv is recommended.

    pip install -e .
  2. The example dbt project requires a running Postgres database. By default, the project will attempt to connect to postgresql://dbt_example:dbt_example@localhost:5432/dbt_example. If you are running Docker, have Docker Compose installed, and aren't running any other process bound to port 5432, you can bring up a default database with this address by running:

    docker-compose up -d

    If you'd like to run the project against a different running database, set the following environment variables as appropriate:

    • DAGSTER_DBT_EXAMPLE_PGHOST
    • DAGSTER_DBT_EXAMPLE_PGPORT
    • DAGSTER_DBT_EXAMPLE_PGUSER
    • DAGSTER_DBT_EXAMPLE_PGPASSWORD
  3. Add the profile for dbt_example_project to your dbt profiles.yml file.

    mkdir -p ~/.dbt/
    touch ~/.dbt/profiles.yml
    cat dbt_example_project/profiles.yml >> ~/.dbt/profiles.yml
    

    Test that this is set up correctly by running dbt ls:

    dbt ls --project-dir dbt_example_project
  4. You are all set.
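The database-selection logic from step 2 can be sketched in Python as follows. Note that build_db_url is a hypothetical helper for illustration, not part of the example project:

```python
import os

# Hedged sketch: assemble the Postgres URL the example project connects to,
# honoring the DAGSTER_DBT_EXAMPLE_* overrides listed in step 2.
# build_db_url is a hypothetical helper, not part of the example project.
def build_db_url(env=None):
    env = os.environ if env is None else env
    host = env.get("DAGSTER_DBT_EXAMPLE_PGHOST", "localhost")
    port = env.get("DAGSTER_DBT_EXAMPLE_PGPORT", "5432")
    user = env.get("DAGSTER_DBT_EXAMPLE_PGUSER", "dbt_example")
    password = env.get("DAGSTER_DBT_EXAMPLE_PGPASSWORD", "dbt_example")
    return f"postgresql://{user}:{password}@{host}:{port}/dbt_example"

print(build_db_url({}))
# → postgresql://dbt_example:dbt_example@localhost:5432/dbt_example
```

With no overrides set, this yields the default address that docker-compose brings up.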

Execute the dbt pipeline using Dagster

pipelines.py
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="prod",
            resource_defs={
                "db": postgres,
                "slack": slack_resource,
                "io_manager": fs_io_manager,
            },
        ),
        ModeDefinition(
            name="dev",
            resource_defs={
                "db": postgres,
                "slack": mock_slack_resource,
                "io_manager": fs_io_manager,
            },
        ),
    ],
    preset_defs=[
        PresetDefinition(
            name="dev",
            run_config={
                "solids": {
                    "download_file": {
                        "config": {"url": CEREALS_DATASET_URL, "target_path": "cereals.csv"}
                    },
                    "post_plot_to_slack": {"config": {"channels": ["foo_channel"]}},
                },
                "resources": {
                    "db": {
                        "config": {
                            "db_url": "postgresql://dbt_example:dbt_example@localhost:5432/dbt_example"
                        }
                    },
                    "slack": {"config": {"token": "nonce"}},
                },
            },
            mode="dev",
        ),
        PresetDefinition(
            name="prod",
            run_config={
                "solids": {
                    "download_file": {
                        "config": {"url": CEREALS_DATASET_URL, "target_path": "cereals.csv"}
                    },
                    "post_plot_to_slack": {"config": {"channels": ["foo_channel"]}},
                },
                "resources": {
                    "db": {
                        "config": {
                            "db_url": "postgresql://dbt_example:dbt_example@localhost:5432/dbt_example"
                        }
                    },
                    "slack": {"config": {"token": "nonce"}},
                },
            },
            mode="prod",
        ),
    ],
)
def dbt_example_pipeline():
    loaded = load_cereals_from_csv(download_file())
    run_results = run_cereals_models(start_after=loaded)
    test_cereals_models(start_after=run_results)
    post_plot_to_slack(analyze_cereals(run_results))

Start Dagit locally by running:

$ dagit -m dbt_example

You will see that Dagit has rendered the pipeline defined by the code above.

dbt-example-dagit-pipeline-def.png

The pipeline definition also provides pre-defined run configurations through PresetDefinition, and a convenient way to switch resource configurations between environments via ModeDefinition.

After switching to Dagit's Playground, you can load one of the pre-defined presets to execute the pipeline.

dbt-example-dagit-playground.png
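For reference, loading the dev preset in the Playground corresponds to run config like the following, shown here as YAML. The `<CEREALS_DATASET_URL>` placeholder stands in for the value of the CEREALS_DATASET_URL constant in the code above:

```yaml
solids:
  download_file:
    config:
      url: <CEREALS_DATASET_URL>  # placeholder for the CEREALS_DATASET_URL constant
      target_path: cereals.csv
  post_plot_to_slack:
    config:
      channels:
        - foo_channel
resources:
  db:
    config:
      db_url: postgresql://dbt_example:dbt_example@localhost:5432/dbt_example
  slack:
    config:
      token: nonce
```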

After clicking Launch Execution, you will see the pipeline execute, with logs as below:

dbt-example-dagit-solid-result

Execute dbt commands in Dagster solids

Let's look into one of the solids and see how Dagster works with dbt commands.

solids.py
run_cereals_models = dbt_cli_run.configured(
    config_or_config_fn={"project-dir": PROJECT_DIR, "profiles-dir": PROFILES_DIR},
    name="run_cereals_models",
)

For example, the solid run_cereals_models is created from dagster_dbt.dbt_cli_run using Dagster's configured API.
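Conceptually, `configured` pre-applies config to a solid, much like functools.partial pre-applies arguments to a function. A rough analogy in plain Python, where render_dbt_cli_args is a hypothetical stand-in for the real solid body:

```python
from functools import partial

# Hedged analogy only: `configured` fixes a solid's config up front, the way
# partial fixes a function's arguments. render_dbt_cli_args is a made-up
# stand-in for the real solid, which shells out to the dbt CLI.
def render_dbt_cli_args(command, project_dir, profiles_dir):
    return ["dbt", command, "--project-dir", project_dir, "--profiles-dir", profiles_dir]

run_args = partial(
    render_dbt_cli_args, "run",
    project_dir="dbt_example_project", profiles_dir="~/.dbt",
)
print(run_args())
# → ['dbt', 'run', '--project-dir', 'dbt_example_project', '--profiles-dir', '~/.dbt']
```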

The solid executes dbt run via the dbt CLI. As you may have noticed in the logs above, the solid also records the output of the dbt CLI execution as AssetMaterialization events, which let you track the external entities it produces in the "Asset Catalog".

dbt-example-dagit-asset

You can find more functionality, along with a detailed API reference, in the dagster-dbt library documentation.