Dagster & dlt with components

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-dlt library provides a DltLoadCollectionComponent, which represents a collection of dlt sources and pipelines as assets in Dagster.

1. Prepare a Dagster project

To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:

uvx create-dagster project my-project && cd my-project/src

Activate the project virtual environment:

source ../.venv/bin/activate

Finally, add the dagster-dlt library to the project:

uv add dagster-dlt

2. Scaffold a dlt component

Now that you have a Dagster project, you can scaffold a dlt component. You may optionally provide the source and destination types, which will pull in the appropriate dlt source:

dg scaffold defs dagster_dlt.DltLoadCollectionComponent github_snowflake_ingest \
--source github --destination snowflake

The scaffold call generates a basic defs.yaml file and a loads.py file, along with a github directory containing the dlt source that was pulled in:

tree my_project/defs
my_project/defs
├── __init__.py
└── github_snowflake_ingest
    ├── defs.yaml
    ├── github
    │   ├── __init__.py
    │   ├── helpers.py
    │   ├── queries.py
    │   ├── README.md
    │   └── settings.py
    └── loads.py

3 directories, 8 files

The loads.py file contains a skeleton dlt source and pipeline which are referenced by Dagster, but can also be run directly using dlt:

my_project/defs/github_snowflake_ingest/loads.py
import dlt


@dlt.source
def my_source():
    @dlt.resource
    def hello_world():
        yield "hello, world!"

    return hello_world


my_load_source = my_source()
my_load_pipeline = dlt.pipeline(destination="snowflake")

Each source and pipeline is referenced by a fully scoped Python identifier in the defs.yaml file, and each source is paired with a pipeline to form a load:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.my_load_source
      pipeline: .loads.my_load_pipeline
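
To make the identifier format concrete, the sketch below resolves a dotted reference such as .loads.my_load_source relative to a package, using only the standard library. It illustrates the general idea and is not Dagster's actual resolution code. The resolve_reference helper and the throwaway demo_defs package are hypothetical names introduced for this example.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def resolve_reference(reference: str, package: str):
    """Resolve '.module.attr' relative to `package` and return the attribute.

    Hypothetical helper for illustration; not part of dagster or dagster-dlt.
    """
    module_path, _, attr_name = reference.rpartition(".")
    # The leading dot makes the import relative to `package`.
    module = importlib.import_module(module_path, package=package)
    return getattr(module, attr_name)


# Build a throwaway package that mimics the scaffolded layout (assumed names).
root = Path(tempfile.mkdtemp())
pkg = root / "demo_defs"
pkg.mkdir()
(pkg / "__init__.py").write_text("")
(pkg / "loads.py").write_text("my_load_source = 'source placeholder'\n")
sys.path.insert(0, str(root))
importlib.invalidate_caches()

resolved = resolve_reference(".loads.my_load_source", package="demo_defs")
print(resolved)
```

Here the final path segment is treated as the attribute name and everything before it as the module path, which matches the shape of the source and pipeline entries in defs.yaml.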

3. Configure dlt loads

Next, you can fill in the template loads.py file with your own dlt sources and pipelines:

my_project/defs/github_snowflake_ingest/loads.py
import dlt
from .github import github_reactions, github_repo_events, github_stargazers

dlthub_dlt_stargazers_source = github_stargazers("dlt-hub", "dlt")
dlthub_dlt_stargazers_pipeline = dlt.pipeline(
    "github_stargazers", destination="snowflake", dataset_name="dlthub_stargazers"
)

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline

You can use dg list defs to list the assets produced by the load:

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                         ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group   ┃ Deps                         ┃ Kinds     ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ default │ github_stargazers_stargazers │ dlt       │             │ │
│         │ │                              │         │                              │ snowflake │             │ │
│ │ ├──────────────────────────────┼─────────┼──────────────────────────────┼───────────┼─────────────┤ │
│         │ │ github_stargazers_stargazers │ default │                              │           │             │ │
│ │ └──────────────────────────────┴─────────┴──────────────────────────────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────┘

4. Customize Dagster assets

Properties of the assets emitted by each load can be customized in the defs.yaml file using the translation key:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        group_name: github_data
        description: "Loads all users who have starred the dlt-hub/dlt repo"

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                                                                ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┓ │
│         │ ┃ Key                          ┃ Group       ┃ Deps                  ┃ Kinds     ┃ Description           ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━┩ │
│         │ │ dlthub_stargazers/stargazers │ github_data │ github_stargazers_st… │ dlt       │ Loads all users who   │ │
│         │ │                              │             │                       │ snowflake │ have starred the      │ │
│         │ │                              │             │                       │           │ dlt-hub/dlt repo      │ │
│ │ ├──────────────────────────────┼─────────────┼───────────────────────┼───────────┼───────────────────────┤ │
│         │ │ github_stargazers_stargazers │ default     │                       │           │                       │ │
│ │ └──────────────────────────────┴─────────────┴───────────────────────┴───────────┴───────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
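
One way to picture the listing above is that the translation block overlays a few fields on the attributes the asset would otherwise receive, leaving everything else untouched. The sketch below models that with a frozen dataclass. AssetAttributes and apply_translation are hypothetical names for illustration only and do not reflect how dagster-dlt implements translation internally.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class AssetAttributes:
    """Hypothetical stand-in for the attributes of one emitted asset."""

    key: str
    group_name: str = "default"
    description: Optional[str] = None


def apply_translation(base: AssetAttributes, translation: dict) -> AssetAttributes:
    """Return a copy of `base` with the translation overrides applied."""
    # dataclasses.replace raises TypeError on unknown field names,
    # so a misspelled override fails loudly instead of being ignored.
    return replace(base, **translation)


default_attrs = AssetAttributes(key="dlthub_stargazers/stargazers")
translated = apply_translation(
    default_attrs,
    {
        "group_name": "github_data",
        "description": "Loads all users who have starred the dlt-hub/dlt repo",
    },
)
print(translated)
```

The key is unchanged while the group and description are overridden, mirroring the first row of the listing. The upstream github_stargazers_stargazers asset keeps the default group in that listing.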

Both the DltResource and Pipeline objects are available in the template scope as resource and pipeline, and can be used for dynamic customization:

my_project/defs/github_snowflake_ingest/defs.yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.dlthub_dlt_stargazers_source
      pipeline: .loads.dlthub_dlt_stargazers_pipeline
      translation:
        metadata:
          resource_name: "{{ resource.name }}"
          pipeline_name: "{{ pipeline.pipeline_name }}"
          is_transformer: "{{ resource.is_transformer }}"
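
To illustrate what the templated metadata values could evaluate to, the sketch below substitutes {{ object.attribute }} expressions against a scope containing resource and pipeline. It is a simplified, regex-based stand-in, and the real template engine may behave differently (for example, it may preserve non-string types). The SimpleNamespace objects are placeholders for the dlt objects, and their attribute values (including is_transformer being False) are assumed for the example.

```python
import re
from types import SimpleNamespace

# Matches expressions such as "{{ resource.name }}".
TEMPLATE = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def render(template: str, scope: dict) -> str:
    """Replace each {{ name.attr }} expression with its value from `scope`."""

    def lookup(match: re.Match) -> str:
        name, *attrs = match.group(1).split(".")
        value = scope[name]
        for attr in attrs:
            value = getattr(value, attr)
        return str(value)

    return TEMPLATE.sub(lookup, template)


# Placeholder objects standing in for the dlt resource and pipeline (assumed values).
resource = SimpleNamespace(name="stargazers", is_transformer=False)
pipeline = SimpleNamespace(pipeline_name="github_stargazers")
scope = {"resource": resource, "pipeline": pipeline}

metadata_templates = {
    "resource_name": "{{ resource.name }}",
    "pipeline_name": "{{ pipeline.pipeline_name }}",
    "is_transformer": "{{ resource.is_transformer }}",
}
metadata = {key: render(value, scope) for key, value in metadata_templates.items()}
print(metadata)
```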