Dagster & Sling
This integration lets you run Sling replications as Dagster assets, extracting data from a source connection and loading it into a target connection.
This integration is currently experimental.
Installation
pip install dagster-sling
Example
import dagster as dg
from dagster_sling import SlingConnectionResource, SlingResource, sling_assets

# Source connection: a Postgres database. The password is read from the
# PG_PASS environment variable at runtime.
source = SlingConnectionResource(
    name="MY_PG",
    type="postgres",
    host="localhost",  # type: ignore
    port=5432,  # type: ignore
    database="my_database",  # type: ignore
    user="my_user",  # type: ignore
    password=dg.EnvVar("PG_PASS"),  # type: ignore
)

# Target connection: a Snowflake account. The password is read from the
# SF_PASSWORD environment variable at runtime.
target = SlingConnectionResource(
    name="MY_SF",
    type="snowflake",
    host="hostname.snowflake",  # type: ignore
    user="username",  # type: ignore
    database="database",  # type: ignore
    password=dg.EnvVar("SF_PASSWORD"),  # type: ignore
    role="role",  # type: ignore
)


# The replication config refers to the connections by name and lists the
# streams (source tables) to replicate.
@sling_assets(
    replication_config={
        "SOURCE": "MY_PG",
        "TARGET": "MY_SF",
        "defaults": {
            "mode": "full-refresh",
            "object": "{stream_schema}_{stream_table}",
        },
        "streams": {
            "public.accounts": None,
            "public.users": None,
            "public.finance_departments": {"object": "departments"},
        },
    }
)
def my_sling_assets(context, sling: SlingResource):
    # Run the replication and yield the resulting asset events to Dagster.
    yield from sling.replicate(context=context)


defs = dg.Definitions(
    assets=[my_sling_assets],
    resources={
        "sling": SlingResource(
            connections=[
                source,
                target,
            ]
        )
    },
)
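The `defaults.object` entry in the replication config above is a template, and a stream can override it with its own `object` value. The sketch below illustrates how those names would resolve for the three streams in the example. It is plain Python written for this page, not Sling's or dagster-sling's implementation, and `resolve_target_objects` is a hypothetical helper name.

```python
from typing import Dict, Mapping, Optional


def resolve_target_objects(
    defaults: Mapping[str, str],
    streams: Mapping[str, Optional[Mapping[str, str]]],
) -> Dict[str, str]:
    """Illustrative only: expand the `object` template for each stream.

    Assumes every stream name has the form "<schema>.<table>".
    """
    resolved = {}
    for stream_name, overrides in streams.items():
        schema, table = stream_name.split(".", 1)
        # A per-stream "object" wins over the default template.
        template = (overrides or {}).get("object", defaults["object"])
        resolved[stream_name] = template.format(
            stream_schema=schema, stream_table=table
        )
    return resolved


targets = resolve_target_objects(
    defaults={"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    streams={
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
)

for stream, target_object in targets.items():
    print(f"{stream} -> {target_object}")
```

Under these assumptions, `public.accounts` and `public.users` map to `public_accounts` and `public_users`, while `public.finance_departments` is written to `departments` because of its per-stream override.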
About Sling
Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.
Key Features
- Data Movement: Transfer data between different storage systems and databases efficiently
- Flexible Connectivity: Support for numerous databases, data warehouses, and file storage systems
- Transformation Capabilities: Built-in data transformation features during transfer
- Multiple Operation Modes: Support for various replication modes including full-refresh, incremental, and snapshot
- Production-Ready: Deployable with monitoring, scheduling, and error handling
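As a rough illustration of mixing replication modes, the sketch below builds a replication config in which one stream keeps the default `full-refresh` mode and another overrides it with `incremental`. The stream `public.events` and the column names `id` and `updated_at` are hypothetical, and the `primary_key` and `update_key` options should be checked against the Sling documentation for your version. `effective_mode` is a hypothetical helper that only inspects the dictionary; it does not call Sling.

```python
from typing import Any, Mapping

# Hypothetical replication config: stream and column names are assumptions.
replication_config = {
    "SOURCE": "MY_PG",
    "TARGET": "MY_SF",
    "defaults": {
        "mode": "full-refresh",
        "object": "{stream_schema}_{stream_table}",
    },
    "streams": {
        # No overrides: inherits everything from "defaults".
        "public.accounts": None,
        # Overrides the mode; incremental loads need keys to merge on.
        "public.events": {
            "mode": "incremental",
            "primary_key": ["id"],
            "update_key": "updated_at",
        },
    },
}


def effective_mode(config: Mapping[str, Any], stream: str) -> str:
    """Return the stream's own mode if set, else the default mode."""
    overrides = config["streams"].get(stream) or {}
    return overrides.get("mode", config["defaults"]["mode"])


for name in replication_config["streams"]:
    print(f"{name}: {effective_mode(replication_config, name)}")
```

A dictionary shaped like this could be passed as `replication_config` to `@sling_assets` in the same way as the example earlier on this page.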