from enum import Enum
from typing import Any, Dict, Mapping, Optional, Union
from dagster import (
AssetKey,
AssetSpec,
_check as check,
)
from dagster._annotations import deprecated, public
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._record import record
from dagster._utils.log import get_dagster_logger
from looker_sdk.sdk.api40.methods import Looker40SDK
from looker_sdk.sdk.api40.models import Dashboard, DashboardFilter, LookmlModelExplore, User
logger = get_dagster_logger("dagster_looker")
@record
class LookerInstanceData:
"""A record representing all content in a Looker instance."""
explores_by_id: Dict[str, LookmlModelExplore]
dashboards_by_id: Dict[str, Dashboard]
users_by_id: Dict[str, User]
def to_state(self, sdk: Looker40SDK) -> Mapping[str, Any]:
return {
"dashboards_by_id": {
dashboard_id: (sdk.serialize(api_model=dashboard_data).decode()) # type: ignore
for dashboard_id, dashboard_data in self.dashboards_by_id.items()
},
"explores_by_id": {
explore_id: (sdk.serialize(api_model=explore_data).decode()) # type: ignore
for explore_id, explore_data in self.explores_by_id.items()
},
"users_by_id": {
user_id: (sdk.serialize(api_model=user_data).decode()) # type: ignore
for user_id, user_data in self.users_by_id.items()
},
}
@staticmethod
def from_state(sdk: Looker40SDK, state: Mapping[str, Any]) -> "LookerInstanceData":
explores_by_id = {
explore_id: (
sdk.deserialize(data=serialized_lookml_explore, structure=LookmlModelExplore) # type: ignore
)
for explore_id, serialized_lookml_explore in state["explores_by_id"].items()
}
dashboards_by_id = {
dashboard_id: (sdk.deserialize(data=serialized_looker_dashboard, structure=Dashboard)) # type: ignore
for dashboard_id, serialized_looker_dashboard in state["dashboards_by_id"].items()
}
users_by_id = {
user_id: (sdk.deserialize(data=serialized_user, structure=User)) # type: ignore
for user_id, serialized_user in state["users_by_id"].items()
}
return LookerInstanceData(
explores_by_id=explores_by_id,
dashboards_by_id=dashboards_by_id,
users_by_id=users_by_id,
)
[docs]
@record
class RequestStartPdtBuild:
"""A request to start a PDT build. See https://developers.looker.com/api/explorer/4.0/types/DerivedTable/RequestStartPdtBuild?sdk=py
for documentation on all available fields.
Args:
model_name: The model of the PDT to start building.
view_name: The view name of the PDT to start building.
force_rebuild: Force rebuild of required dependent PDTs, even if they are already materialized.
force_full_incremental: Force involved incremental PDTs to fully re-materialize.
workspace: Workspace in which to materialize selected PDT ('dev' or default 'production').
source: The source of this request.
"""
model_name: str
view_name: str
force_rebuild: Optional[str] = None
force_full_incremental: Optional[str] = None
workspace: Optional[str] = None
source: Optional[str] = None
[docs]
class LookerStructureType(Enum):
VIEW = "view"
EXPLORE = "explore"
DASHBOARD = "dashboard"
@record
class LookmlView:
view_name: str
sql_table_name: Optional[str]
[docs]
@record
class LookerStructureData:
structure_type: LookerStructureType
data: Union[LookmlView, LookmlModelExplore, DashboardFilter, Dashboard]
base_url: Optional[str] = None
[docs]
class DagsterLookerApiTranslator:
def __init__(self, looker_instance_data: Optional[LookerInstanceData]):
self._looker_instance_data = looker_instance_data
@property
def instance_data(self) -> Optional[LookerInstanceData]:
return self._looker_instance_data
@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
def get_view_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
return self.get_asset_spec(looker_structure).key
def get_view_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
lookml_view = check.inst(looker_structure.data, LookmlView)
return AssetSpec(
key=AssetKey(["view", lookml_view.view_name]),
)
@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
def get_explore_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
return self.get_explore_asset_spec(looker_structure).key
def get_explore_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
lookml_explore = check.inst(looker_structure.data, (LookmlModelExplore, DashboardFilter))
if isinstance(lookml_explore, LookmlModelExplore):
explore_base_view = LookmlView(
view_name=check.not_none(lookml_explore.view_name),
sql_table_name=check.not_none(lookml_explore.sql_table_name),
)
explore_join_views = [
LookmlView(
view_name=check.not_none(lookml_explore_join.from_ or lookml_explore_join.name),
sql_table_name=lookml_explore_join.sql_table_name,
)
for lookml_explore_join in (lookml_explore.joins or [])
]
return AssetSpec(
key=AssetKey(check.not_none(lookml_explore.id)),
deps=list(
{
self.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.VIEW, data=lookml_view
)
).key
for lookml_view in [explore_base_view, *explore_join_views]
}
),
tags={
"dagster/kind/looker": "",
"dagster/kind/explore": "",
},
metadata={
"dagster-looker/web_url": MetadataValue.url(
f"{looker_structure.base_url}/explore/{check.not_none(lookml_explore.id).replace('::', '/')}"
),
},
)
elif isinstance(lookml_explore, DashboardFilter):
lookml_model_name = check.not_none(lookml_explore.model)
lookml_explore_name = check.not_none(lookml_explore.explore)
return AssetSpec(key=AssetKey(f"{lookml_model_name}::{lookml_explore_name}"))
else:
check.assert_never(lookml_explore)
@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
def get_dashboard_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
return self.get_asset_spec(looker_structure).key
def get_dashboard_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
looker_dashboard = check.inst(looker_structure.data, Dashboard)
user = None
if self.instance_data and looker_dashboard.user_id:
user = self.instance_data.users_by_id.get(looker_dashboard.user_id)
return AssetSpec(
key=AssetKey(f"{check.not_none(looker_dashboard.title)}_{looker_dashboard.id}"),
deps=list(
{
self.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.EXPLORE, data=dashboard_filter
)
).key
for dashboard_filter in looker_dashboard.dashboard_filters or []
}
),
tags={
"dagster/kind/looker": "",
"dagster/kind/dashboard": "",
},
metadata={
"dagster-looker/web_url": MetadataValue.url(
f"{looker_structure.base_url}{looker_dashboard.url}"
),
},
owners=[user.email] if user and user.email else None,
)
[docs]
@public
def get_asset_spec(self, looker_structure: LookerStructureData) -> AssetSpec:
if looker_structure.structure_type == LookerStructureType.VIEW:
return self.get_view_asset_spec(looker_structure)
if looker_structure.structure_type == LookerStructureType.EXPLORE:
return self.get_explore_asset_spec(looker_structure)
elif looker_structure.structure_type == LookerStructureType.DASHBOARD:
return self.get_dashboard_asset_spec(looker_structure)
else:
check.assert_never(looker_structure.structure_type)
[docs]
@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterLookerApiTranslator.get_asset_spec().key` instead",
)
@public
def get_asset_key(self, looker_structure: LookerStructureData) -> AssetKey:
return self.get_asset_spec(looker_structure).key