# Source code for dagster_embedded_elt.sling.dagster_sling_translator

import re
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional

from dagster import (
    AssetKey,
    AutoMaterializePolicy,
    FreshnessPolicy,
    MetadataValue,
)
from dagster._annotations import public


@dataclass
class DagsterSlingTranslator:
    """Translate Sling replication stream definitions into Dagster asset attributes.

    Every ``get_*`` method receives a *stream definition* mapping. The methods
    in this class read two keys from it:

    * ``"name"``: the stream name (required whenever no explicit override is
      configured, and for error messages).
    * ``"config"``: the optional stream config mapping. It may be missing or
      ``None``. Dagster-specific settings are read from ``config["meta"]["dagster"]``.

    Subclass and override individual methods to customize the translation.

    Attributes:
        target_prefix: First component of the default asset key produced by
            :py:meth:`get_asset_key`.
    """

    target_prefix: str = "target"

    @staticmethod
    def _get_dagster_meta(stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the ``config.meta.dagster`` mapping of a stream, or ``{}`` if absent.

        Each level (``config``, ``meta``, ``dagster``) may be missing or set to
        ``None`` (for example a YAML key with no value); all of those cases
        yield an empty mapping instead of raising ``AttributeError``.
        """
        config = stream_definition.get("config") or {}
        meta = config.get("meta") or {}
        return meta.get("dagster") or {}

    @public
    def sanitize_stream_name(self, stream_name: str) -> str:
        """Return a sanitized version of a Sling stream name.

        By default, double quotes are removed, the name is lowercased, and every
        remaining character that is not an ASCII letter, digit, underscore or
        dot is replaced with an underscore. Dots are kept because callers split
        the sanitized name on ``"."`` to build asset key components.

        Args:
            stream_name (str): The name of the stream.

        Returns:
            str: The sanitized stream name.

        Examples:
            Using a custom stream name sanitizer:

            .. code-block:: python

                class CustomSlingTranslator(DagsterSlingTranslator):
                    def sanitize_stream_name(self, stream_name: str) -> str:
                        return stream_name.replace(".", "")
        """
        return re.sub(r"[^a-zA-Z0-9_.]", "_", stream_name.replace('"', "").lower())

    @public
    def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
        """Return the Dagster AssetKey for a Sling replication stream.

        The stream definition is a mapping with a ``"name"`` key (the stream
        name) and a ``"config"`` key (the Sling replication stream config,
        possibly ``None``). For example:

        .. code-block:: python

            stream_definition = {
                "name": "public.users",
                "config": {
                    "sql": 'select all_user_id, name from public."all_Users"',
                    "object": "public.all_users",
                },
            }

        By default, the key is the class's ``target_prefix`` followed by the
        dot-separated components of the sanitized ``object`` value from the
        stream config, or of the sanitized stream name when no ``object`` is
        set. A stream named "public.accounts" yields
        ``AssetKey(["target", "public", "accounts"])``.

        Override this function to customize how to map a Sling stream to a
        Dagster AssetKey. Alternatively, you can provide metadata in your Sling
        replication config to specify the Dagster AssetKey for a stream as
        follows:

        .. code-block:: yaml

            public.users:
              meta:
                dagster:
                  asset_key: "mydb_users"

        An explicitly configured ``asset_key`` is split on ``"."`` and used
        as-is, without the ``target_prefix``.

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            AssetKey: The Dagster AssetKey for the replication stream.

        Raises:
            ValueError: If the configured ``asset_key`` is not already in
                sanitized form (see :py:meth:`sanitize_stream_name`).

        Examples:
            Using a custom mapping for streams:

            .. code-block:: python

                class CustomSlingTranslator(DagsterSlingTranslator):
                    def get_asset_key(self, stream_definition) -> AssetKey:
                        mapping = {"stream1": "asset1", "stream2": "asset2"}
                        return AssetKey(mapping[stream_definition["name"]])
        """
        config = stream_definition.get("config") or {}
        object_key = config.get("object")
        asset_key = self._get_dagster_meta(stream_definition).get("asset_key")

        if asset_key:
            # An explicit key must already be in sanitized form; reject it rather
            # than silently rewriting what the user configured.
            if self.sanitize_stream_name(asset_key) != asset_key:
                raise ValueError(
                    f"Asset key {asset_key} for stream {stream_definition['name']} is not "
                    "sanitized. Please use only alphanumeric characters and underscores."
                )
            return AssetKey(asset_key.split("."))

        # You can override the Sling Replication default object with an object key
        stream_name = object_key or stream_definition["name"]
        sanitized_components = self.sanitize_stream_name(stream_name).split(".")
        return AssetKey([self.target_prefix] + sanitized_components)

    @public
    def get_deps_asset_key(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
        """Return the Dagster AssetKeys that a Sling replication stream depends on.

        By default, this returns a single key built from the dot-separated
        components of the sanitized stream name. For example, a stream named
        "public.accounts" yields the dependency
        ``AssetKey(["public", "accounts"])``.

        Override this function to customize how to map a Sling stream to a
        Dagster dependency. Alternatively, you can provide metadata in your
        Sling replication config to specify the dependencies of a stream, as a
        single string or a list of strings:

        .. code-block:: yaml

            public.users:
              meta:
                dagster:
                  deps: "sourcedb_users"

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            Iterable[AssetKey]: The Dagster AssetKey dependencies for the
            replication stream.

        Raises:
            ValueError: If a configured dependency key is not already in
                sanitized form (see :py:meth:`sanitize_stream_name`).
            AssertionError: If the configured ``deps`` value is neither a
                string nor a list.

        Examples:
            Using a custom mapping for streams:

            .. code-block:: python

                class CustomSlingTranslator(DagsterSlingTranslator):
                    def get_deps_asset_key(self, stream_definition) -> Iterable[AssetKey]:
                        mapping = {"stream1": "asset1", "stream2": "asset2"}
                        return [AssetKey(mapping[stream_definition["name"]])]
        """
        deps = self._get_dagster_meta(stream_definition).get("deps")

        # A single dependency may be given as a bare string; normalize to a list.
        if deps and isinstance(deps, str):
            deps = [deps]

        if deps:
            assert isinstance(deps, list), "meta.dagster.deps must be a string or a list"
            deps_out = []
            for asset_key in deps:
                if self.sanitize_stream_name(asset_key) != asset_key:
                    raise ValueError(
                        f"Deps Asset key {asset_key} for stream {stream_definition['name']} is not "
                        "sanitized. Please use only alphanumeric characters and underscores."
                    )
                deps_out.append(AssetKey(asset_key.split(".")))
            return deps_out

        # No explicit deps configured: depend on the source stream itself.
        stream_name = stream_definition["name"]
        components = self.sanitize_stream_name(stream_name).split(".")
        return [AssetKey(components)]

    @public
    def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
        """Return the description for a stream's asset.

        If the stream config has a ``sql`` key, its value is returned;
        otherwise the ``description`` entry of ``meta.dagster`` is returned,
        or ``None`` when neither is present.

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            Optional[str]: The description, if any.
        """
        config = stream_definition.get("config") or {}
        if "sql" in config:
            return config["sql"]
        return self._get_dagster_meta(stream_definition).get("description")

    @public
    def get_metadata(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return asset metadata containing the stream config as JSON.

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            Mapping[str, Any]: A mapping with a single ``"stream_config"`` entry
            wrapping the value of the ``"config"`` key (``{}`` when the key is
            missing).
        """
        return {"stream_config": MetadataValue.json(stream_definition.get("config", {}))}

    @public
    def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
        """Return the ``group`` entry of ``meta.dagster`` for a stream, if set.

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            Optional[str]: The configured group, or ``None``.
        """
        return self._get_dagster_meta(stream_definition).get("group")

    @public
    def get_freshness_policy(
        self, stream_definition: Mapping[str, Any]
    ) -> Optional[FreshnessPolicy]:
        """Build a FreshnessPolicy from the ``freshness_policy`` entry of ``meta.dagster``.

        The entry must contain ``maximum_lag_minutes`` (converted with
        ``float``) and may contain ``cron_schedule`` and
        ``cron_schedule_timezone``.

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            Optional[FreshnessPolicy]: The policy, or ``None`` when no
            ``freshness_policy`` entry is configured.

        Raises:
            KeyError: If the entry is present but lacks ``maximum_lag_minutes``.
        """
        freshness_policy_config = self._get_dagster_meta(stream_definition).get(
            "freshness_policy"
        )
        if not freshness_policy_config:
            return None
        return FreshnessPolicy(
            maximum_lag_minutes=float(freshness_policy_config["maximum_lag_minutes"]),
            cron_schedule=freshness_policy_config.get("cron_schedule"),
            cron_schedule_timezone=freshness_policy_config.get("cron_schedule_timezone"),
        )

    @public
    def get_auto_materialize_policy(
        self, stream_definition: Mapping[str, Any]
    ) -> Optional[AutoMaterializePolicy]:
        """Return an eager AutoMaterializePolicy when one is requested for the stream.

        Only the presence of the ``auto_materialize_policy`` key in
        ``meta.dagster`` is checked; its value is not inspected.

        Args:
            stream_definition (Mapping[str, Any]): A dictionary representing the
                stream definition.

        Returns:
            Optional[AutoMaterializePolicy]: ``AutoMaterializePolicy.eager()``
            if the key is present, otherwise ``None``.
        """
        if "auto_materialize_policy" in self._get_dagster_meta(stream_definition):
            return AutoMaterializePolicy.eager()
        return None