Ingest data with dlt
dlt (data load tool) is a Python library for loading data from APIs and other sources into a destination. It handles schema inference, type coercion, and write modes (merge, append, replace) automatically. In this step, you'll use dlt to load GitHub issues and pull requests into DuckDB and register them as Dagster assets.
Step 1: Scaffold the dlt component definition
Use `dg scaffold` to create the component folder:

```shell
dg scaffold defs dagster_dlt.DltLoadCollectionComponent dlt_ingest
```
This creates a `dlt_ingest` folder with two files:

```
src/project_elt_pipeline/defs/
└── dlt_ingest/
    ├── defs.yaml
    └── loads.py
```

`loads.py` is where you define what to load and where to send it. `defs.yaml` references those definitions for the component.
Step 2: Define the dlt resources and pipeline
Replace the contents of `defs/dlt_ingest/loads.py`:
```python
import os

import dlt
from dlt.sources.helpers import requests

GITHUB_REPO = os.getenv("GITHUB_REPO", "dagster-io/dagster")


@dlt.resource(write_disposition="merge", primary_key="id")
def issues():
    token = os.getenv("GITHUB_TOKEN")
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    owner, name = GITHUB_REPO.split("/")
    response = requests.get(
        f"https://api.github.com/repos/{owner}/{name}/issues",
        headers=headers,
        params={"state": "all", "per_page": 100},
    )
    response.raise_for_status()
    # The issues endpoint also returns pull requests; filter them out so
    # this resource contains only true issues.
    yield [row for row in response.json() if "pull_request" not in row]


@dlt.resource(write_disposition="merge", primary_key="id")
def pull_requests():
    token = os.getenv("GITHUB_TOKEN")
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    owner, name = GITHUB_REPO.split("/")
    response = requests.get(
        f"https://api.github.com/repos/{owner}/{name}/pulls",
        headers=headers,
        params={"state": "all", "per_page": 100},
    )
    response.raise_for_status()
    yield response.json()


@dlt.source
def github_data():
    # Return the resource functions without calling them; dlt includes both.
    return issues, pull_requests


github_pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

github_load_source = github_data()
```
Here's what each part does:
- `@dlt.resource` marks a function as a dlt resource — each resource maps to one table in the destination database. `write_disposition="merge"` upserts rows by `primary_key`, so re-running doesn't create duplicates.
- `@dlt.source` groups resources together. Returning `issues` and `pull_requests` (without calling them) tells dlt to include both.
- `github_pipeline` defines the destination (DuckDB in this case) with a dataset (schema) named `github_data`.
- `github_load_source` is the instantiated source object that the component references.
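To build intuition for what `write_disposition="merge"` does, here is a small stdlib-only sketch of upsert-by-primary-key semantics. This illustrates the behavior dlt produces in the destination table; it is not dlt's actual implementation:

```python
# Illustration of merge (upsert) semantics: rows whose primary key already
# exists replace the stored row; rows with new keys are appended.
def merge_rows(table: dict, rows: list[dict], primary_key: str = "id") -> dict:
    for row in rows:
        table[row[primary_key]] = row  # upsert by primary key
    return table

table = {}
merge_rows(table, [{"id": 1, "state": "open"}, {"id": 2, "state": "open"}])

# Re-running with overlapping data updates rows instead of duplicating them:
merge_rows(table, [{"id": 1, "state": "closed"}, {"id": 3, "state": "open"}])

print(len(table))         # 3 rows, not 4
print(table[1]["state"])  # "closed" -- the re-run updated row 1
```

This is why the GitHub resources can be re-materialized safely: a second run refreshes existing issues and PRs rather than inserting duplicates.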
Set your repo and token in `.env`:

```
GITHUB_TOKEN=your_token_here
GITHUB_REPO=dagster-io/dagster
```
A GitHub token isn't required for public repos, but without one you'll quickly hit GitHub's unauthenticated rate limit (60 requests per hour).
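Note that the resources above fetch only the first page of results (`per_page=100`). GitHub paginates its list endpoints, so a fuller version would loop over pages. The generator below sketches that loop; `fetch_page` is a hypothetical stand-in for the HTTP call (e.g. a `requests.get` with a `page` parameter), used here so the pattern runs without network access:

```python
from typing import Callable

def fetch_all(fetch_page: Callable[[int], list[dict]], per_page: int = 100):
    """Yield batches page by page until a short (or empty) page signals the end.

    `fetch_page` stands in for the real HTTP call, e.g.
    requests.get(url, params={"page": page, "per_page": per_page}).json().
    """
    page = 1
    while True:
        batch = fetch_page(page)
        if batch:
            yield batch
        if len(batch) < per_page:  # a short page means we've reached the end
            break
        page += 1

# Simulated API: 250 items served in pages of 100.
data = [{"id": i} for i in range(250)]
fake_fetch = lambda page: data[(page - 1) * 100 : page * 100]

batches = list(fetch_all(fake_fetch))
print([len(b) for b in batches])  # [100, 100, 50]
```

Because the resources are generators, yielding one batch per page like this lets dlt load large repos without holding everything in memory.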
Step 3: Configure the component definition
Update `defs/dlt_ingest/defs.yaml` to reference your pipeline and source:
```yaml
type: dagster_dlt.DltLoadCollectionComponent

attributes:
  loads:
    - source: .loads.github_load_source
      pipeline: .loads.github_pipeline
      translation:
        group_name: github
```
The `.loads.` prefix is a relative module reference — `loads` refers to `loads.py` in the same directory, and `github_load_source` / `github_pipeline` are the module-level objects defined there.
`DltLoadCollectionComponent` inspects the source and creates one Dagster asset per dlt resource — in this case, `issues` and `pull_requests`, both in the `github` asset group.
Step 4: Verify the component
Run `dg check defs` to confirm the component loads without errors:

```shell
dg check defs
```
Step 5: View assets in Dagster
Reload definitions in the Dagster UI. You should now see assets from both components:
- `sling_ingest` group: `users`, `orders`, `products` (replicated from Postgres)
- `github` group: `issues`, `pull_requests` (loaded from the GitHub API)
With `GITHUB_TOKEN` set in `.env`, click **Materialize all** on the `github` assets to load data into DuckDB.
How assets are generated
`DltLoadCollectionComponent` inspects the dlt source and creates one Dagster asset per resource — no `@asset` functions required. In this case, the `issues` and `pull_requests` resources each become an asset in the `github` group. Add a resource to your source and a new asset appears. Remove one and it disappears.
Summary
You now have two ingestion pipelines running as Dagster assets:
- Sling handles database-to-database replication — configure a stream in YAML and it appears as an asset
- dlt handles API ingestion with automatic schema inference — define a resource in Python and it becomes an asset
Raw data is in DuckDB, but the pipeline isn't complete yet. The next step adds the T — SQL transformation components that produce analytics-ready outputs using the same component-forward approach.
The transforms in the next step use the Postgres tables (`users`, `orders`, `products`) loaded by Sling. The `issues` and `pull_requests` assets are available in your asset graph as a starting point for additional transforms — for example, aggregating issue counts by label or tracking PR cycle times.
Next steps
- Continue this example by adding SQL transformations