
Adjust dbt asset config for incremental models

With lineage in place, there’s one final adjustment to make to our scaffolding. In our dbt project, the daily_metrics model is an incremental model. Incremental models improve performance by avoiding full refreshes: instead of reprocessing everything, they only handle new or modified data based on a time filter.

src/project_dbt/analytics/models/marts/daily_metrics.sql
{{
  config(
    materialized='incremental',
    unique_key='date_of_business',
    tags=["daily"]
  )
}}

with
  trips as (
    select *
    from {{ ref('stg_trips') }}
  ),
  daily_summary as (
    select
      date_trunc('day', pickup_datetime) as date_of_business,
      count(*) as trip_count,
      sum(duration) as total_duration,
      sum(duration) / count(*) as average_duration,
      sum(total_amount) as total_amount,
      sum(total_amount) / count(*) as average_amount,
      sum(case when duration > 30 then 1 else 0 end) / count(*) as pct_over_30_min
    from trips
    group by all
  )
select *
from daily_summary

{% if is_incremental() %}
  where date_of_business between '{{ var('min_date') }}' and '{{ var('max_date') }}'
{% endif %}

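Here’s how incremental models work:

  • First run: the table doesn't exist yet, so is_incremental() returns false and the model processes the full dataset with no filter.
  • Subsequent runs: is_incremental() returns true, so dbt applies the where clause using the min_date and max_date vars provided at runtime. It then updates or inserts the resulting rows in the existing table, matching them on the unique_key (date_of_business).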
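To make the two cases concrete, here is a minimal Python simulation of that behavior. It is an illustration only and does not show how dbt actually executes the model. The in-memory dict standing in for the daily_metrics table and the run_model helper are hypothetical. Rows are matched on date_of_business, the same way unique_key matches them.

```python
from datetime import date
from typing import Dict, Optional

# Hypothetical stand-in for the daily_metrics table: date_of_business -> trip_count.
Table = Dict[date, int]


def run_model(
    table: Optional[Table],
    daily_counts: Table,
    min_date: Optional[date] = None,
    max_date: Optional[date] = None,
) -> Table:
    """Simulate one run of an incremental model (illustration, not dbt internals)."""
    if table is None:
        # First run: the table doesn't exist, is_incremental() is false,
        # so every row is built with no filter.
        return dict(daily_counts)
    if min_date is None or max_date is None:
        raise ValueError("incremental runs need min_date and max_date")
    # Later runs: keep only rows matching the inclusive BETWEEN filter...
    new_rows = {d: n for d, n in daily_counts.items() if min_date <= d <= max_date}
    # ...then upsert them on the unique key; rows outside the window are untouched.
    merged = dict(table)
    merged.update(new_rows)
    return merged


if __name__ == "__main__":
    counts = {date(2023, 1, 1): 10, date(2023, 1, 2): 12}
    table = run_model(None, counts)  # first run: full build
    # Late-arriving trips change 2023-01-02 and add 2023-01-03.
    counts[date(2023, 1, 2)] = 15
    counts[date(2023, 1, 3)] = 7
    table = run_model(table, counts, date(2023, 1, 2), date(2023, 1, 2))
    print(table)
    # {datetime.date(2023, 1, 1): 10, datetime.date(2023, 1, 2): 15}
```

The 2023-01-03 row only appears once a later run's window covers it. Until then, the existing table keeps its previous contents.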

1. Include a template var

The first step is to add a new template var to your component. This template var returns the partitions definition that will be used to partition the dbt assets.

src/project_dbt/defs/transform/template_vars.py
import dagster as dg


@dg.template_var
def daily_partitions_def() -> dg.DailyPartitionsDefinition:
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")
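To build intuition for what this partitions definition represents, the sketch below approximates it with the standard library. It produces one key per complete day starting at start_date, and each key covers a half-open [start, end) window. The helper names are hypothetical, and the sketch uses naive datetimes. Dagster's own DailyPartitionsDefinition additionally supports settings such as time zones and offsets.

```python
from datetime import date, datetime, timedelta
from typing import List, Tuple


def daily_partition_keys(start_date: str, as_of: str) -> List[str]:
    """Hypothetical helper: one key per complete day from start_date up to as_of.

    The day starting at as_of is excluded because it is not finished yet.
    """
    start = date.fromisoformat(start_date)
    end = date.fromisoformat(as_of)
    n_days = max((end - start).days, 0)
    return [(start + timedelta(days=i)).isoformat() for i in range(n_days)]


def time_window(partition_key: str) -> Tuple[datetime, datetime]:
    """Half-open [start, end) window covered by a single daily partition key."""
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    return start, start + timedelta(days=1)


print(daily_partition_keys("2023-01-01", "2023-01-04"))
# ['2023-01-01', '2023-01-02', '2023-01-03']
print(time_window("2023-01-02"))
# (datetime.datetime(2023, 1, 2, 0, 0), datetime.datetime(2023, 1, 3, 0, 0))
```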

2. Update dbt component configuration

The next step is to update the defs.yaml file so that it loads the new template var. The file also uses the post_processing field to apply this partitions definition to all assets:

src/project_dbt/defs/transform/defs.yaml
type: dagster_dbt.DbtProjectComponent

template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/src/project_dbt/analytics'
  select: "tag:daily"
  translation:
    key: "taxi_{{ node.name }}"
    group_name: dbt

post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
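Conceptually, post_processing walks the asset specs the component produced. It then overrides the listed attributes on every spec the target matches. The sketch below models only that idea. SpecSketch and apply_post_processing are hypothetical stand-ins, not Dagster's AssetSpec API. The selection logic is also simplified: "*" matches everything, and any other target must be an exact key.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class SpecSketch:
    """Hypothetical stand-in for an asset spec; not Dagster's AssetSpec."""

    key: str
    group_name: str
    partitions_def: Optional[Any] = None


def apply_post_processing(
    specs: List[SpecSketch], target: str, attributes: Dict[str, Any]
) -> List[SpecSketch]:
    """Return copies of specs with attributes overridden where target matches.

    Simplified selection: "*" matches every spec, anything else an exact key.
    """
    return [
        replace(spec, **attributes) if target in ("*", spec.key) else spec
        for spec in specs
    ]


specs = [
    SpecSketch(key="taxi_daily_metrics", group_name="dbt"),
    SpecSketch(key="taxi_other_model", group_name="dbt"),  # hypothetical model
]
daily = "DailyPartitionsDefinition(start_date='2023-01-01')"  # placeholder value
for spec in apply_post_processing(specs, "*", {"partitions_def": daily}):
    print(spec.key, "->", spec.partitions_def)
```

Because the override is attached to the target rather than to individual models, a new model picked up by the component receives the same partitions definition without further edits.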

Finally, we need to pass new configuration to the cli_args field so that each dbt invocation changes based on the partition being executed. In particular, we want to pass values to dbt's --vars option that determine the range of time our incremental models should process.

When the cli_args field is resolved, it has access to a context.partition_time_window object, which is Dagster's representation of the time range to process on the current run. Template expressions can format the window's start and end as the date strings your dbt project expects:

src/project_dbt/defs/transform/defs.yaml
type: dagster_dbt.DbtProjectComponent

template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/src/project_dbt/analytics'
  select: "tag:daily"
  translation:
    key: "taxi_{{ node.name }}"
    group_name: dbt
  cli_args:
    - build
    - --vars:
        min_date: "{{ context.partition_time_window.start.strftime('%Y-%m-%d') }}"
        max_date: "{{ context.partition_time_window.end.strftime('%Y-%m-%d') }}"

post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
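The following sketch shows the arguments the cli_args configuration above resolves to for one daily partition. The dbt_build_args helper is hypothetical. Serializing --vars as a JSON string is an assumption about how the mapping reaches dbt; dbt parses --vars as YAML, which accepts JSON.

```python
import json
from datetime import datetime, timedelta
from typing import List


def dbt_build_args(window_start: datetime, window_end: datetime) -> List[str]:
    """Hypothetical sketch of the dbt arguments for one partition's run.

    Mirrors the strftime calls in defs.yaml; the JSON encoding of --vars is
    an assumption, not necessarily what dagster-dbt emits internally.
    """
    dbt_vars = {
        "min_date": window_start.strftime("%Y-%m-%d"),
        "max_date": window_end.strftime("%Y-%m-%d"),
    }
    return ["build", "--vars", json.dumps(dbt_vars)]


start = datetime(2023, 1, 2)  # partition key 2023-01-02
print(dbt_build_args(start, start + timedelta(days=1)))
# ['build', '--vars', '{"min_date": "2023-01-02", "max_date": "2023-01-03"}']
```

Note that partition_time_window.end is midnight at the start of the next day. As a result, max_date is the following date, and the inclusive between filter in daily_metrics also matches that day's row.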

This design lets us:

  • Automatically configure incremental filters with partition context.
  • Keep partitioning consistent across upstream and downstream assets.
  • Run incremental updates or full backfills as needed.

Best of all, this pattern applies automatically to every dbt model the component selects (here, those tagged daily). As you add more models, incremental or not, Dagster will handle them the same way, with no extra structural changes.

Next steps