The data load tool (dlt) open-source library defines a standardized approach for creating data pipelines that load often messy data sources into well-structured data sets. It offers many advanced features, such as:
Handling connection secrets
Converting data into the structure required for a destination
Incremental updates and merges
dlt also provides a large collection of pre-built, verified sources and destinations, allowing you to write less code (if any!) by leveraging the work of the dlt community.
In this guide, we'll explain how the dlt integration works, how to set up a Dagster project for dlt, and how to use a pre-defined dlt source.
Each resource queries an API endpoint and yields the data that we wish to load into our data warehouse. The two resources defined on the source will map to Dagster assets.
Next, we defined a dlt pipeline that specifies how we want the data to be loaded:
A dlt source and pipeline are the two components required to load data using dlt. These will be the parameters of our multi-asset, which will integrate dlt and Dagster.
Installing dagster-embedded-elt will also install the dlt package.
Step 1: Configure your Dagster project to support dlt
The first step is to define a location for the dlt code used for ingesting data. We recommend creating a dlt_sources directory at the root of your Dagster project, but this code can reside anywhere within your Python project.
Run the following to create the dlt_sources directory:
In the dlt_sources directory, you can write ingestion code following the dlt tutorial or you can use a verified source.
In this example, we'll use the GitHub source provided by dlt.
Run the following to create a location for the dlt source code and initialize the GitHub source:
cd dlt_sources
dlt init github snowflake
You'll then see the following in the command line:
Looking up the init scripts in https://github.com/dlt-hub/verified-sources.git...
Cloning and configuring a verified source github (Source that load github issues, pull requests and reactions for a specific repository via customizable graphql query. Loads events incrementally.)
When prompted to proceed, enter y. You should see the following confirming that the GitHub source was added to the project:
Verified source github was added to your project!
* See the usage examples and code snippets to copy from github_pipeline.py
* Add credentials for snowflake and other secrets in ./.dlt/secrets.toml
* requirements.txt was created. Install it with:
pip3 install -r requirements.txt
* Read https://dlthub.com/docs/walkthroughs/create-a-pipeline for more information
This downloaded the code required to collect data from the GitHub API. It also created a requirements.txt file and a .dlt/ configuration directory. Both can be removed, because we will configure our pipelines through Dagster, but you may still find them useful for reference.
$ tree -a
.
├── .dlt # can be removed
│ ├── .sources
│ ├── config.toml
│ └── secrets.toml
├── .gitignore
├── github
│ ├── README.md
│ ├── __init__.py
│ ├── helpers.py
│ ├── queries.py
│ └── settings.py
├── github_pipeline.py
└── requirements.txt # can be removed
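If you do decide to remove the generated files, a small cleanup helper could look like the following. This is only a sketch: the function name `remove_dlt_scaffolding` is hypothetical, and it assumes the layout shown above, with `.dlt/` and `requirements.txt` sitting directly inside the `dlt_sources` directory.

```python
import shutil
from pathlib import Path


def remove_dlt_scaffolding(root: Path) -> list[str]:
    """Delete the files `dlt init` generated that Dagster does not need.

    Returns the names of the entries that were actually removed.
    """
    removed = []
    dlt_dir = root / ".dlt"
    if dlt_dir.is_dir():
        shutil.rmtree(dlt_dir)  # holds config.toml and secrets.toml
        removed.append(".dlt")
    requirements = root / "requirements.txt"
    if requirements.is_file():
        requirements.unlink()
        removed.append("requirements.txt")
    return removed


if __name__ == "__main__":
    # Assumes the script is run from the root of the Dagster project.
    print(remove_dlt_scaffolding(Path("dlt_sources")))
```

Because `.dlt/secrets.toml` may hold credentials you entered while experimenting, it is worth copying anything you still need into environment variables before running a cleanup like this.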
This integration manages connections and secrets using environment variables, in the same way dlt does. The dlt library can infer the environment variables required by its sources and resources. Refer to dlt's Secrets and Configs documentation for more information.
In the example we've been using:
The github_reactions source requires a GitHub access token
The Snowflake destination requires database connection details
This results in the following required environment variables:
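As a rough sketch of how such variables might be named and checked before a run: dlt maps nested configuration sections to upper-case environment variable names joined by double underscores. The specific key list below (the Snowflake credential fields in particular) and the helper names `dlt_env_var` and `missing_env_vars` are assumptions made for illustration, so confirm the exact keys against dlt's Secrets and Configs documentation.

```python
import os
from collections.abc import Mapping


def dlt_env_var(*sections: str) -> str:
    """Join config sections into a dlt-style environment variable name."""
    return "__".join(section.upper() for section in sections)


# Assumed keys for the GitHub source and Snowflake destination (illustrative only).
REQUIRED_ENV_VARS = [
    dlt_env_var("sources", "github", "access_token"),
    *(
        dlt_env_var("destination", "snowflake", "credentials", field)
        for field in ("database", "password", "username", "host", "warehouse", "role")
    ),
]


def missing_env_vars(
    required: list[str], environ: Mapping[str, str] = os.environ
) -> list[str]:
    """Return the required variables that are unset or empty in `environ`."""
    return [name for name in required if not environ.get(name)]


if __name__ == "__main__":
    for name in missing_env_vars(REQUIRED_ENV_VARS):
        print(f"missing: {name}")
```

Running a check like this at startup surfaces a missing credential as a clear message rather than as a connection error partway through a load.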
Next, we'll define a DagsterDltResource, which wraps a dlt pipeline runner. Use the following to define the resource, which can be shared across all dlt pipelines:
from dagster_embedded_elt.dlt import DagsterDltResource
dlt_resource = DagsterDltResource()
We'll add the resource to our Definitions in a later step.
Step 5: Create a dlt_assets definition for GitHub
The @dlt_assets decorator takes dlt_source and dlt_pipeline parameters. In this example, we use the github_reactions source and create a dlt_pipeline to ingest data from GitHub into Snowflake.
If you are using the sql_database source, consider setting defer_table_reflect=True to reduce database reads. By default, the Dagster daemon will refresh definitions roughly every minute, which will query the database for resource definitions.
In the same file containing your Dagster assets, you can create an instance of your @dlt_assets by doing something like the following:
from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dlt import pipeline
from dlt_sources.github import github_reactions

@dlt_assets(
    dlt_source=github_reactions("dagster-io", "dagster", max_items=250),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
        progress="log",
    ),
    name="github",
    group_name="github",
)
def dagster_github_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
from collections.abc import Iterable

import dlt
from dagster_embedded_elt.dlt import (
    DagsterDltResource,
    DagsterDltTranslator,
    dlt_assets,
)
from dlt.extract.resource import DltResource

from dagster import AssetExecutionContext, AssetKey

@dlt.source
def example_dlt_source():
    def example_resource(): ...

    return example_resource

class CustomDagsterDltTranslator(DagsterDltTranslator):
    def get_asset_key(self, resource: DltResource) -> AssetKey:
        """Overrides asset key to be the dlt resource name."""
        return AssetKey(f"{resource.name}")

    def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
        """Overrides upstream asset key to be a single source asset."""
        return [AssetKey("common_upstream_dlt_dependency")]

@dlt_assets(
    name="example_dlt_assets",
    dlt_source=example_dlt_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
        progress="log",
    ),
    dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets.
For example, let's say we have defined a set of dlt assets named thinkific_assets. We can iterate over those assets and derive an AssetSpec for each, with attributes like group_name.
Although it is still an experimental feature, you can use partitions within your dlt assets. Note that this may result in concurrency-related issues, because state is managed by dlt. For this reason, we recommend setting concurrency limits for your partitioned dlt assets. See the Limiting concurrency in data pipelines guide for more details.
That said, here is an example of using static named partitions from a dlt source.