Embedded ELT (dagster-embedded-elt)

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. This package currently includes the following integrations:

  • Sling, which provides a simple way to sync data between databases and file systems

  • dlt, or data load tool, which provides a way to load data from systems and APIs

For more information on getting started, see the Embedded ELT documentation.


Sling (dagster-embedded-elt.sling)

Refer to the Sling guide to get started.

Assets (Sling)

@dagster_embedded_elt.sling.sling_assets(*, replication_config, dagster_sling_translator=None, name=None, partitions_def=None, backfill_policy=None, op_tags=None)

Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This creates one asset for every Sling target stream.

A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.

Parameters:
  • replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary of a replication config.

  • dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how to map a Sling stream to a Dagster AssetKey.

  • name (Optional[str]) – The name of the op.

  • partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.

  • backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.

  • op_tags (Optional[Mapping[str, Any]]) – The tags for the underlying op.

Examples

Running a sync by providing a path to a Sling Replication config:

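from dagster import Definitions, EnvVar
from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource, sling_assets

# The connection names must match the source/target names used in replication.yaml.
sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"


@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
    # Run the replication and yield asset events for the replicated streams.
    yield from sling.replicate(context=context)


# Bind the resource under the key "sling" so it is injected into my_assets.
defs = Definitions(assets=[my_assets], resources={"sling": sling_resource})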

class dagster_embedded_elt.sling.DagsterSlingTranslator(target_prefix: str = 'target')

get_asset_key(stream_definition)

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.

The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.

For example:

stream_definition = {"public.users":
    {'sql': 'select all_user_id, name from public."all_Users"',
    'object': 'public.all_users'}
}

By default, this returns the class’s target_prefix parameter concatenated with the stream name. A stream named “public.accounts” will create an AssetKey named “target_public_accounts”.
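The default naming can be approximated in plain Python. This is a standalone illustration, not Dagster's implementation: the helper names `sanitize` and `default_asset_key_name` are hypothetical, and the sketch assumes the default sanitization described under sanitize_stream_name (double quotes dropped, other non-alphanumeric characters replaced with underscores) plus an underscore between the prefix and the sanitized name, matching the “target_public_accounts” example.

```python
import re


def sanitize(stream_name: str) -> str:
    # Hypothetical helper: drop double quotes, then replace every remaining
    # non-alphanumeric character (such as ".") with an underscore.
    return re.sub(r"[^A-Za-z0-9]", "_", stream_name.replace('"', ""))


def default_asset_key_name(stream_name: str, target_prefix: str = "target") -> str:
    # Hypothetical helper: join the translator's target_prefix with the sanitized name.
    return f"{target_prefix}_{sanitize(stream_name)}"


print(default_asset_key_name("public.accounts"))  # target_public_accounts
print(sanitize('public."all_Users"'))  # public_all_Users
```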

Override this function to customize how to map a Sling stream to a Dagster AssetKey.

Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:

public.users:
   meta:
     dagster:
       asset_key: "mydb_users"
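To make the lookup order concrete, here is a minimal sketch, assuming each stream's config has already been parsed from YAML into a plain dictionary (or None for a stream with no settings): an explicit meta.dagster.asset_key wins, otherwise the prefixed default applies. The function name `resolve_asset_key_name` is hypothetical and this is not the library's code.

```python
import re
from typing import Any, Mapping, Optional


def resolve_asset_key_name(
    stream_name: str,
    stream_config: Optional[Mapping[str, Any]],
    target_prefix: str = "target",
) -> str:
    # A stream listed with no settings parses from YAML as None, so treat it as empty.
    config = stream_config or {}
    dagster_meta = (config.get("meta") or {}).get("dagster") or {}
    # An explicit asset key under meta -> dagster takes precedence.
    explicit = dagster_meta.get("asset_key")
    if explicit:
        return explicit
    # Otherwise fall back to the prefixed, sanitized stream name.
    cleaned = re.sub(r"[^A-Za-z0-9]", "_", stream_name.replace('"', ""))
    return f"{target_prefix}_{cleaned}"


streams = {
    "public.users": {"meta": {"dagster": {"asset_key": "mydb_users"}}},
    "public.accounts": None,
}
for name, cfg in streams.items():
    print(name, "->", resolve_asset_key_name(name, cfg))
# public.users -> mydb_users
# public.accounts -> target_public_accounts
```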

Parameters:

stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition

Returns:

The Dagster AssetKey for the replication stream.

Return type:

AssetKey

Examples

Using a custom mapping for streams:

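from dagster import AssetKey
from dagster_embedded_elt.sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_asset_key(self, stream_definition) -> AssetKey:
        # Assumes the stream name is available under the "name" key of stream_definition.
        mapping = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(mapping[stream_definition["name"]])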

get_auto_materialize_policy(stream_definition)

Defines the auto-materialize policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None.

Parameters:
  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns:

An eager auto-materialize policy if the configuration is found, otherwise None.

Return type:

Optional[AutoMaterializePolicy]

get_deps_asset_key(stream_definition)

A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.

By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.

Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the upstream Dagster AssetKey for a stream as follows:

public.users:
    meta:
        dagster:
            deps: "sourcedb_users"
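The same lookup pattern applies to dependencies. The sketch below is plain Python under stated assumptions, not the library's code: `resolve_dep_names` is a hypothetical helper, the stream config is assumed to be a parsed YAML dictionary, and accepting a list as well as a single string for deps is an assumption. The fallback reproduces the documented default of “public_accounts” for the stream “public.accounts”.

```python
import re
from typing import Any, List, Mapping, Optional


def resolve_dep_names(stream_name: str, stream_config: Optional[Mapping[str, Any]]) -> List[str]:
    config = stream_config or {}
    dagster_meta = (config.get("meta") or {}).get("dagster") or {}
    deps = dagster_meta.get("deps")
    if deps:
        # Assumption: accept either a single name or a list of names.
        return [deps] if isinstance(deps, str) else list(deps)
    # Default: the sanitized stream name itself, without the target prefix.
    return [re.sub(r"[^A-Za-z0-9]", "_", stream_name.replace('"', ""))]


print(resolve_dep_names("public.accounts", None))  # ['public_accounts']
print(resolve_dep_names("public.users", {"meta": {"dagster": {"deps": "sourcedb_users"}}}))  # ['sourcedb_users']
```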

Parameters:

stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition.

Returns:

The Dagster AssetKey dependency for the replication stream.

Return type:

AssetKey

Examples

Using a custom mapping for streams:

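from dagster import AssetKey
from dagster_embedded_elt.sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_deps_asset_key(self, stream_definition) -> AssetKey:
        # Assumes the stream name is available under the "name" key of stream_definition.
        mapping = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(mapping[stream_definition["name"]])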

get_description(stream_definition)

Retrieves the description for a given stream definition.

This method checks the provided stream definition for a description. It first looks for an “sql” key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the “dagster” key.
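A minimal sketch of that lookup order in plain Python, assuming the stream config is a parsed dictionary. The function name `describe_stream` is hypothetical, and the key name "description" under meta.dagster is an assumption, since the text above only says the description lives in the metadata under the “dagster” key.

```python
from typing import Any, Mapping, Optional


def describe_stream(stream_config: Optional[Mapping[str, Any]]) -> Optional[str]:
    config = stream_config or {}
    # 1. A custom SQL query, if present, becomes the description.
    if "sql" in config:
        return config["sql"]
    # 2. Otherwise look under meta -> dagster (assumed key: "description").
    dagster_meta = (config.get("meta") or {}).get("dagster") or {}
    return dagster_meta.get("description")


users = {"sql": 'select all_user_id, name from public."all_Users"', "object": "public.all_users"}
print(describe_stream(users))  # select all_user_id, name from public."all_Users"
print(describe_stream({"meta": {"dagster": {"description": "Raw accounts"}}}))  # Raw accounts
```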

Parameters:
  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns:

The description of the stream if found, otherwise None.

Return type:

Optional[str]

get_freshness_policy(stream_definition)

Retrieves the freshness policy for a given stream definition.

This method checks the provided stream definition for a specific configuration indicating a freshness policy. If the configuration is found, it constructs and returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None.

Parameters:
  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns:

A FreshnessPolicy object if the configuration is found, otherwise None.

Return type:

Optional[FreshnessPolicy]

get_group_name(stream_definition)

Retrieves the group name for a given stream definition.

This method checks the provided stream definition for a group name in the metadata under the “dagster” key.

Parameters:
  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns:

The group name if found, otherwise None.

Return type:

Optional[str]

get_metadata(stream_definition)

Retrieves the metadata for a given stream definition.

This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value.

Parameters:
  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns:

A dictionary containing the stream configuration as JSON metadata.

Return type:

Mapping[str, Any]

get_tags(stream_definition)

Retrieves the tags for a given stream definition.

This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags.

Parameters:
  • stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.

Returns:

An empty dictionary.

Return type:

Mapping[str, Any]

sanitize_stream_name(stream_name)

A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.

By default, this replaces any non-alphanumeric characters in the stream name with underscores and removes any double quotes.

Parameters:

stream_name (str) – The name of the stream.

Examples

Using a custom stream name sanitizer:

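from dagster_embedded_elt.sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def sanitize_stream_name(self, stream_name: str) -> str:
        # Strip dots instead of applying the default underscore replacement.
        return stream_name.replace(".", "")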

Resources (Sling)

class dagster_embedded_elt.sling.SlingResource(*, connections=[])

Resource for interacting with the Sling package. This resource can be used to run Sling replications.

Parameters:

connections (List[SlingConnectionResource]) – A list of connections to use for the replication.

Examples

from dagster import EnvVar
from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)

class dagster_embedded_elt.sling.SlingConnectionResource(*, name, type, connection_string=None, **config_dict)

A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.

Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections

The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration (see https://docs.slingdata.io/sling-cli/run/configuration/replication). You may provide either a connection string or keyword arguments for the connection.
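Because a name mismatch only surfaces when the replication runs, a small pre-flight check can catch it earlier. The helper below is hypothetical and is not part of dagster-embedded-elt; it assumes the replication config has been loaded into a dictionary with top-level source and target keys, as in Sling's replication format, and compares them against the names given to your connections.

```python
from typing import Any, Iterable, List, Mapping


def missing_connections(replication: Mapping[str, Any], connection_names: Iterable[str]) -> List[str]:
    """Return replication source/target names that have no matching connection."""
    known = set(connection_names)
    wanted = [replication.get("source"), replication.get("target")]
    return [name for name in wanted if name and name not in known]


replication = {
    "source": "MY_POSTGRES",
    "target": "MY_SNOWFLAKE",
    "streams": {"public.users": None},
}
print(missing_connections(replication, ["MY_POSTGRES", "MY_DUCKDB"]))  # ['MY_SNOWFLAKE']
```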

Examples

Creating a Sling Connection for a file, such as CSV or JSON:

source = SlingConnectionResource(name="MY_FILE", type="file")

Create Sling Connections for Postgres and MySQL databases, using connection strings:

postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")

Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments:


dlt (dagster-embedded-elt.dlt)

Refer to the dlt guide to get started.

Assets (dlt)

@dagster_embedded_elt.dlt.dlt_assets(*, dlt_source, dlt_pipeline, name=None, group_name=None, dlt_dagster_translator=None, dagster_dlt_translator=None, partitions_def=None)

Asset Factory for using data load tool (dlt).

Parameters:
  • dlt_source (DltSource) – The DltSource to be ingested.

  • dlt_pipeline (Pipeline) – The dlt Pipeline defining the destination parameters.

  • name (Optional[str], optional) – The name of the op.

  • group_name (Optional[str], optional) – The name of the asset group.

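  • dlt_dagster_translator (DltDagsterTranslator, optional) – Deprecated: this parameter will be removed in version 1.9; use dagster_dlt_translator instead. Customization object for defining asset parameters from dlt resources.

  • dagster_dlt_translator (DagsterDltTranslator, optional) – Customization object for defining asset parameters from dlt resources.

  • partitions_def (Optional[PartitionsDefinition]) – Optional partitions definition for the assets.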

Examples

Loading Hubspot data to Snowflake with an auto materialize policy using the dlt verified source:

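from typing import Optional

from dagster import (
    AssetExecutionContext,
    AutoMaterializePolicy,
    AutoMaterializeRule,
    Definitions,
)
from dagster_embedded_elt.dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dlt import pipeline
from dlt.extract.resource import DltResource

# dlt verified source scaffolded with `dlt init hubspot snowflake`; adjust the import to your layout.
from hubspot import hubspot


class HubspotDagsterDltTranslator(DagsterDltTranslator):
    def get_auto_materialize_policy(self, resource: DltResource) -> Optional[AutoMaterializePolicy]:
        # Add a daily midnight cron rule on top of the eager policy.
        return AutoMaterializePolicy.eager().with_rules(
            AutoMaterializeRule.materialize_on_cron("0 0 * * *")
        )


@dlt_assets(
    dlt_source=hubspot(include_history=True),
    dlt_pipeline=pipeline(
        pipeline_name="hubspot",
        dataset_name="hubspot",
        destination="snowflake",
        progress="log",
    ),
    name="hubspot",
    group_name="hubspot",
    dagster_dlt_translator=HubspotDagsterDltTranslator(),
)
def hubspot_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)


defs = Definitions(assets=[hubspot_assets], resources={"dlt": DagsterDltResource()})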

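Loading GitHub issues to Snowflake:

from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dlt import pipeline
# dlt verified source scaffolded with `dlt init github snowflake`; adjust the import to your layout.
from github import github_reactions


@dlt_assets(
    dlt_source=github_reactions(
        "dagster-io", "dagster", items_per_page=100, max_items=250
    ),
    dlt_pipeline=pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
        progress="log",
    ),
    name="github",
    group_name="github",
)
def github_reactions_dagster_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)


defs = Definitions(
    assets=[github_reactions_dagster_assets], resources={"dlt": DagsterDltResource()}
)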

dagster_embedded_elt.dlt.build_dlt_asset_specs(dlt_source, dlt_pipeline, dagster_dlt_translator=None)

Build a list of asset specs from a dlt source and pipeline.

Parameters:
  • dlt_source (DltSource) – dlt source object

  • dlt_pipeline (Pipeline) – dlt pipeline object

  • dagster_dlt_translator (Optional[DagsterDltTranslator]) – Allows customizing how to map dlt project to asset keys and asset metadata.

Returns:

A list of asset specs built from the dlt source and pipeline.

Return type:

List[AssetSpec]

class dagster_embedded_elt.dlt.DagsterDltTranslator

get_asset_key(resource)

Defines the asset key for a given dlt resource key and dataset name.

This method can be overridden to provide a custom asset key for a dlt resource.

Parameters:

resource (DltResource) – dlt resource

Returns:

AssetKey of Dagster asset derived from dlt resource

get_auto_materialize_policy(resource)

Defines a resource-specific auto-materialize policy.

This method can be overridden to provide a custom auto-materialize policy for a dlt resource.

Parameters:

resource (DltResource) – dlt resource

Returns:

The auto-materialize policy for the resource.

Return type:

Optional[AutoMaterializePolicy]

get_deps_asset_keys(resource)

Defines upstream asset dependencies given a dlt resource.

Defaults to a concatenation of resource.source_name and resource.name.
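A minimal illustration of that default, assuming the two names are joined with an underscore into a single key name (the separator is an assumption; the text above only says "concatenation"). `FakeDltResource` is a hypothetical stand-in exposing just the two attributes the default reads, so the snippet runs without dlt or Dagster installed.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeDltResource:
    # Hypothetical stand-in for DltResource with only the fields used here.
    source_name: str
    name: str


def default_dep_key_names(resource: FakeDltResource) -> List[str]:
    # Assumption: source_name and name are joined with "_" into one upstream key.
    return [f"{resource.source_name}_{resource.name}"]


print(default_dep_key_names(FakeDltResource(source_name="hubspot", name="contacts")))  # ['hubspot_contacts']
```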

Parameters:

resource (DltResource) – dlt resource

Returns:

The Dagster asset keys upstream of the dlt resource.

Return type:

Iterable[AssetKey]

get_description(resource)

A method that takes in a dlt resource and returns the Dagster description of the resource.

This method can be overridden to provide a custom description for a dlt resource.

Parameters:

resource (DltResource) – dlt resource

Returns:

The Dagster description for the dlt resource.

Return type:

Optional[str]

get_group_name(resource)

A method that takes in a dlt resource and returns the Dagster group name of the resource.

This method can be overridden to provide a custom group name for a dlt resource.

Parameters:

resource (DltResource) – dlt resource

Returns:

A Dagster group name for the dlt resource.

Return type:

Optional[str]

get_metadata(resource)

Defines resource-specific metadata.

Parameters:

resource (DltResource) – dlt resource

Returns:

The custom metadata entries for this resource.

Return type:

Mapping[str, Any]

get_owners(resource)

A method that takes in a dlt resource and returns the Dagster owners of the resource.

This method can be overridden to provide custom owners for a dlt resource.

Parameters:

resource (DltResource) – dlt resource

Returns:

A sequence of Dagster owners for the dlt resource.

Return type:

Optional[Sequence[str]]

get_tags(resource)

A method that takes in a dlt resource and returns the Dagster tags of the resource.

This method can be overridden to provide custom tags for a dlt resource.

Parameters:

resource (DltResource) – dlt resource

Returns:

A dictionary representing the Dagster tags for the dlt resource.

Return type:

Optional[Mapping[str, str]]

Resources (dlt)

class dagster_embedded_elt.dlt.DagsterDltResource

experimental This API may break in future versions, even between dot releases.

run(context, dlt_source=None, dlt_pipeline=None, dagster_dlt_translator=None, **kwargs)

Runs the dlt pipeline with subset support.

Parameters:
  • context (Union[OpExecutionContext, AssetExecutionContext]) – Asset or op execution context

  • dlt_source (Optional[DltSource]) – optional dlt source if resource is used from an @op

  • dlt_pipeline (Optional[Pipeline]) – optional dlt pipeline if resource is used from an @op

  • dagster_dlt_translator (Optional[DagsterDltTranslator]) – optional dlt translator if resource is used from an @op

  • **kwargs (dict[str, Any]) – Keyword args passed to pipeline run method

Returns:

An iterator of MaterializeResult or AssetMaterialization

Return type:

Iterator[Union[MaterializeResult, AssetMaterialization]]