import re
import urllib.parse
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Sequence
from dagster import (
UrlMetadataValue,
_check as check,
)
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet, TableMetadataSet
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags.tag_set import NamespacedTagSet
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
def _get_last_filepath_component(path: str) -> str:
"""Returns the last component of a file path."""
return path.split("/")[-1].split("\\")[-1]
def _remove_file_ext(name: str) -> str:
"""Removes the file extension from a given name."""
return name.rsplit(".", 1)[0]
def _clean_asset_name(name: str) -> str:
"""Cleans an input to be a valid Dagster asset name."""
return re.sub(r"[^A-Za-z0-9_]+", "_", name)
# regex to find objects of form
# [Name="ANALYTICS",Kind="Schema"]
PARSE_M_QUERY_OBJECT = re.compile(r'\[Name="(?P<name>[^"]+)",Kind="(?P<kind>[^"]+)"\]')
def _attempt_parse_m_query_source(sources: List[Dict[str, Any]]) -> Optional[AssetKey]:
for source in sources:
if "expression" in source:
objects = PARSE_M_QUERY_OBJECT.findall(source["expression"])
objects_by_kind = {obj[1]: obj[0].lower() for obj in objects}
if "Schema" in objects_by_kind and "Table" in objects_by_kind:
if "Database" in objects_by_kind:
return AssetKey(
[
objects_by_kind["Database"],
objects_by_kind["Schema"],
objects_by_kind["Table"],
]
)
else:
return AssetKey([objects_by_kind["Schema"], objects_by_kind["Table"]])
@whitelist_for_serdes
class PowerBIContentType(Enum):
"""Enum representing each object in PowerBI's ontology, generically referred to as "content" by the API."""
DASHBOARD = "dashboard"
REPORT = "report"
SEMANTIC_MODEL = "semantic_model"
DATA_SOURCE = "data_source"
@whitelist_for_serdes
@record
class PowerBIContentData:
"""A record representing a piece of content in PowerBI.
Includes the content's type and data as returned from the API.
"""
content_type: PowerBIContentType
properties: Dict[str, Any]
@whitelist_for_serdes
@record
class PowerBIWorkspaceData:
"""A record representing all content in a PowerBI workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""
workspace_id: str
dashboards_by_id: Dict[str, PowerBIContentData]
reports_by_id: Dict[str, PowerBIContentData]
semantic_models_by_id: Dict[str, PowerBIContentData]
data_sources_by_id: Dict[str, PowerBIContentData]
@classmethod
def from_content_data(
cls, workspace_id: str, content_data: Sequence[PowerBIContentData]
) -> "PowerBIWorkspaceData":
return cls(
workspace_id=workspace_id,
dashboards_by_id={
dashboard.properties["id"]: dashboard
for dashboard in content_data
if dashboard.content_type == PowerBIContentType.DASHBOARD
},
reports_by_id={
report.properties["id"]: report
for report in content_data
if report.content_type == PowerBIContentType.REPORT
},
semantic_models_by_id={
dataset.properties["id"]: dataset
for dataset in content_data
if dataset.content_type == PowerBIContentType.SEMANTIC_MODEL
},
data_sources_by_id={
data_source.properties["datasourceId"]: data_source
for data_source in content_data
if data_source.content_type == PowerBIContentType.DATA_SOURCE
},
)
class PowerBITagSet(NamespacedTagSet):
asset_type: Optional[Literal["dashboard", "report", "semantic_model", "data_source"]] = None
@classmethod
def namespace(cls) -> str:
return "dagster-powerbi"
class PowerBIMetadataSet(NamespacedMetadataSet):
web_url: Optional[UrlMetadataValue] = None
id: Optional[str] = None
@classmethod
def namespace(cls) -> str:
return "dagster-powerbi"
def _build_table_metadata(table: Dict[str, Any]) -> TableMetadataSet:
return TableMetadataSet(
table_name=table["name"],
column_schema=TableSchema(
columns=[
TableColumn(name=column["name"].lower(), type=column.get("dataType"))
for column in table["columns"]
]
),
)
[docs]
class DagsterPowerBITranslator:
"""Translator class which converts raw response data from the PowerBI API into AssetSpecs.
Subclass this class to implement custom logic for each type of PowerBI content.
"""
def __init__(self, context: PowerBIWorkspaceData):
self._context = context
@property
def workspace_data(self) -> PowerBIWorkspaceData:
return self._context
def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
if data.content_type == PowerBIContentType.DASHBOARD:
return self.get_dashboard_spec(data)
elif data.content_type == PowerBIContentType.REPORT:
return self.get_report_spec(data)
elif data.content_type == PowerBIContentType.SEMANTIC_MODEL:
return self.get_semantic_model_spec(data)
elif data.content_type == PowerBIContentType.DATA_SOURCE:
return self.get_data_source_spec(data)
else:
check.assert_never(data.content_type)
def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(
[
"dashboard",
_clean_asset_name(_remove_file_ext(data.properties["displayName"])),
]
)
def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
dashboard_id = data.properties["id"]
tile_report_ids = [
tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile
]
report_keys = [
self.get_report_asset_key(self.workspace_data.reports_by_id[report_id])
for report_id in tile_report_ids
]
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}"
)
return AssetSpec(
key=self.get_dashboard_asset_key(data),
deps=report_keys,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="dashboard")},
kinds={"powerbi", "dashboard"},
)
def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["report", _clean_asset_name(data.properties["name"])])
def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
report_id = data.properties["id"]
dataset_id = data.properties["datasetId"]
dataset_data = self.workspace_data.semantic_models_by_id.get(dataset_id)
dataset_key = self.get_semantic_model_asset_key(dataset_data) if dataset_data else None
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}"
)
owner = data.properties.get("createdBy")
return AssetSpec(
key=self.get_report_asset_key(data),
deps=[dataset_key] if dataset_key else None,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="report")},
kinds={"powerbi", "report"},
owners=[owner] if owner else None,
)
def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])])
def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id])
for source_id in source_ids
]
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}"
)
for table in data.properties.get("tables", []):
source = table.get("source", [])
source_key = _attempt_parse_m_query_source(source)
if source_key:
source_keys.append(source_key)
owner = data.properties.get("configuredBy")
tables = data.properties.get("tables")
table_meta = {}
if tables:
if len(tables) == 1:
table_meta = _build_table_metadata(tables[0])
else:
table_meta = {
f"{table['name'].lower()}_column_schema": _build_table_metadata(
table
).column_schema
for table in tables
}
return AssetSpec(
key=self.get_semantic_model_asset_key(data),
deps=source_keys,
metadata={
**PowerBIMetadataSet(
web_url=MetadataValue.url(url) if url else None, id=data.properties["id"]
),
**table_meta,
},
tags={**PowerBITagSet(asset_type="semantic_model")},
kinds={"powerbi", "semantic model"},
owners=[owner] if owner else None,
)
def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
connection_name = (
data.properties["connectionDetails"].get("path")
or data.properties["connectionDetails"].get("url")
or data.properties["connectionDetails"].get("database")
)
if not connection_name:
return AssetKey([_clean_asset_name(data.properties["datasourceId"])])
obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name))
return AssetKey(path=[_clean_asset_name(obj_name)])
def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
return AssetSpec(
key=self.get_data_source_asset_key(data),
tags={**PowerBITagSet(asset_type="data_source")},
kinds={"powerbi"},
)