This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It currently includes the following integrations:
Sling, which provides a simple way to sync data between databases and file systems
dlt, or data load tool, which provides a way to load data from systems and APIs
For more information on getting started, see the Embedded ELT documentation.
Refer to the Sling guide to get started.
Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This will create one asset for every Sling target stream.
A Sling replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s documentation.
replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary of a replication config.
dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how to map a Sling stream to a Dagster AssetKey.
name (Optional[str]) – The name of the op.
partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.
backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.
op_tags (Optional[Mapping[str, Any]]) – The tags for this asset.
Examples
Running a sync by providing a path to a Sling Replication config:
from dagster import EnvVar
from dagster_embedded_elt.sling import sling_assets, SlingResource, SlingConnectionResource

# Connection names must match the source and target names used in the replication config.
sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"

@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
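Because replication_config also accepts a dictionary, the same configuration can be written inline rather than in a YAML file. The following is a minimal sketch under stated assumptions: the stream names and the defaults are illustrative, and the source and target values are assumed to match the connection names registered on the SlingResource.

```python
# A replication config expressed as a plain dictionary (illustrative values).
# "source" and "target" refer to SlingConnectionResource names.
replication_config = {
    "source": "MY_POSTGRES",
    "target": "MY_DUCKDB",
    "defaults": {
        "mode": "full-refresh",
        "object": "{stream_schema}_{stream_table}",
    },
    "streams": {
        "public.accounts": None,  # no overrides: use the defaults
        "public.users": {"object": "public.all_users"},
    },
}

# Each key under "streams" is one replication stream.
stream_names = list(replication_config["streams"])
print(stream_names)
```

A dictionary like this can be passed as the replication_config argument in place of config_path.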
A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.
The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.
For example:
stream_definition = {
    "public.users": {
        "sql": 'select all_user_id, name from public."all_Users"',
        "object": "public.all_users",
    }
}
By default, this returns the class’s target_prefix parameter concatenated with the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts”.
Override this function to customize how to map a Sling stream to a Dagster AssetKey.
Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:
public.users:
  meta:
    dagster:
      asset_key: "mydb_users"
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition
The Dagster AssetKey for the replication stream.
Examples
Using a custom mapping for streams:
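class CustomSlingTranslator(DagsterSlingTranslator):
    def get_asset_key_for_target(self, stream_definition) -> AssetKey:
        # Assumes the shape shown above: a single entry keyed by the stream name.
        stream_name = next(iter(stream_definition))
        mapping = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(mapping[stream_name])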
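The default naming and the metadata override described above can be sketched in plain Python. This is an illustration of the documented behavior, not the library's implementation: resolve_asset_key_name is a hypothetical helper, "target" is an assumed value for target_prefix, and giving the metadata override precedence over the default is also an assumption.

```python
import re
from typing import Any, Mapping, Optional


def resolve_asset_key_name(
    stream_name: str,
    stream_config: Optional[Mapping[str, Any]],
    target_prefix: str = "target",
) -> str:
    """Hypothetical helper mirroring the documented default naming."""
    config = stream_config or {}
    # An explicit meta.dagster.asset_key in the replication config wins (assumed).
    override = config.get("meta", {}).get("dagster", {}).get("asset_key")
    if override:
        return override
    # Otherwise: target prefix, an underscore, and the sanitized stream name.
    sanitized = re.sub(r"[^A-Za-z0-9_]", "_", stream_name.replace('"', ""))
    return f"{target_prefix}_{sanitized}"


default_name = resolve_asset_key_name("public.accounts", None)
override_name = resolve_asset_key_name(
    "public.users", {"meta": {"dagster": {"asset_key": "mydb_users"}}}
)
print(default_name)   # target_public_accounts
print(override_name)  # mydb_users
```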
Defines the auto-materialize policy for a given stream definition.
This method checks the provided stream definition for a specific configuration indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
An eager auto-materialize policy if the configuration is found, otherwise None.
Optional[AutoMaterializePolicy]
A function that takes a stream name from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.
By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.
Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the upstream dependency for a stream as follows:
public.users:
  meta:
    dagster:
      deps: "sourcedb_users"
stream_name (str) – The name of the stream.
The Dagster AssetKey dependency for the replication stream.
Examples
Using a custom mapping for streams:
class CustomSlingTranslator(DagsterSlingTranslator):
    def get_deps_asset_key(self, stream_name: str) -> AssetKey:
        # Look up the upstream asset name for each known stream.
        mapping = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(mapping[stream_name])
Retrieves the description for a given stream definition.
This method checks the provided stream definition for a description. It first looks for an “sql” key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the “dagster” key.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
The description of the stream if found, otherwise None.
Optional[str]
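The lookup order described for the description can be sketched as follows. This is a minimal illustration rather than the library's code: describe_stream is a hypothetical helper, and the "description" key under meta.dagster is an assumed name, since the text only says the description lives in the metadata under the “dagster” key.

```python
from typing import Any, Mapping, Optional


def describe_stream(stream_config: Mapping[str, Any]) -> Optional[str]:
    """Hypothetical helper illustrating the documented lookup order."""
    # 1. A custom SQL query, if present, is used as the description.
    if "sql" in stream_config:
        return stream_config["sql"]
    # 2. Otherwise fall back to meta.dagster.description (key name assumed).
    return stream_config.get("meta", {}).get("dagster", {}).get("description")


from_sql = describe_stream({"sql": "select id from public.users"})
from_meta = describe_stream({"meta": {"dagster": {"description": "Users table"}}})
missing = describe_stream({})
print(from_sql, from_meta, missing)
```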
Retrieves the freshness policy for a given stream definition.
This method checks the provided stream definition for a specific configuration indicating a freshness policy. If the configuration is found, it constructs and returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
A FreshnessPolicy object if the configuration is found, otherwise None.
Optional[FreshnessPolicy]
Retrieves the group name for a given stream definition.
This method checks the provided stream definition for a group name in the metadata under the “dagster” key.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
The group name if found, otherwise None.
Optional[str]
Retrieves the metadata for a given stream definition.
This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
A dictionary containing the stream configuration as JSON metadata.
Mapping[str, Any]
Retrieves the tags for a given stream definition.
This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
An empty dictionary.
Mapping[str, Any]
A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.
By default, this strips any double quotes from the stream name and replaces every remaining non-alphanumeric character with an underscore.
stream_name (str) – The name of the stream.
Examples
Using a custom stream name sanitizer:
class CustomSlingTranslator(DagsterSlingTranslator):
    def sanitize_stream_name(self, stream_name: str) -> str:
        # Drop the dots instead of replacing them with underscores.
        return stream_name.replace(".", "")
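To make the difference between the default and the custom sanitizer concrete, here is a small standalone sketch. It assumes the default is equivalent to removing double quotes and then applying a regular expression that keeps letters, digits, and underscores; the exact pattern used by the library is not stated in this text, and both function names are hypothetical.

```python
import re


def default_sanitize(stream_name: str) -> str:
    """Sketch of the documented default: drop double quotes, then replace
    every character that is not a letter, digit, or underscore."""
    without_quotes = stream_name.replace('"', "")
    return re.sub(r"[^A-Za-z0-9_]", "_", without_quotes)


def custom_sanitize(stream_name: str) -> str:
    """The custom variant from the example above: remove dots entirely."""
    return stream_name.replace(".", "")


print(default_sanitize('public."all_Users"'))  # public_all_Users
print(default_sanitize("public.accounts"))     # public_accounts
print(custom_sanitize("public.accounts"))      # publicaccounts
```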
Resource for interacting with the Sling package. This resource can be used to run Sling replications.
connections (List[SlingConnectionResource]) – A list of connections to use for the replication.
Examples
from dagster import EnvVar
from dagster_embedded_elt.sling import SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        # Connection defined by a connection string.
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        # Connection defined by keyword arguments.
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)
A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.
Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections
The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication
You may provide either a connection string or keyword arguments for the connection.
Examples
Create a Sling Connection for a file, such as CSV or JSON:
source = SlingConnectionResource(name="MY_FILE", type="file")
Create a Sling Connection for a Postgres or MySQL database, using a connection string:
postgres_conn = SlingConnectionResource(
    name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")
)
mysql_conn = SlingConnectionResource(
    name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema"
)
A connection can also be defined with keyword arguments such as host, user, and password instead of a connection string; the MY_SNOWFLAKE connection in the SlingResource example above is configured this way.
Refer to the dlt guide to get started.
Asset Factory for using data load tool (dlt).
dlt_source (DltSource) – The DltSource to be ingested.
dlt_pipeline (Pipeline) – The dlt Pipeline defining the destination parameters.
name (Optional[str], optional) – The name of the op.
group_name (Optional[str], optional) – The name of the asset group.
dlt_dagster_translator (DltDagsterTranslator, optional) – (Deprecated: this parameter will be removed in version 1.9. Use dagster_dlt_translator instead.) Customization object for defining asset parameters from dlt resources.
Examples
Loading Hubspot data to Snowflake with an auto materialize policy using the dlt verified source:
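from typing import Optional

from dagster import AssetExecutionContext, AutoMaterializePolicy, AutoMaterializeRule
from dagster_embedded_elt.dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dlt import pipeline
from dlt.extract.resource import DltResource

# `hubspot` is the dlt verified source; import it from wherever it lives in your project.

class HubspotDagsterDltTranslator(DagsterDltTranslator):
    def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMaterializePolicy]:
        # Eager policy that additionally materializes every day at midnight.
        return AutoMaterializePolicy.eager().with_rules(
            AutoMaterializeRule.materialize_on_cron("0 0 * * *")
        )

@dlt_assets(
    dlt_source=hubspot(include_history=True),
    dlt_pipeline=pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot",
        destination="snowflake",
        progress="log",
    ),
    name="hubspot",
    group_name="hubspot",
    dagster_dlt_translator=HubspotDagsterDltTranslator(),
)
def hubspot_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)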
Loading GitHub issues to Snowflake:
@dlt_assets(
    dlt_source=github_reactions(
        "dagster-io", "dagster", items_per_page=100, max_items=250
    ),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
        progress="log",
    ),
    name="github",
    group_name="github",
)
def github_reactions_dagster_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
Build a list of asset specs from a dlt source and pipeline.
dlt_source (DltSource) – dlt source object
dlt_pipeline (Pipeline) – dlt pipeline object
dagster_dlt_translator (Optional[DagsterDltTranslator]) – Allows customizing how to map dlt project to asset keys and asset metadata.
A list of asset specs built from the dlt source and pipeline.
List[AssetSpec]
Defines the asset key for a given dlt resource key and dataset name.
This method can be overridden to provide a custom asset key for a dlt resource.
resource (DltResource) – dlt resource
AssetKey of Dagster asset derived from dlt resource
Defines a resource-specific auto-materialize policy.
This method can be overridden to provide a custom auto-materialize policy for a dlt resource.
resource (DltResource) – dlt resource
The auto-materialize policy for a resource.
Optional[AutoMaterializePolicy]
Defines upstream asset dependencies given a dlt resource.
Defaults to a concatenation of resource.source_name and resource.name.
resource (DltResource) – dlt resource
The Dagster asset keys upstream of dlt_resource_key.
Iterable[AssetKey]
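The default upstream key described above can be illustrated with a stand-in object. This is only a sketch: FakeDltResource is a hypothetical class exposing the two attributes named in the text, and the underscore separator is an assumption, since the text says only that the two values are concatenated.

```python
from dataclasses import dataclass


@dataclass
class FakeDltResource:
    """Hypothetical stand-in exposing only the two attributes used here."""
    source_name: str
    name: str


def default_upstream_key(resource: FakeDltResource) -> str:
    # The underscore separator is an assumption, not confirmed by the text.
    return f"{resource.source_name}_{resource.name}"


upstream = default_upstream_key(FakeDltResource(source_name="github", name="issues"))
print(upstream)  # github_issues
```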
A method that takes in a dlt resource and returns the Dagster description of the resource.
This method can be overridden to provide a custom description for a dlt resource.
resource (DltResource) – dlt resource
The Dagster description for the dlt resource.
Optional[str]
A method that takes in a dlt resource and returns the Dagster group name of the resource.
This method can be overridden to provide a custom group name for a dlt resource.
resource (DltResource) – dlt resource
A Dagster group name for the dlt resource.
Optional[str]
Defines resource specific metadata.
resource (DltResource) – dlt resource
The custom metadata entries for this resource.
Mapping[str, Any]
A method that takes in a dlt resource and returns the Dagster owners of the resource.
This method can be overridden to provide custom owners for a dlt resource.
resource (DltResource) – dlt resource
A sequence of Dagster owners for the dlt resource.
Optional[Sequence[str]]
A method that takes in a dlt resource and returns the Dagster tags of the resource.
This method can be overridden to provide custom tags for a dlt resource.
resource (DltResource) – dlt resource
The Dagster tags for the dlt resource.
Optional[Mapping[str, str]]
(Experimental) This API may break in future versions, even between dot releases.
Runs the dlt pipeline with subset support.
context (Union[OpExecutionContext, AssetExecutionContext]) – Asset or op execution context
dlt_source (Optional[DltSource]) – optional dlt source if resource is used from an @op
dlt_pipeline (Optional[Pipeline]) – optional dlt pipeline if resource is used from an @op
dagster_dlt_translator (Optional[DagsterDltTranslator]) – optional dlt translator if resource is used from an @op
**kwargs (dict[str, Any]) – Keyword args passed to pipeline run method
An iterator of MaterializeResult or AssetMaterialization
Iterator[Union[MaterializeResult, AssetMaterialization]]