# Source code for dagster_fivetran.asset_defs

import hashlib
import inspect
import re
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    Type,
    Union,
    cast,
)

from dagster import (
    AssetKey,
    AssetsDefinition,
    OpExecutionContext,
    _check as check,
    multi_asset,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import (
    AssetsDefinitionCacheableData,
    CacheableAssetsDefinition,
)
from dagster._core.definitions.events import AssetMaterialization, CoercibleToAssetKeyPrefix, Output
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.tags import build_kind_tag
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.execution.context.init import build_init_resource_context
from dagster._core.utils import imap
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranConnectorTableProps
from dagster_fivetran.utils import (
    generate_materializations,
    get_fivetran_connector_url,
    metadata_for_table,
)

# Default upper bound on worker threads; used as the default for the
# ``max_threadpool_workers`` argument, which is handed to ThreadPoolExecutor
# as ``max_workers`` when materializations are post-processed.
DEFAULT_MAX_THREADPOOL_WORKERS = 10
# Module-level logger obtained from Dagster's logging helper.
logger = get_dagster_logger()


def _fetch_and_attach_col_metadata(
    fivetran_resource: FivetranResource, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
    """Fetch column metadata for a table from the Fivetran API and attach it to the
    materialization.

    The source schema and table names are read from the materialization's
    ``schema_source_name`` and ``table_source_name`` metadata entries and used to
    request the connector's column listing. Only columns that are enabled and that
    carry a ``name_in_destination`` are kept; they are sorted by name and attached
    as a column schema (the column type is left as an empty string).

    This is best-effort: if anything fails, a warning is logged and the
    materialization is returned unchanged.

    Args:
        fivetran_resource (FivetranResource): Resource used to issue the API request.
        connector_id (str): ID of the connector that owns the table.
        materialization (AssetMaterialization): Materialization to enrich.

    Returns:
        AssetMaterialization: A copy with the column schema merged into its metadata,
        or the original materialization if the lookup failed.
    """
    # Only used in the warning below; refined once the source names have been read so
    # that a failure in the metadata lookup itself can still be reported.
    table_label = "<unknown>"
    try:
        schema_source_name = materialization.metadata["schema_source_name"].value
        table_source_name = materialization.metadata["table_source_name"].value
        table_label = f"{schema_source_name}.{table_source_name}"

        table_conn_data = fivetran_resource.make_request(
            "GET",
            f"connectors/{connector_id}/schemas/{schema_source_name}/tables/{table_source_name}/columns",
        )
        columns = check.dict_elem(table_conn_data, "columns")
        table_columns = sorted(
            [
                TableColumn(name=col["name_in_destination"], type="")
                for col in columns.values()
                if "name_in_destination" in col and col.get("enabled")
            ],
            key=lambda col: col.name,
        )
        return materialization.with_metadata(
            {
                **materialization.metadata,
                **TableMetadataSet(column_schema=TableSchema(table_columns)),
            }
        )
    except Exception as e:
        # Deliberately broad: column metadata is optional and must never fail the sync.
        logger.warning(
            "An error occurred while fetching column metadata for table %s. Exception: %s",
            table_label,
            e,
            exc_info=True,
        )
        return materialization


def _build_fivetran_assets(
    connector_id: str,
    destination_tables: Sequence[str],
    fetch_column_metadata: bool,
    poll_timeout: Optional[float],
    poll_interval: float,
    io_manager_key: Optional[str],
    asset_key_prefix: Optional[Sequence[str]],
    metadata_by_table_name: Optional[Mapping[str, RawMetadataMapping]],
    table_to_asset_key_map: Optional[Mapping[str, AssetKey]],
    resource_defs: Optional[Mapping[str, ResourceDefinition]],
    group_name: Optional[str],
    infer_missing_tables: bool,
    op_tags: Optional[Mapping[str, Any]],
    asset_tags: Optional[Mapping[str, Any]],
    max_threadpool_workers: int = DEFAULT_MAX_THREADPOOL_WORKERS,
    translator: Optional[Type[DagsterFivetranTranslator]] = None,
    connection_metadata: Optional["FivetranConnectionMetadata"] = None,
) -> Sequence[AssetsDefinition]:
    """Build the multi-asset that syncs one Fivetran connector and reports its tables.

    Args:
        connector_id (str): Connector to sync; also used in the op name
            (``fivetran_sync_<connector_id>``).
        destination_tables (Sequence[str]): Table names to represent as assets. Without
            a translator, each name is split on ``"."`` to form its asset key.
        fetch_column_metadata (bool): If True, each materialization is passed through
            ``_fetch_and_attach_col_metadata`` before being emitted.
        poll_timeout (Optional[float]): Forwarded to ``sync_and_poll``.
        poll_interval (float): Forwarded to ``sync_and_poll``.
        io_manager_key (Optional[str]): When set, stored in each spec's metadata under
            ``"dagster/io_manager_key"``.
        asset_key_prefix (Optional[Sequence[str]]): Prefix for the default asset keys;
            also forwarded to ``generate_materializations``.
        metadata_by_table_name (Optional[Mapping[str, RawMetadataMapping]]): Extra
            metadata per table name.
        table_to_asset_key_map (Optional[Mapping[str, AssetKey]]): Overrides the asset
            key exposed for each table. When empty or None, the tracked keys are used.
        resource_defs (Optional[Mapping[str, ResourceDefinition]]): Forwarded to
            ``multi_asset``.
        group_name (Optional[str]): Forwarded to ``multi_asset``.
        infer_missing_tables (bool): If True, tables absent from the sync output still
            get an ``Output``; otherwise a missing table raises.
        op_tags (Optional[Mapping[str, Any]]): Forwarded to ``multi_asset``.
        asset_tags (Optional[Mapping[str, Any]]): Merged over the ``fivetran`` kind tag.
        max_threadpool_workers (int): ``max_workers`` of the thread pool used to
            post-process materializations.
        translator (Optional[Type[DagsterFivetranTranslator]]): Translator class; it is
            instantiated with no arguments and, together with ``connection_metadata``,
            supplies the asset spec for each table (the metadata, io manager key and
            tags above are then not applied to the specs).
        connection_metadata (Optional[FivetranConnectionMetadata]): Connector metadata
            handed to the translator. Required whenever ``translator`` is given.

    Returns:
        Sequence[AssetsDefinition]: A single-element list holding the multi-asset.

    Raises:
        DagsterStepOutputNotFoundError: At execution time, when an expected table was
            not part of the sync output and ``infer_missing_tables`` is False.
    """
    asset_key_prefix = check.opt_sequence_param(asset_key_prefix, "asset_key_prefix", of_type=str)
    check.invariant(
        (translator and connection_metadata) or not translator,
        "Translator and connection_metadata required.",
    )

    translator_instance = translator() if translator else None

    # Keys under which tables are matched against the sync output.
    tracked_asset_keys = {
        table: AssetKey([*asset_key_prefix, *table.split(".")])
        if not translator_instance or not connection_metadata
        else translator_instance.get_asset_spec(
            FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
        ).key
        for table in destination_tables
    }
    # Keys exposed to the user; these determine the output names.
    user_facing_asset_keys = table_to_asset_key_map or tracked_asset_keys
    tracked_asset_key_to_user_facing_asset_key = {
        tracked_key: user_facing_asset_keys[table_name]
        for table_name, tracked_key in tracked_asset_keys.items()
    }

    _metadata_by_table_name = check.opt_mapping_param(
        metadata_by_table_name, "metadata_by_table_name", key_type=str
    )

    @multi_asset(
        name=f"fivetran_sync_{connector_id}",
        resource_defs=resource_defs,
        group_name=group_name,
        op_tags=op_tags,
        specs=[
            AssetSpec(
                key=user_facing_asset_keys[table],
                metadata={
                    **_metadata_by_table_name.get(table, {}),
                    **({"dagster/io_manager_key": io_manager_key} if io_manager_key else {}),
                },
                tags={
                    **build_kind_tag("fivetran"),
                    **(asset_tags or {}),
                },
            )
            if not translator_instance or not connection_metadata
            else translator_instance.get_asset_spec(
                FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
            )
            for table in tracked_asset_keys
        ],
    )
    def _assets(context: OpExecutionContext, fivetran: FivetranResource) -> Any:
        fivetran_output = fivetran.sync_and_poll(
            connector_id=connector_id,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )

        materialized_asset_keys = set()

        _map_fn: Callable[[AssetMaterialization], AssetMaterialization] = (
            lambda materialization: _fetch_and_attach_col_metadata(
                fivetran, connector_id, materialization
            )
            if fetch_column_metadata
            else materialization
        )
        with ThreadPoolExecutor(
            max_workers=max_threadpool_workers,
            thread_name_prefix=f"fivetran_{connector_id}",
        ) as executor:
            for materialization in imap(
                executor=executor,
                iterable=generate_materializations(
                    fivetran_output,
                    asset_key_prefix=asset_key_prefix,
                ),
                func=_map_fn,
            ):
                # scan through all tables actually created, if it was expected then emit an Output.
                # otherwise, emit a runtime AssetMaterialization
                # The mapping's keys are exactly the tracked asset keys, so membership
                # here is a hash lookup instead of a scan over the values view.
                if materialization.asset_key in tracked_asset_key_to_user_facing_asset_key:
                    key = tracked_asset_key_to_user_facing_asset_key[materialization.asset_key]
                    yield Output(
                        value=None,
                        output_name=key.to_python_identifier(),
                        metadata=materialization.metadata,
                    )
                    materialized_asset_keys.add(materialization.asset_key)

                else:
                    yield materialization

        unmaterialized_asset_keys = set(tracked_asset_keys.values()) - materialized_asset_keys
        if infer_missing_tables:
            # Emit metadata-less outputs for expected tables the sync did not report.
            for asset_key in unmaterialized_asset_keys:
                key = tracked_asset_key_to_user_facing_asset_key[asset_key]

                yield Output(value=None, output_name=key.to_python_identifier())

        elif unmaterialized_asset_keys:
            # Report one (arbitrary) missing table.
            asset_key = next(iter(unmaterialized_asset_keys))
            output_name = "_".join(asset_key.path)
            raise DagsterStepOutputNotFoundError(
                f"Core compute for {context.op_def.name} did not return an output for"
                f' non-optional output "{output_name}".',
                step_key=context.get_step_execution_context().step.key,
                output_name=output_name,
            )

    return [_assets]


def build_fivetran_assets(
    connector_id: str,
    destination_tables: Sequence[str],
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
    io_manager_key: Optional[str] = None,
    asset_key_prefix: Optional[Sequence[str]] = None,
    metadata_by_table_name: Optional[Mapping[str, RawMetadataMapping]] = None,
    group_name: Optional[str] = None,
    infer_missing_tables: bool = False,
    op_tags: Optional[Mapping[str, Any]] = None,
    fetch_column_metadata: bool = True,
) -> Sequence[AssetsDefinition]:
    """Build a set of assets for a given Fivetran connector.

    Returns a sequence containing one AssetsDefinition which connects the specified
    ``destination_tables`` to the computation that will update them. Internally, executes
    a Fivetran sync for a given ``connector_id``, and polls until that sync completes,
    raising an error if it is unsuccessful. Requires the use of the
    :py:class:`~dagster_fivetran.fivetran_resource`, which allows it to communicate with
    the Fivetran API.

    Args:
        connector_id (str): The Fivetran Connector ID that this op will sync. You can
            retrieve this value from the "Setup" tab of a given connector in the Fivetran UI.
        destination_tables (List[str]): `schema_name.table_name` for each table that you
            want to be represented in the Dagster asset graph for this connection.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time that will be waited before this
            operation is timed out. By default, this will never time out.
        io_manager_key (Optional[str]): The io_manager to be used to handle each of these
            assets.
        asset_key_prefix (Optional[List[str]]): A prefix for the asset keys inside this
            asset. If left blank, assets will have a key of
            `AssetKey([schema_name, table_name])`.
        metadata_by_table_name (Optional[Mapping[str, RawMetadataMapping]]): A mapping from
            destination table name to user-supplied metadata that should be associated with
            the asset for that table.
        group_name (Optional[str]): A string name used to organize multiple assets into
            groups. This group name will be applied to all assets produced by this
            multi_asset.
        infer_missing_tables (bool): If True, will create asset materializations for tables
            specified in destination_tables even if they are not present in the Fivetran
            sync output. This is useful in cases where Fivetran does not sync any data for a
            table and therefore does not include it in the sync output API response.
        op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes
            the asset. Frameworks may expect and require certain metadata to be attached to
            an op. Values that are not strings will be json encoded and must meet the
            criteria that json.loads(json.dumps(value)) == value.
        fetch_column_metadata (bool): If True, will fetch column schema information for each
            table in the connector. This will induce additional API calls.

    **Examples:**

    Basic example:

        .. code-block:: python

            from dagster import AssetKey, repository, with_resources

            from dagster_fivetran import fivetran_resource
            from dagster_fivetran.asset_defs import build_fivetran_assets

            my_fivetran_resource = fivetran_resource.configured(
                {
                    "api_key": {"env": "FIVETRAN_API_KEY"},
                    "api_secret": {"env": "FIVETRAN_API_SECRET"},
                }
            )

    Attaching metadata:

        .. code-block:: python

            fivetran_assets = build_fivetran_assets(
                connector_id="foobar",
                destination_tables=["schema1.table1", "schema2.table2"],
                metadata_by_table_name={
                    "schema1.table1": {
                        "description": "This is a table that contains foo and bar",
                    },
                    "schema2.table2": {
                        "description": "This is a table that contains baz and quux",
                    },
                },
            )
    """
    return _build_fivetran_assets(
        connector_id=connector_id,
        destination_tables=destination_tables,
        poll_interval=poll_interval,
        poll_timeout=poll_timeout,
        io_manager_key=io_manager_key,
        asset_key_prefix=asset_key_prefix,
        metadata_by_table_name=metadata_by_table_name,
        group_name=group_name,
        infer_missing_tables=infer_missing_tables,
        op_tags=op_tags,
        asset_tags=None,
        fetch_column_metadata=fetch_column_metadata,
        table_to_asset_key_map=None,
        resource_defs=None,
    )
class FivetranConnectionMetadata(
    NamedTuple(
        "_FivetranConnectionMetadata",
        [
            ("name", str),
            ("connector_id", str),
            ("connector_url", str),
            ("schemas", Mapping[str, Any]),
            ("database", Optional[str]),
            ("service", Optional[str]),
        ],
    )
):
    """Metadata describing a single Fivetran connector, as collected from the Fivetran API."""

    def build_asset_defn_metadata(
        self,
        key_prefix: Sequence[str],
        group_name: Optional[str],
        table_to_asset_key_fn: Callable[[str], AssetKey],
        io_manager_key: Optional[str] = None,
    ) -> AssetsDefinitionCacheableData:
        """Build the cacheable data describing the assets of this connector.

        Every enabled table of every enabled schema under ``self.schemas["schemas"]``
        becomes an output keyed by ``"<schema>.<table>"`` (destination names). When the
        schema payload has no ``"schemas"`` entry, a single output keyed by the
        connector name is produced with empty metadata.

        Args:
            key_prefix (Sequence[str]): Prefix prepended to every asset key.
            group_name (Optional[str]): Group name recorded in the cacheable data.
            table_to_asset_key_fn (Callable[[str], AssetKey]): Maps a table name to its
                (unprefixed) asset key.
            io_manager_key (Optional[str]): IO manager key recorded in the extra metadata.

        Returns:
            AssetsDefinitionCacheableData: Serializable description of the assets.
        """
        schema_table_meta: Dict[str, RawMetadataMapping] = {}
        if "schemas" in self.schemas:
            schemas_inner = cast(Dict[str, Any], self.schemas["schemas"])
            for schema in schemas_inner.values():
                if schema["enabled"]:
                    schema_name = schema["name_in_destination"]
                    schema_tables: Dict[str, Dict[str, Any]] = cast(
                        Dict[str, Dict[str, Any]], schema["tables"]
                    )
                    for table in schema_tables.values():
                        if table["enabled"]:
                            table_name = table["name_in_destination"]
                            schema_table_meta[f"{schema_name}.{table_name}"] = metadata_for_table(
                                table,
                                self.connector_url,
                                database=self.database,
                                schema=schema_name,
                                table=table_name,
                            )
        else:
            # NOTE(review): this `else` is paired with the outer `if "schemas" in ...`
            # check; the pairing was reconstructed from flattened source -- confirm.
            schema_table_meta[self.name] = {}

        outputs = {
            table: AssetKey([*key_prefix, *list(table_to_asset_key_fn(table).path)])
            for table in schema_table_meta
        }

        internal_deps: Dict[str, Set[AssetKey]] = {}

        return AssetsDefinitionCacheableData(
            keys_by_input_name={},
            keys_by_output_name=outputs,
            internal_asset_deps=internal_deps,
            group_name=group_name,
            key_prefix=key_prefix,
            can_subset=False,
            metadata_by_output_name=schema_table_meta,
            extra_metadata={
                "connector_id": self.connector_id,
                "io_manager_key": io_manager_key,
                "storage_kind": self.service,
                "connection_metadata": self.to_serializable_repr(),
            },
        )

    def to_serializable_repr(self) -> Any:
        """Return the tuple's fields as a dictionary."""
        return self._asdict()

    @staticmethod
    def from_serializable_repr(rep: Any) -> "FivetranConnectionMetadata":
        """Rebuild an instance from the mapping produced by ``to_serializable_repr``."""
        return FivetranConnectionMetadata(**rep)


def _build_fivetran_assets_from_metadata(
    assets_defn_meta: AssetsDefinitionCacheableData,
    resource_defs: Mapping[str, ResourceDefinition],
    poll_interval: float,
    poll_timeout: Optional[float],
    fetch_column_metadata: bool,
    translator: Optional[Type[DagsterFivetranTranslator]] = None,
) -> AssetsDefinition:
    """Rebuild the multi-asset for one connector from its cached description.

    Reads the connector ID, IO manager key, storage kind and connection metadata from
    ``assets_defn_meta.extra_metadata`` and delegates to ``_build_fivetran_assets``,
    returning the first definition of its result.
    """
    metadata = cast(Mapping[str, Any], assets_defn_meta.extra_metadata)
    connector_id = cast(str, metadata["connector_id"])
    io_manager_key = cast(Optional[str], metadata["io_manager_key"])
    storage_kind = cast(Optional[str], metadata.get("storage_kind"))
    connection_metadata = FivetranConnectionMetadata.from_serializable_repr(
        metadata["connection_metadata"]
    )

    return _build_fivetran_assets(
        connector_id=connector_id,
        destination_tables=list(
            assets_defn_meta.keys_by_output_name.keys()
            if assets_defn_meta.keys_by_output_name
            else []
        ),
        asset_key_prefix=list(assets_defn_meta.key_prefix or []),
        metadata_by_table_name=cast(
            Dict[str, RawMetadataMapping], assets_defn_meta.metadata_by_output_name
        ),
        io_manager_key=io_manager_key,
        table_to_asset_key_map=assets_defn_meta.keys_by_output_name,
        resource_defs=resource_defs,
        group_name=assets_defn_meta.group_name,
        poll_interval=poll_interval,
        poll_timeout=poll_timeout,
        asset_tags=build_kind_tag(storage_kind) if storage_kind else None,
        fetch_column_metadata=fetch_column_metadata,
        infer_missing_tables=False,
        op_tags=None,
        translator=translator,
        connection_metadata=connection_metadata,
    )[0]


class FivetranInstanceCacheableAssetsDefinition(CacheableAssetsDefinition):
    """Cacheable assets definition that discovers connectors by querying a Fivetran instance."""

    def __init__(
        self,
        fivetran_resource_def: Union[FivetranResource, ResourceDefinition],
        key_prefix: Sequence[str],
        connector_to_group_fn: Optional[Callable[[str], Optional[str]]],
        connector_filter: Optional[Callable[[FivetranConnectionMetadata], bool]],
        connector_to_io_manager_key_fn: Optional[Callable[[str], Optional[str]]],
        connector_to_asset_key_fn: Optional[Callable[[FivetranConnectionMetadata, str], AssetKey]],
        destination_ids: Optional[List[str]],
        poll_interval: float,
        poll_timeout: Optional[float],
        fetch_column_metadata: bool,
        translator: Optional[Type[DagsterFivetranTranslator]] = None,
    ):
        self._fivetran_resource_def = fivetran_resource_def
        if isinstance(fivetran_resource_def, FivetranResource):
            # We hold a copy which is not fully processed, this retains e.g. EnvVars for
            # display in the UI
            self._partially_initialized_fivetran_instance = fivetran_resource_def
            # The processed copy is used to query the Fivetran instance
            self._fivetran_instance: FivetranResource = (
                self._partially_initialized_fivetran_instance.process_config_and_initialize()
            )
        else:
            self._partially_initialized_fivetran_instance = fivetran_resource_def(
                build_init_resource_context()
            )
            self._fivetran_instance: FivetranResource = (
                self._partially_initialized_fivetran_instance
            )

        self._key_prefix = key_prefix
        self._connector_to_group_fn = connector_to_group_fn
        self._connection_filter = connector_filter
        self._connector_to_io_manager_key_fn = connector_to_io_manager_key_fn
        # Default: asset key is the table name split on ".".
        self._connector_to_asset_key_fn: Callable[[FivetranConnectionMetadata, str], AssetKey] = (
            connector_to_asset_key_fn or (lambda _, table: AssetKey(path=table.split(".")))
        )
        self._destination_ids = destination_ids
        self._poll_interval = poll_interval
        self._poll_timeout = poll_timeout
        self._fetch_column_metadata = fetch_column_metadata
        self._translator = translator

        # The unique ID is derived from the key prefix and, if given, the source code
        # of the connector filter.
        contents = hashlib.sha1()
        contents.update(",".join(key_prefix).encode("utf-8"))
        if connector_filter:
            contents.update(inspect.getsource(connector_filter).encode("utf-8"))

        super().__init__(unique_id=f"fivetran-{contents.hexdigest()}")

    def _get_connectors(self) -> Sequence[FivetranConnectionMetadata]:
        """Query the Fivetran instance and return metadata for each usable connector.

        Groups are taken from ``destination_ids`` when provided, otherwise listed via
        the API. Connectors whose setup state is ``"incomplete"`` or ``"broken"`` are
        skipped.
        """
        output_connectors: List[FivetranConnectionMetadata] = []

        if not self._destination_ids:
            groups = self._fivetran_instance.make_request("GET", "groups")["items"]
        else:
            groups = [{"id": destination_id} for destination_id in self._destination_ids]

        for group in groups:
            group_id = group["id"]

            group_details = self._fivetran_instance.get_destination_details(group_id)
            database = group_details.get("config", {}).get("database")
            service = group_details.get("service")

            connectors = self._fivetran_instance.make_request(
                "GET", f"groups/{group_id}/connectors"
            )["items"]
            for connector in connectors:
                connector_id = connector["id"]

                connector_name = connector["schema"]

                setup_state = connector.get("status", {}).get("setup_state")
                if setup_state in ("incomplete", "broken"):
                    continue

                connector_url = get_fivetran_connector_url(connector)

                schemas = self._fivetran_instance.make_request(
                    "GET", f"connectors/{connector_id}/schemas"
                )

                output_connectors.append(
                    FivetranConnectionMetadata(
                        name=connector_name,
                        connector_id=connector_id,
                        connector_url=connector_url,
                        schemas=schemas,
                        database=database,
                        service=service,
                    )
                )

        return output_connectors

    def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
        """Return cacheable data for every connector accepted by the connector filter."""
        asset_defn_data: List[AssetsDefinitionCacheableData] = []
        for connector in self._get_connectors():
            if not self._connection_filter or self._connection_filter(connector):
                table_to_asset_key = partial(self._connector_to_asset_key_fn, connector)
                asset_defn_data.append(
                    connector.build_asset_defn_metadata(
                        key_prefix=self._key_prefix,
                        group_name=(
                            self._connector_to_group_fn(connector.name)
                            if self._connector_to_group_fn
                            else None
                        ),
                        io_manager_key=(
                            self._connector_to_io_manager_key_fn(connector.name)
                            if self._connector_to_io_manager_key_fn
                            else None
                        ),
                        table_to_asset_key_fn=table_to_asset_key,
                    )
                )

        return asset_defn_data

    def build_definitions(
        self, data: Sequence[AssetsDefinitionCacheableData]
    ) -> Sequence[AssetsDefinition]:
        """Turn cached connector descriptions into assets definitions.

        Each definition is bound to this instance's resource under the ``"fivetran"`` key.
        """
        return [
            _build_fivetran_assets_from_metadata(
                meta,
                {
                    "fivetran": self._partially_initialized_fivetran_instance.get_resource_definition()
                },
                poll_interval=self._poll_interval,
                poll_timeout=self._poll_timeout,
                fetch_column_metadata=self._fetch_column_metadata,
                translator=self._translator,
            )
            for meta in data
        ]


def _clean_name(name: str) -> str:
    """Cleans an input to be a valid Dagster asset name."""
    return re.sub(r"[^a-z0-9]+", "_", name.lower())
def load_assets_from_fivetran_instance(
    fivetran: Union[FivetranResource, ResourceDefinition],
    key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
    connector_to_group_fn: Optional[Callable[[str], Optional[str]]] = None,
    io_manager_key: Optional[str] = None,
    connector_to_io_manager_key_fn: Optional[Callable[[str], Optional[str]]] = None,
    connector_filter: Optional[Callable[[FivetranConnectionMetadata], bool]] = None,
    connector_to_asset_key_fn: Optional[
        Callable[[FivetranConnectionMetadata, str], AssetKey]
    ] = None,
    destination_ids: Optional[List[str]] = None,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
    fetch_column_metadata: bool = True,
    translator: Optional[Type[DagsterFivetranTranslator]] = None,
) -> CacheableAssetsDefinition:
    """Loads Fivetran connector assets from a configured FivetranResource instance. This fetches
    information about defined connectors at initialization time, and will error on workspace
    load if the Fivetran instance is not reachable.

    Args:
        fivetran (ResourceDefinition): A FivetranResource configured with the appropriate
            connection details.
        key_prefix (Optional[CoercibleToAssetKeyPrefix]): A prefix for the asset keys created.
        connector_to_group_fn (Optional[Callable[[str], Optional[str]]]): Function which
            returns an asset group name for a given Fivetran connector name. Defaults to a
            basic sanitization function that lowercases the name and replaces each run of
            characters outside ``a-z0-9`` with an underscore.
        io_manager_key (Optional[str]): The IO manager key to use for all assets. Defaults to
            "io_manager". Use this if all assets should be loaded from the same source,
            otherwise use connector_to_io_manager_key_fn.
        connector_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]): Function
            which returns an IO manager key for a given Fivetran connector name. When other
            ops are downstream of the loaded assets, the IOManager specified determines how
            the inputs to those ops are loaded. Defaults to "io_manager".
        connector_filter (Optional[Callable[[FivetranConnectionMetadata], bool]]): Optional
            function which takes in connector metadata and returns False if the connector
            should be excluded from the output assets.
        connector_to_asset_key_fn (Optional[Callable[[FivetranConnectionMetadata, str], AssetKey]]):
            Optional function which takes in connector metadata and a table name and returns
            an AssetKey for that table. Defaults to a function that generates an AssetKey
            matching the table name, split by ".".
        destination_ids (Optional[List[str]]): A list of destination IDs to fetch connectors
            from. If None, all destinations will be polled for connectors.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time that will be waited before this
            operation is timed out. By default, this will never time out.
        fetch_column_metadata (bool): If True, will fetch column schema information for each
            table in the connector. This will induce additional API calls.
        translator (Optional[Type[DagsterFivetranTranslator]]): Optional translator class.
            Cannot be combined with key_prefix, connector_to_group_fn, io_manager_key,
            connector_to_io_manager_key_fn, or connector_to_asset_key_fn.

    **Examples:**

    Loading all Fivetran connectors as assets:

        .. code-block:: python

            from dagster_fivetran import fivetran_resource, load_assets_from_fivetran_instance

            fivetran_instance = fivetran_resource.configured(
                {
                    "api_key": "some_key",
                    "api_secret": "some_secret",
                }
            )
            fivetran_assets = load_assets_from_fivetran_instance(fivetran_instance)

    Filtering the set of loaded connectors:

        .. code-block:: python

            from dagster_fivetran import fivetran_resource, load_assets_from_fivetran_instance

            fivetran_instance = fivetran_resource.configured(
                {
                    "api_key": "some_key",
                    "api_secret": "some_secret",
                }
            )
            fivetran_assets = load_assets_from_fivetran_instance(
                fivetran_instance,
                connector_filter=lambda meta: "snowflake" in meta.name,
            )
    """
    if isinstance(key_prefix, str):
        key_prefix = [key_prefix]
    key_prefix = check.list_param(key_prefix or [], "key_prefix", of_type=str)

    # The translator-based path is mutually exclusive with the per-connector
    # customization arguments. This is checked before the group-function default is
    # applied so that the default itself does not trip the check.
    check.invariant(
        not (
            (
                key_prefix
                or connector_to_group_fn
                or io_manager_key
                or connector_to_io_manager_key_fn
                or connector_to_asset_key_fn
            )
            and translator
        ),
        "Cannot specify key_prefix, connector_to_group_fn, io_manager_key, connector_to_io_manager_key_fn, or connector_to_asset_key_fn when translator is specified",
    )

    connector_to_group_fn = connector_to_group_fn or _clean_name

    check.invariant(
        not io_manager_key or not connector_to_io_manager_key_fn,
        "Cannot specify both io_manager_key and connector_to_io_manager_key_fn",
    )
    if not connector_to_io_manager_key_fn:
        # Fall back to the single key (possibly None) for every connector.
        connector_to_io_manager_key_fn = lambda _: io_manager_key

    return FivetranInstanceCacheableAssetsDefinition(
        fivetran_resource_def=fivetran,
        key_prefix=key_prefix,
        connector_to_group_fn=connector_to_group_fn,
        connector_to_io_manager_key_fn=connector_to_io_manager_key_fn,
        connector_filter=connector_filter,
        connector_to_asset_key_fn=connector_to_asset_key_fn,
        destination_ids=destination_ids,
        poll_interval=poll_interval,
        poll_timeout=poll_timeout,
        fetch_column_metadata=fetch_column_metadata,
        translator=translator,
    )