Set up data ingestion
To work with dbt, you need both a storage layer and data. Setting these up isn’t the main focus of this example, but they’re the foundation for running dbt effectively.
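We’ll use DuckDB as the storage layer for this example. DuckDB is a lightweight OLAP database that runs locally with minimal setup. Our dbt project will rely on two tables in DuckDB, each loaded by a Dagster asset:
- taxi_zones (stored in DuckDB as the zones table)
- taxi_trips (stored in DuckDB as the trips table)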
1. Create the taxi_zones table
The first table, taxi_zones, is created directly from a CSV file in S3 with a single CREATE OR REPLACE TABLE ... AS statement:
```python
import dagster as dg
from dagster_duckdb import DuckDBResource


@dg.asset(
    group_name="ingested",
    kinds={"duckdb"},
)
def taxi_zones(context: dg.AssetExecutionContext, database: DuckDBResource):
    """The raw taxi zones dataset, loaded into a DuckDB database."""
    taxi_zones_file_path = "https://community-engineering-artifacts.s3.us-west-2.amazonaws.com/dagster-university/data/taxi_zones.csv"

    # DuckDB reads the CSV directly from its URL; `create or replace` means a rerun
    # overwrites the zones table instead of failing because it already exists.
    query = f"""
        create or replace table zones as (
            select
                LocationID as zone_id,
                zone,
                borough,
                the_geom as geometry
            from '{taxi_zones_file_path}'
        );
    """

    with database.get_connection() as conn:
        conn.execute(query)
```
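Because the query uses CREATE OR REPLACE TABLE ... AS, rerunning the asset rebuilds zones from the file rather than failing or appending. The sketch below illustrates that rerun behavior using Python's built-in sqlite3 module as a stand-in for DuckDB, an assumption made so the example runs without extra packages. SQLite has no CREATE OR REPLACE TABLE, so the sketch drops the table first, which has the same effect here. The raw_zones table and its rows are hypothetical placeholders for the contents of taxi_zones.csv.

```python
import sqlite3

# Hypothetical stand-in rows for taxi_zones.csv: (LocationID, zone, borough).
RAW_ZONES = [
    (1, "Zone A", "Borough X"),
    (2, "Zone B", "Borough Y"),
]


def load_zones(conn: sqlite3.Connection) -> None:
    """Rebuild the zones table from the raw rows, replacing any previous copy."""
    # Stage the placeholder "file" contents in a raw table.
    conn.execute("drop table if exists raw_zones")
    conn.execute("create table raw_zones (LocationID integer, zone text, borough text)")
    conn.executemany("insert into raw_zones values (?, ?, ?)", RAW_ZONES)
    # SQLite equivalent of DuckDB's `create or replace table zones as (...)`.
    conn.execute("drop table if exists zones")
    conn.execute(
        "create table zones as "
        "select LocationID as zone_id, zone, borough from raw_zones"
    )


conn = sqlite3.connect(":memory:")
load_zones(conn)
load_zones(conn)  # a rerun replaces the table instead of duplicating rows
print(conn.execute("select count(*) from zones").fetchone()[0])  # 2
```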
2. Create the partitioned taxi_trips table
The second table, taxi_trips, comes from multiple monthly files. Each file holds one month of trip data, which makes this a natural use case for partitions: they split an asset into chronological slices (here, one per month) that can be materialized and rerun independently.
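To make "one partition per month" concrete, the sketch below generates one key per calendar month using only the standard library. It approximates the keys a Dagster monthly partitions definition produces in its default YYYY-MM-DD format; it is not Dagster's implementation, and the helper name monthly_partition_keys and the date range are hypothetical.

```python
from datetime import date


def monthly_partition_keys(start: date, end: date) -> list[str]:
    """Return the first day of each month from start's month up to (not including) end."""
    keys = []
    year, month = start.year, start.month
    current = date(year, month, 1)
    while current < end:
        keys.append(current.isoformat())  # e.g. "2023-01-01"
        # Advance to the first day of the next month, rolling over the year.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        current = date(year, month, 1)
    return keys


print(monthly_partition_keys(date(2023, 1, 1), date(2023, 4, 1)))
# ['2023-01-01', '2023-02-01', '2023-03-01']
```

Each key then identifies exactly one monthly file to load, so materializing a partition only ever touches that month's data.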
Like taxi_zones, the source files are in cloud storage. But instead of creating the table directly from a file, we first create an empty trips table if it doesn’t already exist and then populate it one month at a time. When you run the asset for a partition, it deletes that month’s rows with DELETE and reloads them with INSERT. This keeps the asset idempotent, so rerunning a partition replaces its rows instead of duplicating them:
```python
import dagster as dg
from dagster_duckdb import DuckDBResource

# Assumed definition: `monthly_partition` is referenced but not defined in this example.
# The dates are placeholders; monthly partition keys look like "2023-01-01".
monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2023-01-01", end_date="2023-04-01")


@dg.asset(
    partitions_def=monthly_partition,
    group_name="ingested",
    kinds={"duckdb"},
)
def taxi_trips(context: dg.AssetExecutionContext, database: DuckDBResource):
    """The raw taxi trips dataset, loaded into a DuckDB database, partitioned by month."""
    # Trim a key such as "2023-01-01" to the "2023-01" month used in the file name.
    partition_date_str = context.partition_key
    month_to_fetch = partition_date_str[:-3]
    taxi_trips_template_file_path = (
        f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month_to_fetch}.parquet"
    )

    # Create the table once, then replace only this month's rows (delete, then insert)
    # so that rerunning a partition never duplicates data.
    query = f"""
        create table if not exists trips (
            vendor_id integer, pickup_zone_id integer, dropoff_zone_id integer,
            rate_code_id double, payment_type integer, dropoff_datetime timestamp,
            pickup_datetime timestamp, trip_distance double, passenger_count double,
            total_amount double, partition_date varchar
        );

        delete from trips where partition_date = '{month_to_fetch}';

        insert into trips
        select
            VendorID, PULocationID, DOLocationID, RatecodeID, payment_type, tpep_dropoff_datetime,
            tpep_pickup_datetime, trip_distance, passenger_count, total_amount, '{month_to_fetch}' as partition_date
        from '{taxi_trips_template_file_path}';
    """

    with database.get_connection() as conn:
        conn.execute(query)
```
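The delete-then-insert pattern is what makes reruns safe. The sketch below reproduces it with sqlite3 standing in for DuckDB (an assumption so it runs with the standard library), using a trimmed-down trips table and hypothetical rows. Loading the same month twice leaves a single copy of that month's rows and does not touch other months. Unlike the asset above, the sketch also wraps the delete and insert in one transaction, a design choice of this sketch, so a failed insert rolls back the delete. It uses bound parameters instead of f-strings.

```python
import sqlite3


def load_month(conn: sqlite3.Connection, month: str, rows: list[tuple[int, float]]) -> None:
    """Idempotently (re)load one month of trips: delete that month, then insert it."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute(
            "create table if not exists trips "
            "(vendor_id integer, total_amount real, partition_date text)"
        )
        conn.execute("delete from trips where partition_date = ?", (month,))
        conn.executemany(
            "insert into trips values (?, ?, ?)",
            [(vendor_id, amount, month) for vendor_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
load_month(conn, "2023-01", [(1, 12.5), (2, 30.0)])
load_month(conn, "2023-02", [(1, 8.0)])
load_month(conn, "2023-01", [(1, 12.5), (2, 30.0)])  # rerun the January partition

counts = dict(
    conn.execute(
        "select partition_date, count(*) from trips "
        "group by partition_date order by partition_date"
    )
)
print(counts)  # {'2023-01': 2, '2023-02': 1}
```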
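The taxi_trips asset reads the partition being materialized from context.partition_key. Monthly partition keys are date strings such as 2023-01-01, so partition_date_str[:-3] trims off the day to produce the 2023-01 month used in both the file name and the partition_date column.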
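As an illustration, the hypothetical helper below maps a partition key to its month and source URL outside of Dagster. It parses the key with datetime.strptime rather than slicing it with [:-3]. That is a swapped-in, stricter variant: it gives the same result for well-formed keys but raises ValueError on a malformed one instead of quietly building a bad URL.

```python
from datetime import datetime

TRIP_DATA_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{month}.parquet"


def month_and_url(partition_key: str) -> tuple[str, str]:
    """Map a key like '2023-03-01' to ('2023-03', URL of that month's Parquet file)."""
    # Same result as partition_key[:-3] for well-formed keys, but validated.
    month = datetime.strptime(partition_key, "%Y-%m-%d").strftime("%Y-%m")
    return month, TRIP_DATA_URL.format(month=month)


month, url = month_and_url("2023-03-01")
print(month)  # 2023-03
print(url)    # https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet
```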
3. Configure a DuckDB resource
Both assets take a database parameter of type DuckDBResource, so the Dagster project needs a DuckDB resource registered under that name. Defining it once centralizes the connection settings and lets every asset that needs DuckDB reuse them:
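```python
import dagster as dg
from dagster_duckdb import DuckDBResource

# The path to the DuckDB database file is read from the DUCKDB_DATABASE environment variable.
database_resource = DuckDBResource(
    database=dg.EnvVar("DUCKDB_DATABASE"),
)


@dg.definitions
def resources():
    # The "database" key must match the `database` parameter name used by the assets.
    return dg.Definitions(
        resources={
            "database": database_resource,
        },
    )
```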
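Inside an asset, with database.get_connection() as conn: hands the asset an open connection and releases it when the block exits, so the assets never manage connections themselves. The sketch below mimics that pattern with a hypothetical LocalDatabase class built on sqlite3 and contextlib. It is not dagster_duckdb's implementation, only an illustration of the idea. Like the resource above, it takes its database path from the DUCKDB_DATABASE environment variable.

```python
import os
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator


@dataclass
class LocalDatabase:
    """Hypothetical stand-in for a database resource: holds config, hands out connections."""

    database: str

    @classmethod
    def from_env(cls, var: str = "DUCKDB_DATABASE") -> "LocalDatabase":
        # Read the database path from the environment when the resource is built.
        return cls(database=os.environ[var])

    @contextmanager
    def get_connection(self) -> Iterator[sqlite3.Connection]:
        conn = sqlite3.connect(self.database)
        try:
            yield conn
            conn.commit()  # only reached if the with-block finished without an error
        finally:
            conn.close()  # always release the connection


os.environ["DUCKDB_DATABASE"] = ":memory:"  # demo value; the real resource points at a DuckDB file
database = LocalDatabase.from_env()
with database.get_connection() as conn:
    print(conn.execute("select 1 + 1").fetchone()[0])  # 2
```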
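A project can define multiple resources, but this is all we need for now. Before materializing the assets, set the DUCKDB_DATABASE environment variable to the path of the DuckDB database file. With this foundation in place, we’re ready to build with dbt in Dagster.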
This setup is based on the Dagster Essentials course, which covers ingestion, asset creation, and resource configuration in more depth.
Next steps
- Continue with the dbt project.