dbt Examples
This page provides examples of how to use dagster-dbt to integrate your existing dbt project with the Dagster platform.
This example illustrates how custom solids can be combined with dbt commands in a Dagster pipeline. It uses the dbt_example_project provided by fishtown-analytics/dbt-starter-project.
Getting Started
Before you start, make sure your environment is ready.
Make sure you have the necessary Python libraries installed. Running inside a Python virtualenv is recommended.
pip install -e .
The example dbt project requires a running Postgres database. By default, the project will attempt to connect to postgresql://dbt_example:dbt_example@localhost:5432/dbt_example. If you are running Docker, have Docker Compose installed, and aren't running any other process bound to port 5432, you can bring up a default database at this address by running:

docker-compose up -d
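For reference, the Compose file behind this command generally looks like the sketch below. This is an illustration only; the example project ships its own docker-compose.yml, and the service name and image tag here are assumptions.

version: "3"
services:
  postgres:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: dbt_example
      POSTGRES_PASSWORD: dbt_example
      POSTGRES_DB: dbt_example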
If you'd like to run the project against a different database, set the following environment variables as appropriate (an example follows the list):
DAGSTER_DBT_EXAMPLE_PGHOST
DAGSTER_DBT_EXAMPLE_PGPORT
DAGSTER_DBT_EXAMPLE_PGUSER
DAGSTER_DBT_EXAMPLE_PGPASSWORD
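For example, to point the project at a Postgres instance on another host (all values below are placeholders):

export DAGSTER_DBT_EXAMPLE_PGHOST=db.internal.example.com
export DAGSTER_DBT_EXAMPLE_PGPORT=5433
export DAGSTER_DBT_EXAMPLE_PGUSER=dbt_example
export DAGSTER_DBT_EXAMPLE_PGPASSWORD=some_password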
Add the profile for the dbt_example_project directory to your dbt profiles.yml file:

mkdir -p ~/.dbt/
touch ~/.dbt/profiles.yml
cat dbt_example_project/profiles.yml >> ~/.dbt/profiles.yml
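For orientation, a dbt profile for a Postgres target typically looks like the sketch below. The actual dbt_example_project/profiles.yml is the source of truth; the profile name and schema here are placeholders, and the profile name must match the profile key in the project's dbt_project.yml.

dbt_example:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: dbt_example
      password: dbt_example
      dbname: dbt_example
      schema: dbt_example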
Test that this is correctly set up by running dbt ls:

dbt ls --project-dir dbt_example_project
You are all set.
Execute the dbt pipeline using Dagster
# The solids (download_file, load_cereals_from_csv, run_cereals_models,
# test_cereals_models, analyze_cereals, post_plot_to_slack) and the custom
# resources (postgres, mock_slack_resource) are defined elsewhere in the
# dbt_example project; the core imports below come from Dagster itself.
from dagster import ModeDefinition, PresetDefinition, fs_io_manager, pipeline
from dagster_slack import slack_resource


@pipeline(
    mode_defs=[
        ModeDefinition(
            name="prod",
            resource_defs={
                "db": postgres,
                "slack": slack_resource,
                "io_manager": fs_io_manager,
            },
        ),
        ModeDefinition(
            name="dev",
            resource_defs={
                "db": postgres,
                "slack": mock_slack_resource,
                "io_manager": fs_io_manager,
            },
        ),
    ],
    preset_defs=[
        PresetDefinition(
            name="dev",
            run_config={
                "solids": {
                    "download_file": {
                        "config": {"url": CEREALS_DATASET_URL, "target_path": "cereals.csv"}
                    },
                    "post_plot_to_slack": {"config": {"channels": ["foo_channel"]}},
                },
                "resources": {
                    "db": {
                        "config": {
                            "db_url": "postgresql://dbt_example:dbt_example@localhost:5432/dbt_example"
                        }
                    },
                    "slack": {"config": {"token": "nonce"}},
                },
            },
            mode="dev",
        ),
        PresetDefinition(
            name="prod",
            run_config={
                "solids": {
                    "download_file": {
                        "config": {"url": CEREALS_DATASET_URL, "target_path": "cereals.csv"}
                    },
                    "post_plot_to_slack": {"config": {"channels": ["foo_channel"]}},
                },
                "resources": {
                    "db": {
                        "config": {
                            "db_url": "postgresql://dbt_example:dbt_example@localhost:5432/dbt_example"
                        }
                    },
                    "slack": {"config": {"token": "nonce"}},
                },
            },
            mode="prod",
        ),
    ],
)
def dbt_example_pipeline():
    loaded = load_cereals_from_csv(download_file())
    run_results = run_cereals_models(start_after=loaded)
    test_cereals_models(start_after=run_results)
    post_plot_to_slack(analyze_cereals(run_results))
Launch Dagit locally by running:
$ dagit -m dbt_example
You will see that Dagit has rendered the pipeline defined by the code above. The pipeline definition also provides pre-defined configurations through PresetDefinition and a convenient way to switch configurations between different modes via ModeDefinition.
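If you prefer to run the pipeline from Python rather than from Dagit, a preset can also be selected programmatically. A minimal sketch using Dagster's execute_pipeline API, assuming the dbt_example package exposes the pipeline at its top level (it is loaded with -m dbt_example):

# Runs the pipeline with the pre-defined "dev" preset, which also selects
# the "dev" mode (mock Slack resource, local Postgres).
from dagster import execute_pipeline

from dbt_example import dbt_example_pipeline  # assumed top-level export

result = execute_pipeline(dbt_example_pipeline, preset="dev")
assert result.success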
After switching to Dagit's Playground, you can load one of the pre-defined presets to execute the pipeline. After clicking Launch Execution, you will see the pipeline being executed along with its logs.
Execute dbt commands in Dagster solids
Let's look at one of the solids to see how Dagster works with dbt commands.
# dbt_cli_run is provided by the dagster-dbt library; PROJECT_DIR and
# PROFILES_DIR are constants defined in the example project.
from dagster_dbt import dbt_cli_run

run_cereals_models = dbt_cli_run.configured(
    config_or_config_fn={"project-dir": PROJECT_DIR, "profiles-dir": PROFILES_DIR},
    name="run_cereals_models",
)
For example, the solid run_cereals_models is created from dagster_dbt.dbt_cli_run using Dagster's configured API.
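The pipeline's other dbt solid can be built the same way. Here is a sketch (not necessarily the example's exact definition) that configures dagster_dbt.dbt_cli_test, which executes dbt test via the CLI, reusing the same PROJECT_DIR and PROFILES_DIR constants:

from dagster_dbt import dbt_cli_test

# Configured the same way as dbt_cli_run above; runs `dbt test`.
test_cereals_models = dbt_cli_test.configured(
    config_or_config_fn={"project-dir": PROJECT_DIR, "profiles-dir": PROFILES_DIR},
    name="test_cereals_models",
)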
The solid executes dbt run via the dbt CLI. As you may have noticed in the run logs, the solid also records the output of the dbt CLI execution in a few AssetMaterialization events, which allow you to track these external entities in Dagit's Asset Catalog.
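For reference, yielding such events from a custom solid follows the same pattern. A minimal, hypothetical sketch (the solid and asset key names are placeholders):

from dagster import AssetMaterialization, Output, solid


@solid
def update_warehouse_table(context):
    # ... write some table to the warehouse here (hypothetical) ...
    # Record the side effect so it shows up in Dagit's Asset Catalog.
    yield AssetMaterialization(
        asset_key="my_schema.my_table",
        description="Persisted my_table to the warehouse",
    )
    yield Output(None)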
You can find more functionality, along with a detailed API reference, in the dagster-dbt library documentation.