Dagster Embedded ELT#

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback.

This package currently includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems.

We plan to add more embedded ELT tool integrations in the future.


Relevant APIs#

Name                 Description
build_sling_asset    The core Sling asset factory for building syncs
SlingResource        The Sling Resource used for handing credentials to databases and object stores

Getting started#

To get started with dagster-embedded-elt and Sling, first familiarize yourself with Sling by reading its documentation, which describes how sources and targets are configured.

The typical pattern for building an ELT pipeline with Sling has three steps:

  1. First, create a SlingResource, which is a container for the source and the target.
  2. In the SlingResource, define both a SlingSourceConnection and a SlingTargetConnection, which hold the source and target credentials that Sling will use to sync data.
  3. Finally, create an asset that syncs between the two connections. You can use the build_sling_asset factory for most use cases.

Step 1: Setting up a Sling resource#

A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or target can represent, so you can pass arbitrary keyword arguments to the SlingSourceConnection and SlingTargetConnection classes.

The types and parameters for each connection are defined in Sling's connections documentation.

The simplest connection is a file connection, which can be defined as:

from dagster_embedded_elt.sling import SlingResource, SlingSourceConnection

source = SlingSourceConnection(type="file")
sling = SlingResource(source_connection=source, ...)

Note that no path is required in the source connection, as that is provided by the asset itself.

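from dagster import AssetSpec
from dagster_embedded_elt.sling import build_sling_asset

asset_def = build_sling_asset(
    asset_spec=AssetSpec("my_file"),
    source_stream=f"file://{path_to_file}",  # path_to_file: path of the file to sync
    ...
)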
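Because the path lives in source_stream rather than in the connection, it can help to build that string in one place. The following is a minimal sketch, not part of dagster-embedded-elt: file_stream is a hypothetical helper, and the absolute-path check is an assumption made here so that the result does not depend on the working directory.

```python
from pathlib import PurePosixPath


def file_stream(path: str) -> str:
    """Build a file:// stream string from an absolute POSIX path.

    Hypothetical helper for illustration; it is not a Sling or Dagster API.
    """
    posix_path = PurePosixPath(path)
    if not posix_path.is_absolute():
        # Assumption: require absolute paths so the stream does not depend on
        # the working directory of the process that runs the sync.
        raise ValueError(f"expected an absolute path, got {path!r}")
    return f"file://{posix_path}"


print(file_stream("/tmp/data/my_file.csv"))  # file:///tmp/data/my_file.csv
```

The returned value can be passed as source_stream to build_sling_asset in place of the f-string above.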

For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the instance keyword, which is specified in Sling's SQLite connection documentation.

source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db")

Step 2: Creating a Sling sync#

Once you have defined your resource, use the build_sling_asset factory to create an asset that performs the sync.


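from dagster import AssetSpec
from dagster_embedded_elt.sling import (
    SlingMode,
    SlingResource,
    SlingSourceConnection,
    SlingTargetConnection,
    build_sling_asset,
)

# build_assets_job (used below) must also be imported; its import path
# depends on your Dagster version.

sling_resource = SlingResource(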
    source_connection=SlingSourceConnection(type="file"),
    target_connection=SlingTargetConnection(
        type="sqlite", connection_string="sqlite://path/to/sqlite.db"
    ),
)

asset_spec = AssetSpec(
    key=["main", "tbl"],
    group_name="etl",
    description="ETL Test",
    deps=["foo"],
)

asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="file://path/to/file.csv",
    target_object="main.dest_tbl",
    mode=SlingMode.INCREMENTAL,
    primary_key="id",
)

sling_job = build_assets_job(
    "sling_job",
    [asset_def],
    resource_defs={"sling": sling_resource},
)
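When several files follow the same pattern as the example above, the source_stream and target_object arguments can be generated rather than written by hand. The sketch below only prepares plain dictionaries, so it runs without Dagster installed. file_sync_arguments is a hypothetical helper, and naming each target table after the file stem is an assumption made for illustration.

```python
from pathlib import PurePosixPath
from typing import Dict, List


def file_sync_arguments(
    files: List[str], target_schema: str = "main"
) -> List[Dict[str, str]]:
    """Return source_stream/target_object pairs, one dict per input file.

    Hypothetical helper; it is not part of dagster-embedded-elt.
    """
    arguments = []
    for file_path in files:
        # Assumption: the target table is named after the file stem,
        # e.g. "/data/users.csv" -> "users".
        table = PurePosixPath(file_path).stem
        arguments.append(
            {
                "source_stream": f"file://{file_path}",
                "target_object": f"{target_schema}.{table}",
            }
        )
    return arguments


for arguments in file_sync_arguments(["/data/users.csv", "/data/orders.csv"]):
    print(arguments)
```

Each dictionary can then be unpacked into build_sling_asset together with an AssetSpec and the remaining parameters, for example build_sling_asset(asset_spec=AssetSpec(key=["main", "users"]), mode=SlingMode.INCREMENTAL, primary_key="id", **arguments).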


Examples#

This is an example of how to set up a Sling sync between Postgres and Snowflake:

# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none

import os

from dagster_embedded_elt.sling import (
    SlingMode,
    SlingResource,
    SlingSourceConnection,
    SlingTargetConnection,
    build_sling_asset,
)

from dagster import AssetSpec

source = SlingSourceConnection(
    type="postgres",
    host="localhost",
    port=5432,
    database="my_database",
    user="my_user",
    password=os.getenv("PG_PASS"),
)

target = SlingTargetConnection(
    type="snowflake",
    host="hostname.snowflake",
    user="username",
    database="database",
    password=os.getenv("SF_PASSWORD"),
    role="role",
)

sling = SlingResource(source_connection=source, target_connection=target)

asset_def = build_sling_asset(
    asset_spec=AssetSpec("my_asset_name"),
    source_stream="public.my_table",
    target_object="marts.my_table",
    mode=SlingMode.INCREMENTAL,
    primary_key="id",
)

Similarly, you can define file/storage connections:

source = SlingSourceConnection(
    type="s3",
    bucket="sling-bucket",
    access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)

# target is the SlingTargetConnection defined in the previous example.
sling = SlingResource(source_connection=source, target_connection=target)

asset_def = build_sling_asset(
    asset_spec=AssetSpec("my_asset_name"),
    source_stream="s3://sling-bucket/my_file.parquet",
    target_object="marts.my_table",
    primary_key="id",
)
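The examples above read passwords and keys with os.getenv, which returns None when a variable is unset, so a missing secret may only surface once the sync runs. A small guard can fail earlier. This is a sketch under that assumption; require_env is a hypothetical helper, not part of Dagster or Sling.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise if unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    return value


# Demonstration with a throwaway variable; real secrets come from the environment.
os.environ["EXAMPLE_SECRET"] = "not-a-real-password"
print(require_env("EXAMPLE_SECRET"))  # not-a-real-password
```

In the connections above, password=require_env("PG_PASS") could replace password=os.getenv("PG_PASS") so that a missing variable raises when the connection is defined.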