Ask AI

Source code for dagster_fivetran.asset_decorator

from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet
from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY


[docs] @experimental def fivetran_assets( *, connector_id: str, workspace: FivetranWorkspace, name: Optional[str] = None, group_name: Optional[str] = None, dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a definition for how to sync the tables of a given Fivetran connector. Args: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. name (Optional[str], optional): The name of the op. group_name (Optional[str], optional): The name of the asset group. dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use to convert Fivetran content into :py:class:`dagster.AssetSpec`. Defaults to :py:class:`DagsterFivetranTranslator`. Examples: Sync the tables of a Fivetran connector: .. code-block:: python from dagster_fivetran import FivetranWorkspace, fivetran_assets import dagster as dg fivetran_workspace = FivetranWorkspace( account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), api_key=dg.EnvVar("FIVETRAN_API_KEY"), api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), ) @fivetran_assets( connector_id="fivetran_connector_id", name="fivetran_connector_id", group_name="fivetran_connector_id", workspace=fivetran_workspace, ) def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): yield from fivetran.sync_and_poll(context=context) defs = dg.Definitions( assets=[fivetran_connector_assets], resources={"fivetran": fivetran_workspace}, ) Sync the tables of a Fivetran connector with a custom translator: .. code-block:: python from dagster_fivetran import ( DagsterFivetranTranslator, FivetranConnectorTableProps, FivetranWorkspace, fivetran_assets ) import dagster as dg from dagster._core.definitions.asset_spec import replace_attributes class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: asset_spec = super().get_asset_spec(props) return replace_attributes( asset_spec, key=asset_spec.key.with_prefix("my_prefix"), ) fivetran_workspace = FivetranWorkspace( account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), api_key=dg.EnvVar("FIVETRAN_API_KEY"), api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), ) @fivetran_assets( connector_id="fivetran_connector_id", name="fivetran_connector_id", group_name="fivetran_connector_id", workspace=fivetran_workspace, dagster_fivetran_translator=CustomDagsterFivetranTranslator(), ) def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): yield from fivetran.sync_and_poll(context=context) defs = dg.Definitions( assets=[fivetran_connector_assets], resources={"fivetran": fivetran_workspace}, ) """ dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() return multi_asset( name=name, group_name=group_name, can_subset=True, specs=[ spec.merge_attributes( metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator} ) for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator ) if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id ], )