Embedded ELT (dagster-embedded-elt)

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. This package currently includes the following integrations:

  • Sling, which provides a simple way to sync data between databases and file systems

  • dlt, or data load tool, which provides a way to load data from systems and APIs

For more information on getting started, see the Embedded ELT documentation.


Sling (dagster-embedded-elt.sling)

Refer to the Sling guide to get started.

Assets (Sling)

@dagster_embedded_elt.sling.sling_assets(*, replication_config, dagster_sling_translator=DagsterSlingTranslator(target_prefix='target'), name=None, partitions_def=None, backfill_policy=None, op_tags=None)[source]

Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This will create one asset for every Sling target stream.

A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.

Parameters:
  • replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary of a replication config.

  • dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how to map a Sling stream to a Dagster AssetKey.

  • name (Optional[str]) – The name of the op.

  • partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.

  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.

  • op_tags (Optional[Mapping[str, Any]]) – The tags for this asset.

Examples

Running a sync by providing a path to a Sling Replication config:

from dagster import EnvVar
from dagster_embedded_elt.sling import sling_assets, SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"
@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)
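
Because replication_config also accepts a dictionary, the same replication can be described inline rather than in a YAML file. The sketch below is illustrative only: the connection names reuse those from the example above, while the stream names, the values under defaults, and the meta.dagster entries are assumptions chosen to show the general shape of a Sling replication config.

```python
# A minimal sketch of a replication config expressed as a Python dictionary.
# Stream names and option values are hypothetical placeholders.
replication_config = {
    "source": "MY_POSTGRES",  # must match a SlingConnectionResource name
    "target": "MY_DUCKDB",
    "defaults": {
        "mode": "full-refresh",
        "object": "{stream_schema}_{stream_table}",
    },
    "streams": {
        # A stream that relies entirely on the defaults.
        "public.accounts": None,
        # A stream that sets its Dagster asset key and upstream dependency.
        "public.users": {
            "meta": {
                "dagster": {
                    "asset_key": "mydb_users",
                    "deps": "sourcedb_users",
                }
            }
        },
    },
}
```

The dictionary would then be passed in place of the path, as in @sling_assets(replication_config=replication_config).
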
class dagster_embedded_elt.sling.DagsterSlingTranslator(target_prefix: str = 'target')[source]
get_asset_key(stream_definition)[source]

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.

The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.

For example:

stream_definition = {"public.users":
    {'sql': 'select all_user_id, name from public."all_Users"',
    'object': 'public.all_users'}
}

By default, this returns the class’s target_prefix parameter concatenated with the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster AssetKey.

Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:

public.users:
   meta:
     dagster:
       asset_key: "mydb_users"
Parameters:

stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition

Returns:

The Dagster AssetKey for the replication stream.

Return type:

AssetKey
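
The naming rule described above can be sketched in plain Python. This is not the library's implementation: default_asset_key_name is a hypothetical helper, the precedence of the meta.dagster.asset_key entry over the default is an assumption, and replacing dots is a simplified stand-in for the translator's stream-name sanitization.

```python
from typing import Any, Mapping, Optional


def default_asset_key_name(
    stream_name: str,
    stream_config: Optional[Mapping[str, Any]] = None,
    target_prefix: str = "target",
) -> str:
    """Return an asset key name for a stream (hypothetical helper)."""
    # An explicit meta.dagster.asset_key entry is assumed to take precedence.
    meta = (stream_config or {}).get("meta", {})
    override = meta.get("dagster", {}).get("asset_key")
    if override:
        return override
    # Otherwise prepend the prefix to the stream name, with dots replaced.
    return f"{target_prefix}_{stream_name.replace('.', '_')}"


print(default_asset_key_name("public.accounts"))  # target_public_accounts
print(
    default_asset_key_name(
        "public.users", {"meta": {"dagster": {"asset_key": "mydb_users"}}}
    )
)  # mydb_users
```
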

Examples

Using a custom mapping for streams:

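class CustomSlingTranslator(DagsterSlingTranslator):
    def get_asset_key(self, stream_definition) -> AssetKey:
        # Assumes the stream name is the dictionary key, as described above.
        stream_name = next(iter(stream_definition))
        map = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(map[stream_name])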
get_auto_materialize_policy(stream_definition)[source]
get_deps_asset_key(stream_definition)[source]

A function that takes a stream name from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.

By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the Dagster dependency for a stream as follows:

public.users:
    meta:
        dagster:
            deps: "sourcedb_users"
Parameters:

stream_name (str) – The name of the stream.

Returns:

The Dagster AssetKey dependency for the replication stream.

Return type:

AssetKey

Examples

Using a custom mapping for streams:

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_deps_asset_key(self, stream_name: str) -> AssetKey:
        map = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(map[stream_name])
get_description(stream_definition)[source]
get_freshness_policy(stream_definition)[source]
get_group_name(stream_definition)[source]
get_metadata(stream_definition)[source]
sanitize_stream_name(stream_name)[source]

A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.

By default, this removes any non-alphanumeric characters from the stream name and replaces them with underscores, while removing any double quotes.

Parameters:

stream_name (str) – The name of the stream.

Examples

Using a custom stream name sanitizer:

class CustomSlingTranslator(DagsterSlingTranslator):
    def sanitize_stream_name(self, stream_name: str) -> str:
        return stream_name.replace(".", "")
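
To make the difference concrete, the default behaviour described above can be approximated in plain Python and compared with the custom sanitizer from the example. Both functions are hypothetical stand-ins written for illustration, not the library's code. In particular, treating only ASCII letters and digits as alphanumeric is an assumption.

```python
import re


def default_sanitize(stream_name: str) -> str:
    """Approximate the default described above (hypothetical helper)."""
    without_quotes = stream_name.replace('"', "")
    # Replace every remaining non-alphanumeric character with an underscore.
    return re.sub(r"[^A-Za-z0-9]", "_", without_quotes)


def custom_sanitize(stream_name: str) -> str:
    """Mirror CustomSlingTranslator.sanitize_stream_name from the example."""
    return stream_name.replace(".", "")


name = 'public."all_Users"'
print(default_sanitize(name))  # public_all_Users
print(custom_sanitize(name))   # public"all_Users"
```

Note that the custom version only strips dots, so any double quotes in the stream name are left in place.
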
dagster_embedded_elt.sling.build_sling_asset(asset_spec, source_stream, target_object, mode=SlingMode.FULL_REFRESH, primary_key=None, update_key=None, source_options=None, target_options=None, sling_resource_key='sling')[source]

deprecated This API will be removed in version 0.23.0.

Use @sling_assets instead.

Asset Factory for using Sling to sync data from a source stream to a target object.

Parameters:
  • asset_spec (AssetSpec) – The AssetSpec to use to materialize this asset.

  • source_stream (str) – The source stream to sync from. This can be a table, a query, or a path.

  • target_object (str) – The target object to sync to. This can be a table or a path.

  • mode (SlingMode, optional) – The sync mode to use when syncing. Defaults to SlingMode.FULL_REFRESH.

  • primary_key (Optional[Union[str, List[str]]], optional) – The optional primary key to use when syncing.

  • update_key (Optional[str], optional) – The optional update key to use when syncing.

  • source_options (Optional[Dict[str, Any]], optional) – Any optional Sling source options to use when syncing.

  • target_options (Optional[Dict[str, Any]], optional) – Any optional target options to use when syncing.

  • sling_resource_key (str, optional) – The resource key for the SlingResource. Defaults to “sling”.

Examples

Creating a Sling asset that syncs from a file to a table:

from dagster import AssetSpec
from dagster_embedded_elt.sling import SlingMode, build_sling_asset

asset_spec = AssetSpec(key=["main", "dest_tbl"])
asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="file:///tmp/test.csv",
    target_object="main.dest_table",
    mode=SlingMode.INCREMENTAL,
    primary_key="id",
)

Creating a Sling asset that syncs from a table to a file with a full refresh:

asset_spec = AssetSpec(key="test.csv")
asset_def = build_sling_asset(
    asset_spec=asset_spec,
    source_stream="main.dest_table",
    target_object="file:///tmp/test.csv",
    mode=SlingMode.FULL_REFRESH,
)

Resources (Sling)

class dagster_embedded_elt.sling.SlingResource(*, source_connection=None, target_connection=None, connections=[])[source]

experimental This API may break in future versions, even between dot releases.

Resource for interacting with the Sling package. This resource can be used to run Sling replications.

Examples

from dagster import EnvVar
from dagster_embedded_elt.sling import SlingResource, SlingConnectionResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)
class dagster_embedded_elt.sling.SlingConnectionResource(*, name, type, connection_string=None, **config_dict)[source]

A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.

Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections

The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication

You may provide either a connection string or keyword arguments for the connection.
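
Since a connection name must match the name used in the replication configuration, a mismatch can be caught before a run with a small check. The helper below is a hypothetical sketch, not part of dagster-embedded-elt. It assumes the replication config is a dictionary with top-level source and target keys, and that the list of connection names is collected by the caller.

```python
from typing import Any, Iterable, List, Mapping


def missing_connections(
    replication_config: Mapping[str, Any], connection_names: Iterable[str]
) -> List[str]:
    """Return source/target names in the config with no matching connection."""
    known = set(connection_names)
    referenced = [replication_config.get("source"), replication_config.get("target")]
    return [name for name in referenced if name is not None and name not in known]


config = {"source": "MY_POSTGRES", "target": "MY_DUCKDB", "streams": {}}
print(missing_connections(config, ["MY_POSTGRES", "MY_DUCKDB"]))  # []
print(missing_connections(config, ["MY_POSTGRES"]))  # ['MY_DUCKDB']
```
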

Examples

Creating a Sling Connection for a file, such as CSV or JSON:

source = SlingConnectionResource(name="MY_FILE", type="file")

Create a Sling Connection for a Postgres or MySQL database, using a connection string:

postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")

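Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments:

postgres_conn = SlingConnectionResource(
    name="MY_POSTGRES",
    type="postgres",
    host="host",
    user="hunter42",
    password=EnvVar("POSTGRES_PASSWORD"),
)
snowflake_conn = SlingConnectionResource(
    name="MY_SNOWFLAKE",
    type="snowflake",
    host=EnvVar("SNOWFLAKE_HOST"),
    user=EnvVar("SNOWFLAKE_USER"),
    database=EnvVar("SNOWFLAKE_DATABASE"),
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    role=EnvVar("SNOWFLAKE_ROLE"),
)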

class dagster_embedded_elt.sling.resources.SlingSourceConnection(*, type, connection_string=None, **config_dict)[source]

deprecated This API will be removed in version 0.23.0.

SlingSourceConnection has been deprecated; use SlingConnectionResource for both source and target connections.

A Sling Source Connection defines the source connection used by SlingResource.

Examples

Creating a Sling Source for a file, such as CSV or JSON:

source = SlingSourceConnection(type="file")

Create a Sling Source for a Postgres database, using a connection string:

source = SlingSourceConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
source = SlingSourceConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema")

Create a Sling Source for a Postgres database, using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres

source = SlingSourceConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))
class dagster_embedded_elt.sling.resources.SlingTargetConnection(*, type, connection_string=None, **config_dict)[source]

deprecated This API will be removed in version 0.23.0.

SlingTargetConnection has been deprecated; use SlingConnectionResource for both source and target connections.

A Sling Target Connection defines the target connection used by SlingResource.

Examples

Creating a Sling Target for a file, such as CSV or JSON:

target = SlingTargetConnection(type="file")

Create a Sling Target for a Postgres database, using a connection string:

target = SlingTargetConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema")
target = SlingTargetConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))

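Create a Sling Target for a Postgres database, using keyword arguments, as described here: https://docs.slingdata.io/connections/database-connections/postgres

target = SlingTargetConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))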


dlt (dagster-embedded-elt.dlt)

Refer to the dlt guide to get started.

Assets (dlt)

@dagster_embedded_elt.dlt.dlt_assets(*, dlt_source, dlt_pipeline, name=None, group_name=None, dlt_dagster_translator=DagsterDltTranslator())[source]

Asset Factory for using data load tool (dlt).

Parameters:
  • dlt_source (DltSource) – The DltSource to be ingested.

  • dlt_pipeline (Pipeline) – The dlt Pipeline defining the destination parameters.

  • name (Optional[str], optional) – The name of the op.

  • group_name (Optional[str], optional) – The name of the asset group.

  • dlt_dagster_translator (DagsterDltTranslator, optional) – Customization object for defining asset parameters from dlt resources.

Examples

Loading HubSpot data to Snowflake with an auto-materialize policy using the dlt verified source:

class HubspotDltDagsterTranslator(DagsterDltTranslator):
    @public
    def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMaterializePolicy]:
        return AutoMaterializePolicy.eager().with_rules(
            AutoMaterializeRule.materialize_on_cron("0 0 * * *")
        )


@dlt_assets(
    dlt_source=hubspot(include_history=True),
    dlt_pipeline=pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot",
        destination="snowflake",
    ),
    name="hubspot",
    group_name="hubspot",
    dlt_dagster_translator=HubspotDltDagsterTranslator(),
)
def hubspot_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)

Loading GitHub issues to Snowflake:

@dlt_assets(
    dlt_source=github_reactions(
        "dagster-io", "dagster", items_per_page=100, max_items=250
    ),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
    ),
    name="github",
    group_name="github",
)
def github_reactions_dagster_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)
class dagster_embedded_elt.dlt.DagsterDltTranslator[source]
get_asset_key(resource)[source]

Defines asset key for a given dlt resource key and dataset name.

Parameters:

resource (DltResource) – dlt resource / transformer

Returns:

AssetKey of Dagster asset derived from dlt resource

get_auto_materialize_policy(resource)[source]

Defines resource specific auto materialize policy.

Parameters:

resource (DltResource) – dlt resource / transformer

Returns:

The auto-materialize policy for the resource.

Return type:

Optional[AutoMaterializePolicy]

get_deps_asset_keys(resource)[source]

Defines upstream asset dependencies given a dlt resource.

Defaults to a concatenation of resource.source_name and resource.name.

Parameters:

resource (DltResource) – dlt resource / transformer

Returns:

The Dagster asset keys upstream of dlt_resource_key.

Return type:

Iterable[AssetKey]
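
The default described above, a concatenation of resource.source_name and resource.name, can be illustrated with a stand-in object. This is a sketch under stated assumptions rather than the library's code: FakeDltResource is a hypothetical class that exposes only those two attributes, the underscore separator is an assumption (the text only says "concatenation"), and the resource name "contacts" is made up.

```python
from dataclasses import dataclass


@dataclass
class FakeDltResource:
    """Stand-in exposing only the two attributes named above (not dlt's class)."""

    source_name: str
    name: str


def default_dep_key_name(resource: FakeDltResource) -> str:
    # The underscore separator is an assumption made for this sketch.
    return f"{resource.source_name}_{resource.name}"


resource = FakeDltResource(source_name="hubspot", name="contacts")
print(default_dep_key_name(resource))  # hubspot_contacts
```
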

Resources (dlt)

class dagster_embedded_elt.dlt.DagsterDltResource[source]

experimental This API may break in future versions, even between dot releases.

run(context, dlt_source=None, dlt_pipeline=None, dagster_dlt_translator=None, **kwargs)[source]

Runs the dlt pipeline with subset support.

Parameters:
  • context (Union[OpExecutionContext, AssetExecutionContext]) – Asset or op execution context

  • dlt_source (Optional[DltSource]) – optional dlt source if resource is used from an @op

  • dlt_pipeline (Optional[Pipeline]) – optional dlt pipeline if resource is used from an @op

  • dagster_dlt_translator (Optional[DagsterDltTranslator]) – optional dlt translator if resource is used from an @op

  • **kwargs (dict[str, Any]) – Keyword args passed to pipeline run method

Returns:

An iterator of MaterializeResult or AssetMaterialization

Return type:

Iterator[Union[MaterializeResult, AssetMaterialization]]