# Source code for dagster_embedded_elt.sling.resources

import contextlib
import json
import os
import re
import sys
import tempfile
import time
import uuid
from enum import Enum
from subprocess import PIPE, STDOUT, Popen
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union

import sling
from dagster import (
    AssetExecutionContext,
    AssetMaterialization,
    ConfigurableResource,
    EnvVar,
    MaterializeResult,
    OpExecutionContext,
    PermissiveConfig,
    get_dagster_logger,
)
from dagster._annotations import deprecated, experimental, public
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import (
    METADATA_KEY_REPLICATION_CONFIG,
    METADATA_KEY_TRANSLATOR,
    get_streams_from_replication,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

# Module-level logger used for Sling sync/replication progress and debug messages.
logger = get_dagster_logger()

# Matches ANSI terminal escape sequences so they can be stripped from Sling CLI
# output (see SlingResource._clean_line) before it is logged or stored.
ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# Template for the deprecation warning on the legacy source/target connection
# classes; ``{name}`` is filled in with the deprecated class name.
DEPRECATION_WARNING_TEXT = "{name} has been deprecated, use `SlingConnectionResource` for both source and target connections."


@public
class SlingMode(str, Enum):
    """The mode to use when syncing.

    Because this enum mixes in ``str``, each member compares equal to its string
    value and can be placed directly into the ``mode`` field of a Sling config
    (as ``SlingResource.sync`` does).

    See the Sling docs for more information: https://docs.slingdata.io/sling-cli/run/configuration#modes
    """

    INCREMENTAL = "incremental"
    TRUNCATE = "truncate"
    FULL_REFRESH = "full-refresh"
    SNAPSHOT = "snapshot"
    BACKFILL = "backfill"


@deprecated(
    breaking_version="0.23.0",
    additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingSourceConnection"),
)
class SlingSourceConnection(PermissiveConfig):
    """A Sling Source Connection defines the source connection used by :py:class:`~dagster_embedded_elt.sling.SlingResource`.

    Deprecated: use ``SlingConnectionResource`` for both source and target connections.

    Only ``type`` and ``connection_string`` are declared fields. Because this is a
    ``PermissiveConfig``, additional connection keyword arguments are also accepted,
    as shown in the last example.

    Examples:
        Creating a Sling Source for a file, such as CSV or JSON:

        .. code-block:: python

            source = SlingSourceConnection(type="file")

        Create a Sling Source for a Postgres database, using a connection string:

        .. code-block:: python

            source = SlingSourceConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
            source = SlingSourceConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema")

        Create a Sling Source for a Postgres database, using keyword arguments, as described here:
        https://docs.slingdata.io/connections/database-connections/postgres

        .. code-block:: python

            source = SlingSourceConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))

    """

    type: str = Field(description="Type of the source connection. Use 'file' for local storage.")
    connection_string: Optional[str] = Field(
        description="The connection string for the source database.",
        default=None,
    )
@deprecated(
    breaking_version="0.23.0",
    additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingTargetConnection"),
)
class SlingTargetConnection(PermissiveConfig):
    """A Sling Target Connection defines the target connection used by :py:class:`~dagster_embedded_elt.sling.SlingResource`.

    Deprecated: use ``SlingConnectionResource`` for both source and target connections.

    Only ``type`` and ``connection_string`` are declared fields. Because this is a
    ``PermissiveConfig``, additional connection keyword arguments are also accepted,
    as shown in the last example.

    Examples:
        Creating a Sling Target for a file, such as CSV or JSON:

        .. code-block:: python

            target = SlingTargetConnection(type="file")

        Create a Sling Target for a Postgres database, using a connection string:

        .. code-block:: python

            target = SlingTargetConnection(type="postgres", connection_string="postgresql://user:password@host:port/schema")
            target = SlingTargetConnection(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))

        Create a Sling Target for a Postgres database, using keyword arguments, as described here:
        https://docs.slingdata.io/connections/database-connections/postgres

        .. code-block:: python

            target = SlingTargetConnection(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))

    """

    type: str = Field(
        description="Type of the destination connection. Use 'file' for local storage."
    )
    connection_string: Optional[str] = Field(
        description="The connection string for the target database.",
        default=None,
    )
@public
class SlingConnectionResource(PermissiveConfig):
    """A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for Sling syncs.

    Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections

    The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication
    You may provide either a connection string or keyword arguments for the connection.

    Examples:
        Creating a Sling Connection for a file, such as CSV or JSON:

        .. code-block:: python

            source = SlingConnectionResource(name="MY_FILE", type="file")

        Create a Sling Connection for a Postgres or MySQL database, using a connection string:

        .. code-block:: python

            postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
            mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")

        Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments:

        .. code-block:: python

            postgres_conn = SlingConnectionResource(
                name="MY_OTHER_POSTGRES",
                type="postgres",
                host="host",
                user="hunter42",
                password=EnvVar("POSTGRES_PASSWORD")
            )

            snowflake_conn = SlingConnectionResource(
                name="MY_SNOWFLAKE",
                type="snowflake",
                host=EnvVar("SNOWFLAKE_HOST"),
                user=EnvVar("SNOWFLAKE_USER"),
                database=EnvVar("SNOWFLAKE_DATABASE"),
                password=EnvVar("SNOWFLAKE_PASSWORD"),
                role=EnvVar("SNOWFLAKE_ROLE")
            )
    """

    name: str = Field(
        description="The name of the connection, must match the name in your Sling replication configuration."
    )
    type: str = Field(
        description="Type of the source connection, must match the Sling connection types. Use 'file' for local storage."
    )
    connection_string: Optional[str] = Field(
        description="The optional connection string for the source database, if not using keyword arguments.",
        default=None,
    )
@experimental
class SlingResource(ConfigurableResource):
    """Resource for interacting with the Sling package. This resource can be used to run Sling replications.

    Args:
        connections (List[SlingConnectionResource]): A list of connections to use for the replication.
        source_connection (Optional[SlingSourceConnection]): Deprecated, use `connections` instead.
        target_connection (Optional[SlingTargetConnection]): Deprecated, use `connections` instead.

    Examples:
        .. code-block:: python

            from dagster import EnvVar
            from dagster_embedded_elt.sling import SlingResource, SlingConnectionResource

            sling_resource = SlingResource(
                connections=[
                    SlingConnectionResource(
                        name="MY_POSTGRES",
                        type="postgres",
                        connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
                    ),
                    SlingConnectionResource(
                        name="MY_SNOWFLAKE",
                        type="snowflake",
                        host=EnvVar("SNOWFLAKE_HOST"),
                        user=EnvVar("SNOWFLAKE_USER"),
                        database=EnvVar("SNOWFLAKE_DATABASE"),
                        password=EnvVar("SNOWFLAKE_PASSWORD"),
                        role=EnvVar("SNOWFLAKE_ROLE"),
                    ),
                ]
            )
    """

    source_connection: Optional[SlingSourceConnection] = None
    target_connection: Optional[SlingTargetConnection] = None
    connections: List[SlingConnectionResource] = []
    # Cleaned output lines appended by ``replicate``; read back by ``stream_raw_logs``.
    _stdout: List[str] = []

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        return True

    def _clean_connection_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a connection config dict into the form handed to Sling.

        ``{"env": NAME}`` values are resolved through ``_process_env_vars``, and
        ``connection_string`` is renamed to ``url``. An empty or missing
        ``connection_string`` is dropped rather than emitted as ``url``.
        """
        d = _process_env_vars(d)
        # pop() tolerates a missing key; the previous d["connection_string"]
        # lookup raised KeyError before the membership check could run.
        connection_string = d.pop("connection_string", None)
        if connection_string:
            d["url"] = connection_string
        return d

    def prepare_environment(self) -> Dict[str, Any]:
        """Build environment variables that describe the configured connections.

        Returns:
            Dict[str, Any]: ``SLING_SOURCE`` / ``SLING_TARGET`` for the deprecated
            source/target fields (when set), plus one entry per
            ``SlingConnectionResource`` keyed by its ``name``. Each value is the
            JSON-encoded, cleaned connection dict.
        """
        sling_source = None
        sling_target = None
        if self.source_connection:
            sling_source = self._clean_connection_dict(dict(self.source_connection))
        if self.target_connection:
            sling_target = self._clean_connection_dict(dict(self.target_connection))
        env = {}
        if sling_source:
            env["SLING_SOURCE"] = json.dumps(sling_source)
        if sling_target:
            env["SLING_TARGET"] = json.dumps(sling_target)

        for conn in self.connections:
            d = self._clean_connection_dict(dict(conn))
            env[conn.name] = json.dumps(d)

        return env

    @contextlib.contextmanager
    def _setup_config(self) -> Generator[None, None, None]:
        """Uses environment variables to set the Sling source and target connections."""
        if self.source_connection:
            deprecation_warning(
                "source_connection",
                "0.23",
                "source_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.",
                stacklevel=4,
            )
        if self.target_connection:
            deprecation_warning(
                "target_connection",
                "0.23",
                "target_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.",
                stacklevel=4,
            )
        prepared_environment = self.prepare_environment()
        with environ(prepared_environment):
            yield

    def _clean_line(self, line: str) -> str:
        """Removes ANSI escape sequences and the ``INF`` log-level marker from a line of output."""
        # Strip ``INF`` only as a standalone token. A plain substring replace would
        # also mangle words that merely contain it (e.g. ``INFO``, ``INFORMATION_SCHEMA``).
        return re.sub(r"\bINF\b", "", ANSI_ESCAPE.sub("", line))

    def _process_stdout(self, stdout: IO[AnyStr], encoding="utf8") -> Iterator[str]:
        """Decode and clean each raw line read from the Sling CLI's stdout."""
        for line in stdout:
            assert isinstance(line, bytes)
            fmt_line = bytes.decode(line, encoding=encoding, errors="replace")
            yield self._clean_line(fmt_line)

    def _exec_sling_cmd(
        self, cmd, stdin=None, stdout=PIPE, stderr=STDOUT, encoding="utf8"
    ) -> Generator[str, None, None]:
        """Run ``cmd`` in a shell and yield its cleaned output lines.

        Raises:
            Exception: If the command exits with a non-zero return code.
        """
        with Popen(cmd, shell=True, stdin=stdin, stdout=stdout, stderr=stderr) as proc:
            if proc.stdout:
                yield from self._process_stdout(proc.stdout, encoding=encoding)

            proc.wait()
            if proc.returncode != 0:
                # Format the code into the message. Passing it as a second
                # Exception argument left the "%s" placeholder unformatted.
                raise Exception(f"Sling command failed with error code {proc.returncode}")

    @deprecated(
        breaking_version="0.23.0",
        additional_warn_text="sync has been deprecated, use `replicate` instead.",
    )
    def sync(
        self,
        source_stream: str,
        target_object: str,
        mode: SlingMode = SlingMode.FULL_REFRESH,
        primary_key: Optional[List[str]] = None,
        update_key: Optional[str] = None,
        source_options: Optional[Dict[str, Any]] = None,
        target_options: Optional[Dict[str, Any]] = None,
        encoding: str = "utf8",
    ) -> Generator[str, None, None]:
        """Runs a Sling sync from the given source table to the given destination table. Generates
        output lines from the Sling CLI.

        Deprecated, use `replicate` instead.
        """
        # File connections need a file:// scheme on the stream/object path.
        if (
            self.source_connection
            and self.source_connection.type == "file"
            and not source_stream.startswith("file://")
        ):
            source_stream = "file://" + source_stream

        if (
            self.target_connection
            and self.target_connection.type == "file"
            and not target_object.startswith("file://")
        ):
            target_object = "file://" + target_object

        with self._setup_config():
            config = {
                "mode": mode,
                "source": {
                    "conn": "SLING_SOURCE",
                    "stream": source_stream,
                    "primary_key": primary_key,
                    "update_key": update_key,
                    "options": source_options,
                },
                "target": {
                    "conn": "SLING_TARGET",
                    "object": target_object,
                    "options": target_options,
                },
            }
            # Drop unset options so they are not sent to Sling at all.
            config["source"] = {k: v for k, v in config["source"].items() if v is not None}
            config["target"] = {k: v for k, v in config["target"].items() if v is not None}

            sling_cli = sling.Sling(**config)
            logger.info("Starting Sling sync with mode: %s", mode)
            cmd = sling_cli._prep_cmd()  # noqa: SLF001

            yield from self._exec_sling_cmd(cmd, encoding=encoding)

    def replicate(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        replication_config: Optional[SlingReplicationParam] = None,
        dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
        debug: bool = False,
    ) -> Generator[Union[MaterializeResult, AssetMaterialization], None, None]:
        """Runs a Sling replication from the given replication config.

        Args:
            context: Asset or Op execution context.
            replication_config: The Sling replication config to use for the replication.
            dagster_sling_translator: The translator to use for the replication.
            debug: Whether to run the replication in debug mode.

        Returns:
            Generator[Union[MaterializeResult, AssetMaterialization], None, None]: A generator of MaterializeResult or AssetMaterialization
        """
        # attempt to retrieve params from asset context if not passed as a parameter
        if not (replication_config or dagster_sling_translator):
            metadata_by_key = context.assets_def.metadata_by_key
            first_asset_metadata = next(iter(metadata_by_key.values()))
            dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
            replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)

        # if translator has not been defined on metadata _or_ through param, then use the default constructor
        dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator()

        replication_config = validate_replication(replication_config)
        stream_definition = get_streams_from_replication(replication_config)

        with self._setup_config():
            uid = uuid.uuid4()
            temp_dir = tempfile.gettempdir()
            temp_file = os.path.join(temp_dir, f"sling-replication-{uid}.json")
            # Copied inside _setup_config so the connection variables it installs
            # are part of the environment handed to the Sling run.
            env = os.environ.copy()

            with open(temp_file, "w") as file:
                json.dump(replication_config, file, cls=sling.JsonEncoder)

            logger.debug(f"Replication config: {replication_config}")

            debug_str = "-d" if debug else ""

            cmd = f"{sling.SLING_BIN} run {debug_str} -r {temp_file}"

            logger.debug(f"Running Sling replication with command: {cmd}")

            # Get start time from wall clock
            start_time = time.time()

            results = sling._run(  # noqa
                cmd=cmd,
                temp_file=temp_file,
                return_output=True,
                env=env,
            )
            for row in results.split("\n"):
                clean_line = self._clean_line(row)
                sys.stdout.write(clean_line + "\n")
                self._stdout.append(clean_line)

            end_time = time.time()

            has_asset_def: bool = bool(context and context.has_assets_def)

            # One event per stream; every stream reports the same elapsed time,
            # since the whole replication runs as a single command.
            for stream in stream_definition:
                output_name = dagster_sling_translator.get_asset_key(stream)
                if has_asset_def:
                    yield MaterializeResult(
                        asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
                    )
                else:
                    yield AssetMaterialization(
                        asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
                    )

    def stream_raw_logs(self) -> Generator[str, None, None]:
        """Returns a generator of raw logs from the Sling CLI."""
        yield from self._stdout
def _process_env_vars(config: Dict[str, Any]) -> Dict[str, Any]: out = {} for key, value in config.items(): if isinstance(value, dict) and len(value) == 1 and next(iter(value.keys())) == "env": out[key] = EnvVar(next(iter(value.values()))).get_value() else: out[key] = value return out