Ask AI

dlt & Dagster#

The data load tool (dlt) open-source library defines a standardized approach for creating data pipelines that load often messy data sources into well-structured data sets. It offers many advanced features, such as:

  • Handling connection secrets
  • Converting data into the structure required for a destination
  • Incremental updates and merges

dlt also provides a large collection of pre-built, verified sources and destinations, allowing you to write less code (if any!) by leveraging the work of the dlt community.

In this guide, we'll explain how the dlt integration works, how to set up a Dagster project for dlt, and how to use a pre-defined dlt source.


How it works#

The Dagster dlt integration uses multi-assets, a single definition that results in multiple assets. These assets are derived from the DltSource.

The following is an example of a dlt source definition where a source is made up of two resources:

@dlt.source
def example(api_key: str = dlt.secrets.value):
    @dlt.resource(primary_key="id", write_disposition="merge")
    def courses():
        response = requests.get(url=BASE_URL + "courses")
        response.raise_for_status()
        yield response.json().get("items")

    @dlt.resource(primary_key="id", write_disposition="merge")
    def users():
        for page in _paginate(BASE_URL + "users"):
            yield page

    return courses, users

Each resource queries an API endpoint and yields the data that we wish to load into our data warehouse. The two resources defined on the source will map to Dagster assets.

Next, we defined a dlt pipeline that specifies how we want the data to be loaded:

pipeline = dlt.pipeline(
    pipeline_name="example_pipeline", destination="snowflake", dataset_name="example_data"
)

A dlt source and pipelinea are the two components required to load data using dlt. These will be the parameters of our multi-asset, which will integrate dlt and Dagster.


Prerequisites#

To follow the steps in this guide, you'll need:

  • To read the dlt introduction, if you've never worked with dlt before.

  • To install the following libraries:

    pip install dagster dagster-embedded-elt
    

    Installing dagster-embedded-elt will also install the dlt package.


Step 1: Configure your Dagster project to support dlt#

The first step is to define a location for the dlt code used for ingesting data. We recommend creating a dlt_sources directory at the root of your Dagster project, but this code can reside anywhere within your Python project.

Run the following to create the dlt_sources directory:

cd $DAGSTER_HOME && mkdir dlt_sources

Step 2: Initialize dlt ingestion code#

In the dlt_sources directory, you can write ingestion code following the dlt tutorial or you can use a verified source.

In this example, we'll use the GitHub source provided by dlt.

  1. Run the following to create a location for the dlt source code and initialize the GitHub source:

    cd dlt_sources
    
    dlt init github snowflake
    

    At which point you'll see the following in the command line:

    Looking up the init scripts in https://github.com/dlt-hub/verified-sources.git...
    Cloning and configuring a verified source github (Source that load github issues, pull requests and reactions for a specific repository via customizable graphql query. Loads events incrementally.)
    
  2. When prompted to proceed, enter y. You should see the following confirming that the GitHub source was added to the project:

    Verified source github was added to your project!
    * See the usage examples and code snippets to copy from github_pipeline.py
    * Add credentials for snowflake and other secrets in ./.dlt/secrets.toml
    * requirements.txt was created. Install it with:
    pip3 install -r requirements.txt
    * Read https://dlthub.com/docs/walkthroughs/create-a-pipeline for more information
    

This downloaded the code required to collect data from the GitHub API. It also created a requirements.txt and a .dlt/ configuration directory. These files can be removed, as we will configure our pipelines through Dagster, however, you may still find it informative to reference.

$ tree -a
.
├── .dlt               # can be removed
│   ├── .sources
│   ├── config.toml
│   └── secrets.toml
├── .gitignore
├── github
│   ├── README.md
│   ├── __init__.py
│   ├── helpers.py
│   ├── queries.py
│   └── settings.py
├── github_pipeline.py
└── requirements.txt   # can be removed

Step 3: Define dlt environment variables#

This integration manages connections and secrets using environment variables as dlt. The dlt library can infer required environment variables used by its sources and resources. Refer to dlt's Secrets and Configs documentation for more information.

In the example we've been using:

  • The github_reactions source requires a GitHub access token
  • The Snowflake destination requires database connection details

This results in the following required environment variables:

SOURCES__GITHUB__ACCESS_TOKEN=""
DESTINATION__SNOWFLAKE__CREDENTIALS__DATABASE=""
DESTINATION__SNOWFLAKE__CREDENTIALS__PASSWORD=""
DESTINATION__SNOWFLAKE__CREDENTIALS__USERNAME=""
DESTINATION__SNOWFLAKE__CREDENTIALS__HOST=""
DESTINATION__SNOWFLAKE__CREDENTIALS__WAREHOUSE=""
DESTINATION__SNOWFLAKE__CREDENTIALS__ROLE=""

Ensure that these variables are defined in your environment, either in your .env file when running locally or in the Dagster deployment's environment variables.


Step 4: Define a DagsterDltResource#

Next, we'll define a DagsterDltResource, which provides a wrapper of a dlt pipeline runner. Use the following to define the resource, which can be shared across all dlt pipelines:

from dagster_embedded_elt.dlt import DagsterDltResource

dlt_resource = DagsterDltResource()

We'll add the resource to our Definitions in a later step.


Step 5: Create a dlt_assets definition for GitHub#

The @dlt_assets decorator takes a dlt_source and dlt_pipeline parameter. In this example, we used the github_reactions source and created a dlt_pipeline to ingest data from Github to Snowflake.

In the same file containing your Dagster assets, you can create an instance of your @dlt_assets by doing something like the following:

from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dlt import pipeline
from dlt_sources.github import github_reactions


@dlt_assets(
    dlt_source=github_reactions(
        "dagster-io", "dagster", max_items=250
    ),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
    ),
    name="github",
    group_name="github",
)
def dagster_github_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)

Step 6: Create the Definitions object#

The last step is to include the assets and resource in a Definitions object. This enables Dagster tools to load everything we've defined:

defs = Definitions(
    assets=[
        dagster_github_assets,
    ],
    resources={
        "dlt": dlt_resource,
    },
)

And that's it! You should now have two assets that load data to corresponding Snowflake tables: one for issues and the other for pull requests.


What's next?#

Want to see more real-world examples of dlt in production? Check out how we use it internally at Dagster in Dagster Open Platform.


APIs in this guide#

NameDescription
@dlt_assetsThe core dlt asset factory for building ingestion jobs
DagsterDltResourceThe dlt resource for running ingestions.
DagsterDltTranslatorA translator for specifying how to map between dlt and Dagster