import re
from typing import Any, Dict, List, Optional, Union
from dagster import (
AssetExecutionContext,
AssetsDefinition,
AssetSpec,
MaterializeResult,
multi_asset,
)
from dagster._annotations import deprecated
from dagster._utils.warnings import deprecation_warning
from dagster_embedded_elt.sling.resources import SlingMode, SlingResource
[docs]@deprecated(
breaking_version="0.23.0",
additional_warn_text="Use `@sling_assets` instead.",
)
def build_sling_asset(
asset_spec: AssetSpec,
source_stream: str,
target_object: str,
mode: SlingMode = SlingMode.FULL_REFRESH,
primary_key: Optional[Union[str, List[str]]] = None,
update_key: Optional[str] = None,
source_options: Optional[Dict[str, Any]] = None,
target_options: Optional[Dict[str, Any]] = None,
sling_resource_key: str = "sling",
) -> AssetsDefinition:
"""Asset Factory for using Sling to sync data from a source stream to a target object.
Args:
asset_spec (AssetSpec): The AssetSpec to use to materialize this asset.
source_stream (str): The source stream to sync from. This can be a table, a query, or a path.
target_object (str): The target object to sync to. This can be a table, or a path.
mode (SlingMode, optional): The sync mode to use when syncing. Defaults to SlingMode.FULL_REFRESH.
primary_key (Optional[Union[str, List[str]]], optional): The optional primary key to use when syncing.
update_key (Optional[str], optional): The optional update key to use when syncing.
source_options (Optional[Dict[str, Any]], optional): Any optional Sling source options to use when syncing.
target_options (Optional[Dict[str, Any]], optional): Any optional target options to use when syncing.
sling_resource_key (str, optional): The resource key for the SlingResource. Defaults to "sling".
Examples:
Creating a Sling asset that syncs from a file to a table:
.. code-block:: python
asset_spec = AssetSpec(key=["main", "dest_tbl"])
asset_def = build_sling_asset(
asset_spec=asset_spec,
source_stream="file:///tmp/test.csv",
target_object="main.dest_table",
mode=SlingMode.INCREMENTAL,
primary_key="id"
)
Creating a Sling asset that syncs from a table to a file with a full refresh:
.. code-block:: python
asset_spec = AssetSpec(key="test.csv")
asset_def = build_sling_asset(
asset_spec=asset_spec,
source_stream="main.dest_table",
target_object="file:///tmp/test.csv",
mode=SlingMode.FULL_REFRESH
)
"""
if primary_key is not None and not isinstance(primary_key, list):
primary_key = [primary_key]
@multi_asset(
name=asset_spec.key.to_python_identifier(),
compute_kind="sling",
specs=[asset_spec],
required_resource_keys={sling_resource_key},
)
def sync(context: AssetExecutionContext) -> MaterializeResult:
deprecation_warning(
"build_sling_asset",
breaking_version="0.23.0",
additional_warn_text="Use `@sling_assets` property instead.",
)
sling: SlingResource = getattr(context.resources, sling_resource_key)
last_row_count_observed = None
for stdout_line in sling.sync(
source_stream=source_stream,
target_object=target_object,
mode=mode,
primary_key=primary_key,
update_key=update_key,
source_options=source_options,
target_options=target_options,
):
match = re.search(r"(\d+) rows", stdout_line)
if match:
last_row_count_observed = int(match.group(1))
context.log.info(stdout_line)
return MaterializeResult(
metadata=(
{} if last_row_count_observed is None else {"row_count": last_row_count_observed}
)
)
return sync