
Transform data with SQL

You now have raw data being written to DuckDB tables from two sources: Postgres tables replicated by Sling, and GitHub data loaded by dlt. This step adds the "Transform" in ELT: SQL transformations that turn raw ingested tables into analytics-ready outputs.

To transform the data, you'll use the TemplatedSqlComponent rather than an @asset-decorated function. Each transform becomes a component folder with a defs.yaml and a .sql file, keeping SQL out of Python and giving you full lineage in the asset graph.

Step 1: Create a DuckDB connection component

TemplatedSqlComponent executes SQL through a connection object. DuckDB doesn't ship a built-in connection component, so you'll write a small one in your project's components/ folder.

Add the DuckDBConnectionComponent class to duckdb_connection.py:

src/project_elt_pipeline/components/duckdb_connection.py
import dagster as dg
import duckdb
from dagster.components.lib.sql_component.sql_client import SQLClient


class DuckDBConnectionComponent(dg.Component, dg.Resolvable, dg.Model, SQLClient):
    """A connection component for DuckDB, for use with TemplatedSqlComponent."""

    database: str

    def connect_and_execute(self, sql: str) -> None:
        # Open the DuckDB database file and execute the rendered SQL.
        with duckdb.connect(self.database) as conn:
            conn.execute(sql)

    def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
        # The connection itself contributes no assets; transforms reference it.
        return dg.Definitions()
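The only behavior the connection component must supply is "open a connection, execute a statement." To see that pattern in isolation, here is a minimal stand-alone sketch that uses Python's built-in sqlite3 in place of duckdb (illustrative only; the shape of connect_and_execute matches the component above):

```python
import sqlite3


class SQLiteConnection:
    """Stand-in for DuckDBConnectionComponent's execution pattern (illustrative)."""

    def __init__(self, database: str) -> None:
        self.database = database

    def connect_and_execute(self, sql: str) -> None:
        # Open the database and run the statement; the context manager
        # commits the transaction on success.
        with sqlite3.connect(self.database) as conn:
            conn.execute(sql)


conn = SQLiteConnection(":memory:")
conn.connect_and_execute("CREATE TABLE t (x INTEGER)")
```

The real component works the same way, except duckdb.connect opens the file named by the database attribute.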

Step 2: Scaffold the DuckDB connection component definition

Scaffold a component folder to hold the DuckDB connection config:

dg scaffold defs project_elt_pipeline.DuckDBConnectionComponent duckdb_connection

This creates:

src/project_elt_pipeline/defs/duckdb_connection/
└── defs.yaml

The generated defs.yaml references your component type and reads the database path from your .env file:

src/project_elt_pipeline/defs/duckdb_connection/defs.yaml
type: project_elt_pipeline.DuckDBConnectionComponent

attributes:
  database: '{{ env.DEST_DUCKDB_PATH }}'

We will use this connection component in multiple transform components.
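The '{{ env.DEST_DUCKDB_PATH }}' placeholder resolves against environment variables when the component loads. As a rough sketch of the idea only (a naive resolver, not Dagster's actual template engine):

```python
import os
import re


def resolve_env_templates(value: str) -> str:
    """Replace {{ env.VAR }} placeholders with values from os.environ (illustrative)."""
    return re.sub(
        r"\{\{\s*env\.(\w+)\s*\}\}",
        lambda m: os.environ[m.group(1)],
        value,
    )


os.environ["DEST_DUCKDB_PATH"] = "/tmp/demo.duckdb"  # normally set in your .env
print(resolve_env_templates("{{ env.DEST_DUCKDB_PATH }}"))  # prints /tmp/demo.duckdb
```

Because the value is read at load time, you can point the same component at different database files per environment without editing defs.yaml.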

Step 3: Scaffold the transform component definitions

Scaffold one component folder per transform:

dg scaffold defs dagster.TemplatedSqlComponent customer_order_summary
dg scaffold defs dagster.TemplatedSqlComponent product_revenue

This creates:

src/project_elt_pipeline/defs/
├── customer_order_summary/
│   └── defs.yaml
└── product_revenue/
    └── defs.yaml

Step 4: Write the SQL files

Create a .sql file in each component folder. Keeping SQL in its own file gives you syntax highlighting and keeps defs.yaml focused on configuration.

src/project_elt_pipeline/defs/customer_order_summary/customer_order_summary.sql
CREATE OR REPLACE TABLE customer_order_summary AS
SELECT
    u.id AS user_id,
    u.name,
    u.email,
    COUNT(o.id) AS order_count,
    SUM(o.total) AS lifetime_value
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
GROUP BY u.id, u.name, u.email

src/project_elt_pipeline/defs/product_revenue/product_revenue.sql
CREATE OR REPLACE TABLE product_revenue AS
SELECT
    p.id AS product_id,
    p.name,
    COUNT(o.id) AS order_count,
    SUM(o.total) AS total_revenue
FROM products p
LEFT JOIN orders o ON o.product_id = p.id
GROUP BY p.id, p.name
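Before wiring these transforms into Dagster, you can sanity-check the aggregation logic against a few sample rows. This sketch runs the customer_order_summary SELECT on an in-memory sqlite3 database (CREATE OR REPLACE TABLE is DuckDB syntax, so the sketch runs the SELECT alone; the sample table and column names mirror the raw tables, not your actual data):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER, name TEXT, email TEXT);
    CREATE TABLE orders (id INTEGER, user_id INTEGER, total REAL);
    INSERT INTO users VALUES (1, 'Ada', 'ada@example.com'), (2, 'Grace', 'grace@example.com');
    INSERT INTO orders VALUES (10, 1, 25.0), (11, 1, 75.0);
""")

rows = conn.execute("""
    SELECT u.id AS user_id, u.name, u.email,
           COUNT(o.id) AS order_count, SUM(o.total) AS lifetime_value
    FROM users u
    LEFT JOIN orders o ON o.user_id = u.id
    GROUP BY u.id, u.name, u.email
    ORDER BY u.id
""").fetchall()

print(rows)
# Ada has two orders totaling 100.0; Grace has no orders, so her
# lifetime_value is NULL (the LEFT JOIN keeps her row).
```

Note the LEFT JOIN behavior: customers with no orders still appear in the summary, with an order_count of 0.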

Step 5: Configure the component definitions

Update each defs.yaml to reference the SQL file, declare upstream dependencies, and reference the DuckDB connection:

src/project_elt_pipeline/defs/customer_order_summary/defs.yaml
type: dagster.TemplatedSqlComponent

attributes:
  sql_template:
    path: customer_order_summary.sql

  connection: "{{ context.load_component('../duckdb_connection') }}"

  assets:
    - key: customer_order_summary
      deps: [users, orders]
      group_name: transforms
      kinds: [duckdb]

src/project_elt_pipeline/defs/product_revenue/defs.yaml
type: dagster.TemplatedSqlComponent

attributes:
  sql_template:
    path: product_revenue.sql

  connection: "{{ context.load_component('../duckdb_connection') }}"

  assets:
    - key: product_revenue
      deps: [orders, products]
      group_name: transforms
      kinds: [duckdb]

deps tells Dagster which upstream assets each transform reads from — the same users, orders, and products assets produced by the Sling component. Dagster uses this to draw edges in the asset graph, enforce materialization order, and propagate staleness when upstream data changes.
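The ordering that deps implies can be sketched as a small dependency graph. This toy topological sort (using Python's standard-library graphlib, not Dagster internals) shows why the ingestion assets always materialize before the transforms:

```python
from graphlib import TopologicalSorter

# Asset -> upstream deps, mirroring the defs.yaml files above.
deps = {
    "users": [],
    "orders": [],
    "products": [],
    "customer_order_summary": ["users", "orders"],
    "product_revenue": ["orders", "products"],
}

order = list(TopologicalSorter(deps).static_order())
print(order)
# The three ingestion assets appear before either transform asset.
```

Dagster's scheduler performs an equivalent (much richer) ordering when you materialize a selection of assets.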

context.load_component('../duckdb_connection') loads the connection component defined in the sibling folder.

Step 6: Verify and view

Run dg check defs to confirm all assets load without errors:

dg check defs

Reload definitions in the Dagster UI. You should now see three asset groups:

  • sling_ingest: users, orders, products
  • github: issues, pull_requests
  • transforms: customer_order_summary, product_revenue

Open the asset graph and expand the sling_ingest and transforms groups. You'll see edges connecting users to customer_order_summary, orders to both transform assets, and products to product_revenue.

To run the full pipeline, select all assets and click Materialize all. Dagster will sequence the run so ingestion completes before transformations start.

How assets are generated

TemplatedSqlComponent creates one Dagster asset per SQL transform, using deps to wire upstream ingestion assets into the graph — no @asset functions required. Add a component folder with a .sql file and it joins the graph; remove one and it disappears.

The full project structure

src/project_elt_pipeline/
├── components/
│   ├── __init__.py
│   └── duckdb_connection.py
├── definitions.py
└── defs/
    ├── sling_ingest/
    │   ├── defs.yaml
    │   └── replication.yaml
    ├── dlt_ingest/
    │   ├── defs.yaml
    │   └── loads.py
    ├── duckdb_connection/
    │   └── defs.yaml
    ├── customer_order_summary/
    │   ├── defs.yaml
    │   └── customer_order_summary.sql
    └── product_revenue/
        ├── defs.yaml
        └── product_revenue.sql
note

definitions.py discovers everything under defs/ automatically: component folders are loaded without any explicit imports.

Summary

You've completed the full ELT pipeline using a consistent component-forward approach throughout:

  • Extract + Load (Sling): Postgres tables replicated into DuckDB via SlingReplicationCollectionComponent
  • Extract + Load (dlt): GitHub API data loaded via DltLoadCollectionComponent
  • Transform: SQL assets declared as TemplatedSqlComponent with upstream deps for full lineage

From here you can add a schedule to run the full asset graph on a cadence, or use asset checks to validate data quality after each step.