
dlt & Dagster#

The data load tool (dlt) open-source library defines a standardized approach for creating data pipelines that load often messy data sources into well-structured data sets. It offers many advanced features, such as:

  • Handling connection secrets
  • Converting data into the structure required for a destination
  • Incremental updates and merges
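As a rough illustration of the last item, a merge keyed on a primary key can be pictured as an upsert: incoming rows replace existing rows with the same key, and rows with new keys are appended. The sketch below is a conceptual stand-in written in plain Python, not dlt's implementation; the `merge_rows` helper and the sample rows are hypothetical.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def merge_rows(
    existing: Iterable[Row], incoming: Iterable[Row], primary_key: str = "id"
) -> List[Row]:
    """Upsert `incoming` into `existing`, keyed on `primary_key`."""
    merged: Dict[Any, Row] = {row[primary_key]: row for row in existing}
    for row in incoming:
        # Replace the row with a matching key, or add it if the key is new.
        merged[row[primary_key]] = row
    return list(merged.values())


# Hypothetical sample data: one updated row (id=2) and one new row (id=3).
table = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}]
batch = [{"id": 2, "name": "Grace H."}, {"id": 3, "name": "Edsger"}]
result = merge_rows(table, batch)
```

In dlt itself, this behavior is requested declaratively with `primary_key` and `write_disposition="merge"` on a resource, as in the source definition later in this guide.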

dlt also provides a large collection of pre-built, verified sources and destinations, allowing you to write less code (if any!) by leveraging the work of the dlt community.

In this guide, we'll explain how the dlt integration works, how to set up a Dagster project for dlt, and how to use a pre-defined dlt source.


How it works#

The Dagster dlt integration uses multi-assets, a single definition that results in multiple assets. These assets are derived from the DltSource.

The following is an example of a dlt source definition where a source is made up of two resources:

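import dlt
import requests

# BASE_URL and _paginate are assumed to be defined elsewhere in this module.


@dlt.source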
def example(api_key: str = dlt.secrets.value):
    @dlt.resource(primary_key="id", write_disposition="merge")
    def courses():
        response = requests.get(url=BASE_URL + "courses")
        response.raise_for_status()
        yield response.json().get("items")

    @dlt.resource(primary_key="id", write_disposition="merge")
    def users():
        for page in _paginate(BASE_URL + "users"):
            yield page

    return courses, users

Each resource queries an API endpoint and yields the data that we wish to load into our data warehouse. The two resources defined on the source will map to Dagster assets.

Next, we define a dlt pipeline that specifies how we want the data to be loaded:

pipeline = dlt.pipeline(
    pipeline_name="example_pipeline",
    destination="snowflake",
    dataset_name="example_data",
    progress="log",
)

A dlt source and pipeline are the two components required to load data using dlt. These will be the parameters of our multi-asset, which will integrate dlt and Dagster.


Prerequisites#

To follow the steps in this guide, you'll need:

  • To read the dlt introduction, if you've never worked with dlt before.

  • To install the following libraries:

    pip install dagster dagster-embedded-elt
    

    Installing dagster-embedded-elt will also install the dlt package.


Step 1: Configure your Dagster project to support dlt#

The first step is to define a location for the dlt code used for ingesting data. We recommend creating a dlt_sources directory at the root of your Dagster project, but this code can reside anywhere within your Python project.

Run the following to create the dlt_sources directory. The command assumes that $DAGSTER_HOME points to the root of your Dagster project:

cd $DAGSTER_HOME && mkdir dlt_sources

Step 2: Initialize dlt ingestion code#

In the dlt_sources directory, you can write ingestion code following the dlt tutorial or you can use a verified source.

In this example, we'll use the GitHub source provided by dlt.

  1. Run the following to create a location for the dlt source code and initialize the GitHub source:

    cd dlt_sources
    
    dlt init github snowflake
    

    You'll then see output like the following in the command line:

    Looking up the init scripts in https://github.com/dlt-hub/verified-sources.git...
    Cloning and configuring a verified source github (Source that load github issues, pull requests and reactions for a specific repository via customizable graphql query. Loads events incrementally.)
    
  2. When prompted to proceed, enter y. You should see the following confirming that the GitHub source was added to the project:

    Verified source github was added to your project!
    * See the usage examples and code snippets to copy from github_pipeline.py
    * Add credentials for snowflake and other secrets in ./.dlt/secrets.toml
    * requirements.txt was created. Install it with:
    pip3 install -r requirements.txt
    * Read https://dlthub.com/docs/walkthroughs/create-a-pipeline for more information
    

This downloaded the code required to collect data from the GitHub API. It also created a requirements.txt file and a .dlt/ configuration directory. Because we'll configure our pipelines through Dagster, these files can be removed, although you may still find them a useful reference.

$ tree -a
.
├── .dlt               # can be removed
│   ├── .sources
│   ├── config.toml
│   └── secrets.toml
├── .gitignore
├── github
│   ├── README.md
│   ├── __init__.py
│   ├── helpers.py
│   ├── queries.py
│   └── settings.py
├── github_pipeline.py
└── requirements.txt   # can be removed

Step 3: Define dlt environment variables#

This integration manages connections and secrets using environment variables, the same way dlt does. The dlt library can infer the environment variables required by its sources and resources. Refer to dlt's Secrets and Configs documentation for more information.

In the example we've been using:

  • The github_reactions source requires a GitHub access token
  • The Snowflake destination requires database connection details

This results in the following required environment variables:

SOURCES__GITHUB__ACCESS_TOKEN=""
DESTINATION__SNOWFLAKE__CREDENTIALS__DATABASE=""
DESTINATION__SNOWFLAKE__CREDENTIALS__PASSWORD=""
DESTINATION__SNOWFLAKE__CREDENTIALS__USERNAME=""
DESTINATION__SNOWFLAKE__CREDENTIALS__HOST=""
DESTINATION__SNOWFLAKE__CREDENTIALS__WAREHOUSE=""
DESTINATION__SNOWFLAKE__CREDENTIALS__ROLE=""

Ensure that these variables are defined in your environment, either in your .env file when running locally or in the Dagster deployment's environment variables.
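The variable names above follow dlt's convention of upper-casing each configuration section and joining the sections with double underscores. As a minimal sketch, the helpers below rebuild that list and report which variables are unset or empty. The function names `to_env_key` and `missing_env_vars` are hypothetical and are not part of dlt or Dagster.

```python
import os
from typing import Iterable, List, Mapping


def to_env_key(*sections: str) -> str:
    """Join config sections into a dlt-style environment variable name."""
    return "__".join(section.upper() for section in sections)


def missing_env_vars(
    required: Iterable[str], environ: Mapping[str, str] = os.environ
) -> List[str]:
    """Return the names in `required` that are unset or empty in `environ`."""
    return [name for name in required if not environ.get(name)]


# The variables listed in this step, rebuilt from their config sections.
REQUIRED = [
    to_env_key("sources", "github", "access_token"),
    *(
        to_env_key("destination", "snowflake", "credentials", field)
        for field in ("database", "password", "username", "host", "warehouse", "role")
    ),
]

# Example check against a partial environment (only the token is set).
still_missing = missing_env_vars(REQUIRED, {"SOURCES__GITHUB__ACCESS_TOKEN": "token"})
```

Calling `missing_env_vars(REQUIRED)` with no second argument checks the current process environment, which can be a quick sanity check before starting Dagster locally.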


Step 4: Define a DagsterDltResource#

Next, we'll define a DagsterDltResource, which wraps a dlt pipeline runner. Use the following to define the resource, which can be shared across all dlt pipelines:

from dagster_embedded_elt.dlt import DagsterDltResource

dlt_resource = DagsterDltResource()

We'll add the resource to our Definitions in a later step.


Step 5: Create a dlt_assets definition for GitHub#

The @dlt_assets decorator takes dlt_source and dlt_pipeline parameters. In this example, we use the github_reactions source and create a dlt_pipeline to ingest data from GitHub to Snowflake.

In the same file containing your Dagster assets, you can create an instance of your @dlt_assets by doing something like the following:

from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dlt import pipeline
from dlt_sources.github import github_reactions


@dlt_assets(
    dlt_source=github_reactions(
        "dagster-io", "dagster", max_items=250
    ),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
        progress="log",
    ),
    name="github",
    group_name="github",
)
def dagster_github_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)

Step 6: Create the Definitions object#

The last step is to include the assets and resource in a Definitions object. This enables Dagster tools to load everything we've defined:

defs = Definitions(
    assets=[
        dagster_github_assets,
    ],
    resources={
        "dlt": dlt_resource,
    },
)

And that's it! You should now have two assets that load data to corresponding Snowflake tables: one for issues and the other for pull requests.


Advanced usage#

Overriding the translator to customize dlt assets#

The DagsterDltTranslator object can be used to customize how dlt properties map to Dagster concepts.

For example, to change how the name of the asset is derived, you can override the DagsterDltTranslator.get_asset_key method. To change the key of the upstream source asset, you can override the DagsterDltTranslator.get_deps_asset_keys method.

from collections.abc import Iterable

import dlt
from dagster_embedded_elt.dlt import (
    DagsterDltResource,
    DagsterDltTranslator,
    dlt_assets,
)
from dlt.extract.resource import DltResource

from dagster import AssetExecutionContext, AssetKey


@dlt.source
def example_dlt_source():
    def example_resource(): ...

    return example_resource


class CustomDagsterDltTranslator(DagsterDltTranslator):
    # The translator receives dlt resources (DltResource), not the Dagster resource.
    def get_asset_key(self, resource: DltResource) -> AssetKey:
        """Overrides asset key to be the dlt resource name."""
        return AssetKey(f"{resource.name}")

    def get_deps_asset_keys(self, resource: DltResource) -> Iterable[AssetKey]:
        """Overrides upstream asset key to be a single source asset."""
        return [AssetKey("common_upstream_dlt_dependency")]


@dlt_assets(
    name="example_dlt_assets",
    dlt_source=example_dlt_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
        progress="log",
    ),
    dagster_dlt_translator=CustomDagsterDltTranslator(),
)
def dlt_example_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)

In this example, we customized the translator to change how the dlt assets' names are defined. We also hard-coded the asset dependency upstream of our assets to provide a fan-out model from a single dependency to our dlt assets.
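To make the resulting fan-out shape concrete, the sketch below mimics the two overridden methods with plain Python stand-ins and builds the dependency mapping for two resources. Nothing here uses the Dagster or dlt APIs: `FakeResource`, `asset_key_for`, `deps_for`, and `build_graph` are hypothetical names chosen for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class FakeResource:
    """Stand-in for a dlt resource; only the `name` attribute is modeled."""

    name: str


def asset_key_for(resource: FakeResource) -> str:
    """Mirrors the custom get_asset_key: the key is the resource name."""
    return resource.name


def deps_for(resource: FakeResource) -> List[str]:
    """Mirrors the custom get_deps_asset_keys: one shared upstream key."""
    return ["common_upstream_dlt_dependency"]


def build_graph(resources: Iterable[FakeResource]) -> Dict[str, List[str]]:
    """Map each asset key to the upstream keys it depends on."""
    return {asset_key_for(r): deps_for(r) for r in resources}


graph = build_graph([FakeResource("courses"), FakeResource("users")])
```

Every asset in `graph` lists the same upstream key, which is the fan-out from a single dependency described above.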

Assigning metadata to upstream external assets#

A common question is how to define metadata on the external assets upstream of the dlt assets.

This can be accomplished by defining an AssetSpec with a key that matches the one defined in the DagsterDltTranslator.get_deps_asset_keys method.

For example, let's say we have defined a set of dlt assets named example_dlt_assets. We can iterate over those assets' dependency keys and derive an AssetSpec for each, with attributes like group_name.

import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dagster import AssetExecutionContext, AssetSpec


@dlt.source
def example_dlt_source():
    def example_resource(): ...

    return example_resource


@dlt_assets(
    dlt_source=example_dlt_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
        progress="log",
    ),
)
def example_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)


thinkific_source_assets = [
    AssetSpec(key, group_name="thinkific") for key in example_dlt_assets.dependency_keys
]

Using partitions in your dlt assets#

Partitions can be used within your dlt assets, although this is still an experimental feature. Because state is managed by dlt, partitioned runs may result in concurrency-related issues. For this reason, we recommend setting concurrency limits for your partitioned dlt assets. See the Limiting concurrency in data pipelines guide for more details.

The following example uses static named partitions with a dlt source:

from typing import Optional

import dlt
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets

from dagster import AssetExecutionContext, StaticPartitionsDefinition

color_partitions = StaticPartitionsDefinition(["red", "green", "blue"])


@dlt.source
def example_dlt_source(color: Optional[str] = None):
    def load_colors():
        if color:
            # partition-specific processing
            ...
        else:
            # non-partitioned processing
            ...

    # Return the resource so the source exposes it, as in the earlier examples.
    return load_colors


@dlt_assets(
    dlt_source=example_dlt_source(),
    name="example_dlt_assets",
    dlt_pipeline=dlt.pipeline(
        pipeline_name="example_pipeline_name",
        dataset_name="example_dataset_name",
        destination="snowflake",
    ),
    partitions_def=color_partitions,
)
def compute(context: AssetExecutionContext, dlt: DagsterDltResource):
    color = context.partition_key
    yield from dlt.run(context=context, dlt_source=example_dlt_source(color=color))

What's next?#

Want to see real-world examples of dlt in production? Check out how we use it internally at Dagster in the Dagster Open Platform project.


APIs in this guide#

Name                   Description
@dlt_assets            The core dlt asset factory for building ingestion jobs.
DagsterDltResource     The dlt resource for running ingestions.
DagsterDltTranslator   A translator for specifying how to map between dlt and Dagster.