Ask AI

Sling & Dagster#

Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.


How it works#

The Dagster integration allows you to derive Dagster assets from a replication configuration file. The typical pattern for building an ELT pipeline with Sling has three steps:

  1. Define a Sling replication.yaml file that specifies the source and target connections, as well as which streams to sync from.

  2. Create a SlingResource and pass a list of SlingConnectionResource for each connection to the connection parameter, ensuring the resource uses the same name given to the connection in the Sling configuration.

  3. Use the @sling_assets decorator to define an asset that runs the Sling replication job and yields from the SlingResource.replicate method to run the sync.

We'll walk you through each of these steps in this guide.


Prerequisites#

To follow the steps in this guide:

  • Familiarize yourself with Sling's replication configuration, if you've never worked with Sling before. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The dagtser-embedded-elt integration uses this configuration to build assets for both sources and destinations.

  • To install the following libraries:

    pip install dagster dagster-embedded-elt
    

    Refer to the Dagster installation guide for more info.


Step 1: Set up a Sling replication configuration#

Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing replication.yaml file or construct a dictionary that represents the configuration in Python. This configuration is passed to the Sling CLI to run the replication job.

replication.yaml#

This example creates a replication configuration in a replication.yaml file:

# replication.yaml

SOURCE: MY_POSTGRES
TARGET: MY_SNOWFLAKE

defaults:
  mode: full-refresh
  object: "{stream_schema}_{stream_table}"

streams:
  public.accounts:
  public.users:
  public.finance_departments:
    object: "departments"

Step 2: Create a Sling resource#

Next, you'll create a SlingResource object that contains references to the connections specified in the replication configuration:

from dagster_embedded_elt.sling.resources import (
    SlingConnectionResource,
    SlingResource,
)

from dagster import EnvVar

sling_resource = SlingResource(
    connections=[
        # Using a connection string from an environment variable
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        # Using a hard-coded connection string
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
        # Using a keyword-argument constructor
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            role="REPORTING",
        ),
    ]
)

A SlingResource takes a connections parameter, where each SlingConnectionResource represents a connection to a source or target database. You may provide as many connections to the SlingResource as needed.

The name parameter in the SlingConnectionResource should match the SOURCE and TARGET keys in the replication configuration.

You can pass a connection string or arbitrary keyword arguments to the SlingConnectionResource to specify the connection details. Refer to Sling's connections reference for the specific connection types and parameters.


Step 3: Define the Sling assets#

Next, define a Sling asset using the @sling_assets decorator. Dagster will read the replication configuration to produce assets.

Each stream will render two assets, one for the source stream and one for the target destination. You can override how assets are named by passing in a custom DagsterSlingTranslator object.

from dagster_embedded_elt.sling import (
    SlingResource,
    sling_assets,
)

from dagster import Definitions, file_relative_path

replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...])  # Add connections here


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
    for row in sling.stream_raw_logs():
        context.log.info(row)

Step 4: Create the Definitions object#

The last step is to include the Sling assets and resource in a Definitions object. This enables Dagster tools to load everything we've defined:

defs = Definitions(
    assets=[
        my_assets,
    ],
    resources={
        "sling": sling_resource,
    },
)

That's it! You should now be able to view your assets in the Dagster UI and run the replication job.


Examples#

Example 1: Database to database#

To set up a Sling sync between two databases, such as Postgres and Snowflake, you could do something like the following:

from dagster_embedded_elt.sling import (
    SlingConnectionResource,
    SlingResource,
    sling_assets,
)

from dagster import EnvVar

source = SlingConnectionResource(
    name="MY_PG",
    type="postgres",
    host="localhost",
    port=5432,
    database="my_database",
    user="my_user",
    password=EnvVar("PG_PASS"),
)

target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    host="hostname.snowflake",
    user="username",
    database="database",
    password=EnvVar("SF_PASSWORD"),
    role="role",
)

sling = SlingResource(
    connections=[
        source,
        target,
    ]
)
replication_config = {
    "SOURCE": "MY_PG",
    "TARGET": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)

Example 2: File to database#

To set up a Sling sync between a file in an object store and a database, such as from Amazon S3 to Snowflake, you could do something like the following:

from dagster_embedded_elt.sling import (
    SlingConnectionResource,
    SlingResource,
    sling_assets,
)

from dagster import EnvVar

target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    host="hostname.snowflake",
    user="username",
    database="database",
    password=EnvVar("SF_PASSWORD"),
    role="role",
)

source = SlingConnectionResource(
    name="MY_S3",
    type="s3",
    bucket="sling-bucket",
    access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)

sling = SlingResource(connections=[source, target])

replication_config = {
    "SOURCE": "MY_S3",
    "TARGET": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "s3://my-bucket/my_file.parquet": {
            "object": "marts.my_table",
            "primary_key": "id",
        },
    },
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)

APIs in this guide#

NameDescription
@sling_assetsThe core Sling asset factory for building syncs
SlingResourceThe Sling resource used for handing credentials to databases and object stores
DagsterSlingTranslatorA translator for specifying how to map between Sling and Dagster types
SlingConnectionResourceA Sling connection resource for specifying database and storage connection credentials