Adjust dbt asset config for incremental models
With lineage in place, there’s one final adjustment to make to our scaffolding. In our dbt project, the daily_metrics
model is an incremental model. Incremental models improve performance by avoiding full refreshes: instead of reprocessing everything, they only handle new or modified data based on a time filter.
{{
    config(
        materialized='incremental',
        unique_key='date_of_business',
        tags=["daily"]
    )
}}

with

trips as (
    select *
    from {{ ref('stg_trips') }}
),

daily_summary as (
    select
        date_trunc('day', pickup_datetime) as date_of_business,
        count(*) as trip_count,
        sum(duration) as total_duration,
        sum(duration) / count(*) as average_duration,
        sum(total_amount) as total_amount,
        sum(total_amount) / count(*) as average_amount,
        sum(case when duration > 30 then 1 else 0 end) / count(*) as pct_over_30_min
    from trips
    group by all
)

select *
from daily_summary

{% if is_incremental() %}
where date_of_business between '{{ var('min_date') }}' and '{{ var('max_date') }}'
{% endif %}
Here’s how incremental models work:
- First run: the model processes the full dataset with no filters.
- Subsequent runs: dbt applies the is_incremental() filter, using the min_date and max_date values provided at runtime (sketched below).
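To make that runtime contract concrete: the filter above behaves as if you had invoked dbt yourself with explicit min_date and max_date values. The following sketch is illustrative only; it assumes the dbt CLI is installed, is run from the dbt project directory, and uses an arbitrary one-day range.

import json
import subprocess

# Illustrative one-day range; in the Dagster setup below, these values come from the partition.
dbt_vars = {"min_date": "2023-03-01", "max_date": "2023-03-02"}

# Equivalent shell command:
#   dbt build --select daily_metrics --vars '{"min_date": "2023-03-01", "max_date": "2023-03-02"}'
subprocess.run(
    ["dbt", "build", "--select", "daily_metrics", "--vars", json.dumps(dbt_vars)],
    check=True,
)

The rest of this section wires Dagster partitions into exactly this mechanism, so the dates never need to be supplied by hand.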
1. Include a template var
The first step is to add a new template var to your component. This will define the partitions definition used to partition the dbt assets.
import dagster as dg


@dg.template_var
def daily_partitions_def() -> dg.DailyPartitionsDefinition:
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")
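If you want to see what this partitions definition produces, you can inspect it directly. This is just a sanity check, assuming dagster is installed; the exact number of partitions depends on when you run it.

import dagster as dg

partitions_def = dg.DailyPartitionsDefinition(start_date="2023-01-01")

# One partition key per day, starting at the start date:
# ['2023-01-01', '2023-01-02', '2023-01-03']
print(partitions_def.get_partition_keys()[:3])

# Each key maps to a 24-hour time window (UTC by default), which Dagster exposes
# at runtime as context.partition_time_window.
window = partitions_def.time_window_for_partition_key("2023-03-01")
print(window.start, window.end)  # 2023-03-01 00:00:00 to 2023-03-02 00:00:00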
2. Update dbt component configuration
The next step is to update the defs.yaml file to use the new template var and apply this partitions definition to all assets via the post_processing field:
type: dagster_dbt.DbtProjectComponent
template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/src/project_dbt/analytics'
  select: "tag:daily"
  translation:
    key: "taxi_{{ node.name }}"
    group_name: dbt
post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
Finally, we need to pass new configuration to the cli_args field so that the dbt invocation actually changes based on which partition is executing. In particular, we want to pass values to the --vars option that determine the range of time our incremental models should process.
When the cli_args field is resolved, it has access to a context.partition_time_window object, Dagster's representation of the time range to process on the current run. This can be converted into a format your dbt project recognizes using template variables:
type: dagster_dbt.DbtProjectComponent
template_vars_module: .template_vars
attributes:
  project: '{{ project_root }}/src/project_dbt/analytics'
  select: "tag:daily"
  translation:
    key: "taxi_{{ node.name }}"
    group_name: dbt
  cli_args:
    - build
    - --vars:
        min_date: "{{ context.partition_time_window.start.strftime('%Y-%m-%d') }}"
        max_date: "{{ context.partition_time_window.end.strftime('%Y-%m-%d') }}"
post_processing:
  assets:
    - target: "*"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
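To see what dbt actually receives for a given run, you can evaluate the same strftime expressions against a partition's time window. A minimal sketch, assuming the daily partitions definition above and the 2023-03-01 partition:

import dagster as dg

partitions_def = dg.DailyPartitionsDefinition(start_date="2023-01-01")
window = partitions_def.time_window_for_partition_key("2023-03-01")

# What the two cli_args templates above resolve to for this partition:
dbt_vars = {
    "min_date": window.start.strftime("%Y-%m-%d"),  # "2023-03-01"
    "max_date": window.end.strftime("%Y-%m-%d"),    # "2023-03-02" (the window's exclusive end)
}
print(dbt_vars)

dbt substitutes these values into the is_incremental() filter in daily_metrics, limiting the run to the partition's time range.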
This design lets us:
- Automatically configure incremental filters with partition context.
- Keep partitioning consistent across upstream and downstream assets.
- Run incremental updates or full backfills as needed.
Best of all, this pattern applies automatically to every dbt model in the project. As you add more models — incremental or not — Dagster will handle them the same way, with no extra structural changes.
Next steps
- Continue this example with dbt tests.