Ask AI

Source code for dagster_embedded_elt.sling.asset_decorator

from copy import deepcopy
from typing import Any, Callable, Iterable, Mapping, Optional

from dagster import (
    AssetsDefinition,
    AssetSpec,
    BackfillPolicy,
    PartitionsDefinition,
    _check as check,
    multi_asset,
)
from dagster._utils.merger import deep_merge_dicts
from dagster._utils.security import non_secure_md5_hash_str

from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

# Metadata key under which `sling_assets` stores the DagsterSlingTranslator instance on every
# AssetSpec it creates.
METADATA_KEY_TRANSLATOR = "dagster_embedded_elt/dagster_sling_translator"
# Metadata key under which `sling_assets` stores the validated replication config on every
# AssetSpec it creates.
METADATA_KEY_REPLICATION_CONFIG = "dagster_embedded_elt/sling_replication_config"


def get_streams_from_replication(
    replication_config: Mapping[str, Any],
) -> Iterable[Mapping[str, Any]]:
    """Lazily yield the enabled streams of a Sling replication config.

    Each yielded item is a dict with the stream's ``name`` and its ``config`` (which may be
    empty or ``None``). Streams whose config sets a truthy ``disabled`` flag are skipped.
    """
    for stream_name, stream_config in replication_config.get("streams", {}).items():
        # An empty/None config cannot carry a `disabled` flag, so such streams are always kept.
        is_enabled = not (stream_config and stream_config.get("disabled", False))
        if is_enabled:
            yield {"name": stream_name, "config": stream_config}


def streams_with_default_dagster_meta(
    streams: Iterable[Mapping[str, Any]], replication_config: Mapping[str, Any]
) -> Iterable[Mapping[str, Any]]:
    """Ensures dagster meta configs in the `defaults` block of the replication_config are passed to
    the assets definition object.

    Args:
        streams: Stream entries of the form ``{"name": ..., "config": ...}``.
        replication_config: Replication config whose ``defaults.meta.dagster`` mapping (if any)
            supplies the default dagster meta.

    Yields:
        The input streams unchanged when there is no default dagster meta. Otherwise, a new
        ``{"name": ..., "config": ...}`` entry per stream whose ``config["meta"]`` is the default
        dagster meta merged with the stream's own ``meta`` (the stream's ``meta`` is passed as
        the second, "from" argument of ``deep_merge_dicts``).
    """
    default_dagster_meta = replication_config.get("defaults", {}).get("meta", {}).get("dagster", {})
    if not default_dagster_meta:
        yield from streams
        return

    for stream in streams:
        name = stream["name"]
        # Work on a copy so the caller's stream config is never mutated.
        config = deepcopy(stream["config"])
        if not config:
            # Copy the defaults as well: otherwise every config-less stream (and the
            # replication config itself) would share one mutable dict object.
            config = {"meta": {"dagster": deepcopy(default_dagster_meta)}}
        else:
            config["meta"] = deep_merge_dicts(
                {"dagster": default_dagster_meta}, config.get("meta", {})
            )
        yield {"name": name, "config": config}


def sling_assets(
    *,
    replication_config: SlingReplicationParam,
    dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
    name: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    op_tags: Optional[Mapping[str, Any]] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
    """Create a definition for how to materialize a set of Sling replication streams as Dagster
    assets, as described by a Sling replication config. This will create one asset for every Sling
    target stream.

    A Sling Replication config is a configuration that maps sources to destinations. For the full
    spec and descriptions, see `Sling's Documentation <https://docs.slingdata.io/sling-cli/run/configuration>`_.

    Args:
        replication_config (Union[Mapping[str, Any], str, Path]): A path to a Sling replication
            config, or a dictionary of a replication config.
        dagster_sling_translator (Optional[DagsterSlingTranslator]): Allows customization of how
            to map a Sling stream to a Dagster AssetKey. Defaults to a plain
            ``DagsterSlingTranslator()`` when omitted.
        name (Optional[str]): The name of the op.
        partitions_def (Optional[PartitionsDefinition]): The partitions definition for this asset.
        backfill_policy (Optional[BackfillPolicy]): The backfill policy for this asset.
        op_tags (Optional[Mapping[str, Any]]): The tags for the underlying op.

    Returns:
        The decorator produced by ``multi_asset``, which turns the decorated function into an
        ``AssetsDefinition``.

    Examples:
        Running a sync by providing a path to a Sling Replication config:

        .. code-block:: python

            from dagster import EnvVar
            from dagster_embedded_elt.sling import (
                sling_assets,
                SlingResource,
                SlingConnectionResource,
            )

            sling_resource = SlingResource(
                connections=[
                    SlingConnectionResource(
                        name="MY_POSTGRES",
                        type="postgres",
                        connection_string=EnvVar("POSTGRES_URL"),
                    ),
                    SlingConnectionResource(
                        name="MY_DUCKDB",
                        type="duckdb",
                        connection_string="duckdb:///var/tmp/duckdb.db",
                    ),
                ]
            )

            config_path = "/path/to/replication.yaml"

            @sling_assets(replication_config=config_path)
            def my_assets(context, sling: SlingResource):
                yield from sling.replicate(context=context)
    """
    replication_config = validate_replication(replication_config)

    # Both helpers are generators; `streams` is consumed exactly once by the specs list below.
    raw_streams = get_streams_from_replication(replication_config)
    streams = streams_with_default_dagster_meta(raw_streams, replication_config)

    # The code version is a hash of the stringified config, so it changes whenever the
    # replication config does.
    code_version = non_secure_md5_hash_str(str(replication_config).encode())

    dagster_sling_translator = (
        check.opt_inst_param(
            dagster_sling_translator, "dagster_sling_translator", DagsterSlingTranslator
        )
        or DagsterSlingTranslator()
    )

    return multi_asset(
        name=name,
        partitions_def=partitions_def,
        can_subset=True,
        op_tags=op_tags,
        backfill_policy=backfill_policy,
        specs=[
            AssetSpec(
                key=dagster_sling_translator.get_asset_key(stream),
                deps=dagster_sling_translator.get_deps_asset_key(stream),
                description=dagster_sling_translator.get_description(stream),
                metadata={
                    **dagster_sling_translator.get_metadata(stream),
                    # Attach the translator and config to each spec's metadata under the
                    # module-level keys.
                    METADATA_KEY_TRANSLATOR: dagster_sling_translator,
                    METADATA_KEY_REPLICATION_CONFIG: replication_config,
                },
                tags=dagster_sling_translator.get_tags(stream),
                kinds=dagster_sling_translator.get_kinds(stream),
                group_name=dagster_sling_translator.get_group_name(stream),
                freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
                auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
                    stream
                ),
                code_version=code_version,
            )
            for stream in streams
        ],
    )