# Source code for dagster_sigma.translator

import re
from typing import AbstractSet, Any, Dict, List, Optional, Union

from dagster import AssetKey, AssetSpec, MetadataValue, TableSchema
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil.parser import isoparse


def _coerce_input_to_valid_name(name: str) -> str:
    """Cleans an input to be a valid Dagster asset name."""
    return re.sub(r"[^A-Za-z0-9_]+", "_", name)


def asset_key_from_table_name(table_name: str) -> AssetKey:
    """Build a Dagster AssetKey from a table reference found in a Sigma query.

    The name is split on ``.`` and each segment is sanitized on its own, so
    every segment becomes one component of the resulting key path.
    """
    key_path = list(map(_coerce_input_to_valid_name, table_name.split(".")))
    return AssetKey(key_path)


def _inode_from_url(url: str) -> str:
    """Builds a Sigma internal inode value from a Sigma URL."""
    return f'inode-{url.split("/")[-1]}'


@whitelist_for_serdes
@record
class SigmaWorkbook:
    """Represents a Sigma workbook, a collection of visualizations and queries
    for data exploration and analysis.

    https://help.sigmacomputing.com/docs/workbooks
    """

    # Property mapping for the workbook; this module reads the "name", "url",
    # "latestVersion" and "createdAt" entries.
    properties: Dict[str, Any]
    # Inode identifiers of the datasets this workbook depends on; they are
    # looked up in SigmaOrganizationData.get_datasets_by_inode().
    datasets: AbstractSet[str]
    # Email of the workbook owner; when set it is reported as the asset owner.
    owner_email: Optional[str]
@whitelist_for_serdes
@record
class SigmaDataset:
    """Represents a Sigma dataset, a centralized data definition which can
    contain aggregations or other manipulations.

    https://help.sigmacomputing.com/docs/datasets
    """

    # Property mapping for the dataset; this module reads the "name", "url"
    # and "createdAt" entries plus the optional "description".
    properties: Dict[str, Any]
    # Column names; emitted in sorted order as the asset's column schema.
    columns: AbstractSet[str]
    # Names of upstream tables; each is lowercased, split on "." and turned
    # into a dependency asset key.
    inputs: AbstractSet[str]
@whitelist_for_serdes
@record
class SigmaOrganizationData:
    """Container for the workbooks and datasets that make up a Sigma organization."""

    workbooks: List[SigmaWorkbook]
    datasets: List[SigmaDataset]

    @cached_method
    def get_datasets_by_inode(self) -> Dict[str, SigmaDataset]:
        """Index ``datasets`` by the inode derived from each dataset's ``url`` property.

        Returns:
            A mapping from ``inode-<last URL segment>`` to the dataset.

        Raises:
            KeyError: If a dataset's properties have no ``"url"`` entry.
        """
        # NOTE(review): decorated with cached_method, so the mapping is
        # presumably built once per instance - confirm against that helper.
        return {_inode_from_url(dataset.properties["url"]): dataset for dataset in self.datasets}
class DagsterSigmaTranslator:
    """Translator class which converts raw response data from the Sigma API into AssetSpecs.
    Subclass this class to provide custom translation logic.
    """

    def __init__(self, context: SigmaOrganizationData):
        """Store the organization data used to resolve workbook -> dataset references."""
        self._context = context

    @property
    def organization_data(self) -> SigmaOrganizationData:
        """The organization data this translator was constructed with."""
        return self._context

    def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey:
        """Get the AssetKey for a Sigma object, such as a workbook or dataset.

        The key is built from the object's ``"name"`` property, sanitized so
        that it only contains letters, digits and underscores.
        """
        return AssetKey(_coerce_input_to_valid_name(data.properties["name"]))

    def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
        """Get the AssetSpec for a Sigma object, such as a workbook or dataset.

        Workbooks depend on the datasets they reference (resolved by inode
        through the organization data). Datasets depend on their input tables
        and additionally carry a column schema and an optional description.

        Raises:
            KeyError: If a required property is missing, or a workbook
                references an inode that is not present in the organization data.
            TypeError: If ``data`` is neither a SigmaWorkbook nor a SigmaDataset.
        """
        if isinstance(data, SigmaWorkbook):
            metadata = {
                "dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
                "dagster_sigma/version": data.properties["latestVersion"],
                "dagster_sigma/created_at": MetadataValue.timestamp(
                    isoparse(data.properties["createdAt"])
                ),
            }
            # Fetch the inode index once rather than once per referenced dataset.
            datasets_by_inode = self._context.get_datasets_by_inode()
            datasets = [datasets_by_inode[inode] for inode in data.datasets]
            return AssetSpec(
                key=self.get_asset_key(data),
                metadata=metadata,
                kinds={"sigma"},
                deps={self.get_asset_key(dataset) for dataset in datasets},
                owners=[data.owner_email] if data.owner_email else None,
            )

        if isinstance(data, SigmaDataset):
            metadata = {
                "dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
                "dagster_sigma/created_at": MetadataValue.timestamp(
                    isoparse(data.properties["createdAt"])
                ),
                **TableMetadataSet(
                    column_schema=TableSchema(
                        columns=[
                            # Sorted so the emitted schema is deterministic for a set input.
                            TableColumn(name=column_name)
                            for column_name in sorted(data.columns)
                        ]
                    )
                ),
            }
            return AssetSpec(
                key=self.get_asset_key(data),
                metadata=metadata,
                kinds={"sigma"},
                deps={
                    # Table names are lowercased before being converted to asset keys.
                    asset_key_from_table_name(input_table_name.lower())
                    for input_table_name in data.inputs
                },
                description=data.properties.get("description"),
            )

        # Previously an unsupported type fell through and returned None,
        # contradicting the declared return type; fail loudly instead.
        raise TypeError(
            f"Expected a SigmaWorkbook or SigmaDataset, got {type(data).__name__}"
        )