import re
from typing import AbstractSet, Any, Dict, List, Optional, Union
from dagster import AssetKey, AssetSpec, MetadataValue, TableSchema
from dagster._annotations import deprecated
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil.parser import isoparse
def _coerce_input_to_valid_name(name: str) -> str:
"""Cleans an input to be a valid Dagster asset name."""
return re.sub(r"[^A-Za-z0-9_]+", "_", name)
def asset_key_from_table_name(table_name: str) -> AssetKey:
"""Converts a reference to a table in a Sigma query to a Dagster AssetKey."""
return AssetKey([_coerce_input_to_valid_name(part) for part in table_name.split(".")])
def _inode_from_url(url: str) -> str:
"""Builds a Sigma internal inode value from a Sigma URL."""
return f'inode-{url.split("/")[-1]}'
[docs]
@whitelist_for_serdes
@record
class SigmaWorkbook:
"""Represents a Sigma workbook, a collection of visualizations and queries
for data exploration and analysis.
https://help.sigmacomputing.com/docs/workbooks
"""
properties: Dict[str, Any]
lineage: List[Dict[str, Any]]
datasets: AbstractSet[str]
direct_table_deps: AbstractSet[str]
owner_email: Optional[str]
[docs]
@whitelist_for_serdes
@record
class SigmaDataset:
"""Represents a Sigma dataset, a centralized data definition which can
contain aggregations or other manipulations.
https://help.sigmacomputing.com/docs/datasets
"""
properties: Dict[str, Any]
columns: AbstractSet[str]
inputs: AbstractSet[str]
@whitelist_for_serdes
@record
class SigmaTable:
"""Represents a table loaded into Sigma."""
properties: Dict[str, Any]
def get_table_path(self) -> List[str]:
"""Extracts the qualified table path from the name and path properties,
e.g. ["MY_DB", "MY_SCHEMA", "MY_TABLE"].
"""
return self.properties["path"].split("/")[1:] + [self.properties["name"]]
@whitelist_for_serdes
@record
class SigmaOrganizationData:
workbooks: List[SigmaWorkbook]
datasets: List[SigmaDataset]
tables: List[SigmaTable]
@cached_method
def get_datasets_by_inode(self) -> Dict[str, SigmaDataset]:
return {_inode_from_url(dataset.properties["url"]): dataset for dataset in self.datasets}
@cached_method
def get_tables_by_inode(self) -> Dict[str, SigmaTable]:
return {_inode_from_url(table.properties["urlId"]): table for table in self.tables}
[docs]
class DagsterSigmaTranslator:
"""Translator class which converts raw response data from the Sigma API into AssetSpecs.
Subclass this class to provide custom translation logic.
"""
def __init__(self, context: SigmaOrganizationData):
self._context = context
@property
def organization_data(self) -> SigmaOrganizationData:
return self._context
@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterSigmaTranslator.get_asset_spec(...).key` instead",
)
def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey:
"""Get the AssetKey for a Sigma object, such as a workbook or dataset."""
return self.get_asset_spec(data).key
def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
"""Get the AssetSpec for a Sigma object, such as a workbook or dataset."""
if isinstance(data, SigmaWorkbook):
metadata = {
"dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
"dagster_sigma/version": data.properties["latestVersion"],
"dagster_sigma/created_at": MetadataValue.timestamp(
isoparse(data.properties["createdAt"])
),
"dagster_sigma/properties": MetadataValue.json(data.properties),
"dagster_sigma/lineage": MetadataValue.json(data.lineage),
}
datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets]
tables = [
self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps
]
return AssetSpec(
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
metadata=metadata,
kinds={"sigma", "workbook"},
deps={
*[self.get_asset_key(dataset) for dataset in datasets],
*[
asset_key_from_table_name(".".join(table.get_table_path()).lower())
for table in tables
],
},
owners=[data.owner_email] if data.owner_email else None,
)
elif isinstance(data, SigmaDataset):
metadata = {
"dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
"dagster_sigma/created_at": MetadataValue.timestamp(
isoparse(data.properties["createdAt"])
),
"dagster_sigma/properties": MetadataValue.json(data.properties),
**TableMetadataSet(
column_schema=TableSchema(
columns=[
TableColumn(name=column_name) for column_name in sorted(data.columns)
]
)
),
}
return AssetSpec(
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
metadata=metadata,
kinds={"sigma", "dataset"},
deps={
asset_key_from_table_name(input_table_name.lower())
for input_table_name in data.inputs
},
description=data.properties.get("description"),
)