from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast
from dagster import (
AssetSpec,
ConfigurableResource,
Definitions,
_check as check,
)
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from looker_sdk import init40
from looker_sdk.rtl.api_settings import ApiSettings, SettingsConfig
from looker_sdk.sdk.api40.methods import Looker40SDK
from pydantic import Field
from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
LookerInstanceData,
LookerStructureData,
LookerStructureType,
RequestStartPdtBuild,
)
if TYPE_CHECKING:
from looker_sdk.sdk.api40.models import Folder, LookmlModelExplore
# Module-wide logger, obtained from Dagster's logging helper under the name "dagster_looker".
logger = get_dagster_logger("dagster_looker")
# Prefix of the state key built by LookerApiDefsLoader.defs_key (the resource's client ID is
# appended after a "/").
LOOKER_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-looker/reconstruction_metadata"
[docs]
@record
class LookerFilter:
    """Restricts which Looker objects are fetched from the instance.

    Args:
        dashboard_folders (Optional[List[List[str]]]): Folder paths to fetch dashboards from.
            Every path is a list of folder names that begins at the root folder. All dashboards
            contained in the given folders are fetched. When omitted, all dashboards are fetched.
        only_fetch_explores_used_in_dashboards (bool): When True, only the explores used in the
            fetched dashboards are fetched; when False, all explores are fetched.
            Defaults to False.
    """

    dashboard_folders: Optional[List[List[str]]] = None
    only_fetch_explores_used_in_dashboards: bool = False
[docs]
@experimental
class LookerResource(ConfigurableResource):
    """A connection to a Looker instance, with helpers for working with the Looker API."""

    base_url: str = Field(
        ...,
        description="Base URL for the Looker API. For example, https://your.cloud.looker.com.",
    )
    client_id: str = Field(..., description="Client ID for the Looker API.")
    client_secret: str = Field(..., description="Client secret for the Looker API.")

    @cached_method
    def get_sdk(self) -> Looker40SDK:
        """Create a Looker 4.0 SDK client that authenticates with this resource's settings.

        Returns:
            Looker40SDK: The client returned by ``init40`` for the custom settings object.
        """

        class DagsterLookerApiSettings(ApiSettings):
            def read_config(_self) -> SettingsConfig:
                # Start from whatever the stock settings provide, then overlay the
                # connection details configured on the enclosing resource (``self``).
                inherited_config = super().read_config()
                return {
                    **inherited_config,
                    "base_url": self.base_url,
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                }

        settings = DagsterLookerApiSettings()
        return init40(config_settings=settings)

    @public
    @deprecated(
        breaking_version="1.9.0",
        additional_warn_text="Use dagster_looker.load_looker_asset_specs instead",
    )
    def build_defs(
        self,
        *,
        request_start_pdt_builds: Optional[Sequence[RequestStartPdtBuild]] = None,
        dagster_looker_translator: Optional[DagsterLookerApiTranslator] = None,
        looker_filter: Optional[LookerFilter] = None,
    ) -> Definitions:
        """Returns a Definitions object that loads structures from the Looker instance
        and translates them into assets with the given translator.

        Args:
            request_start_pdt_builds (Optional[Sequence[RequestStartPdtBuild]]): A list of
                requests to start PDT builds. See https://developers.looker.com/api/explorer/4.0/types/DerivedTable/RequestStartPdtBuild?sdk=py
                for documentation on all available fields.
            dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to
                use to convert Looker structures into assets. Only its class is used.
                Defaults to DagsterLookerApiTranslator.
            looker_filter (Optional[LookerFilter]): Forwarded unchanged to
                ``load_looker_asset_specs`` to restrict which Looker objects are fetched.

        Returns:
            Definitions: A Definitions object containing the PDT assets and the Looker
            structures as assets, with this resource bound under the ``"looker"`` key.
        """
        from dagster_looker.api.assets import build_looker_pdt_assets_definitions

        resource_key = "looker"

        # Downstream builders take a translator class, not an instance.
        if dagster_looker_translator:
            translator_cls = dagster_looker_translator.__class__
        else:
            translator_cls = DagsterLookerApiTranslator

        pdt_assets = build_looker_pdt_assets_definitions(
            resource_key=resource_key,
            request_start_pdt_builds=request_start_pdt_builds or [],
            dagster_looker_translator=translator_cls,
        )
        looker_specs = load_looker_asset_specs(self, translator_cls, looker_filter)

        return Definitions(
            assets=[*pdt_assets, *looker_specs],
            resources={resource_key: self},
        )
[docs]
@experimental
def load_looker_asset_specs(
    looker_resource: LookerResource,
    dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
    looker_filter: Optional[LookerFilter] = None,
) -> Sequence[AssetSpec]:
    """Returns a list of AssetSpecs representing the Looker structures.

    Args:
        looker_resource (LookerResource): The Looker resource to fetch assets from.
        dagster_looker_translator (Type[DagsterLookerApiTranslator]): The translator class used
            to convert Looker structures into AssetSpecs. Defaults to DagsterLookerApiTranslator.
        looker_filter (Optional[LookerFilter]): Restricts which Looker objects are fetched.
            A default-constructed LookerFilter is used when this is omitted.

    Returns:
        List[AssetSpec]: The set of AssetSpecs representing the Looker structures.
    """
    effective_filter = looker_filter or LookerFilter()
    loader = LookerApiDefsLoader(
        looker_resource=looker_resource,
        translator_cls=dagster_looker_translator,
        looker_filter=effective_filter,
    )
    definitions = loader.build_defs()
    # Validate that the loader produced a list of AssetSpec objects before returning it.
    return check.is_list(definitions.assets, AssetSpec)
def build_folder_path(folder_id_to_folder: Dict[str, "Folder"], folder_id: str) -> List[str]:
    """Return the folder names leading to ``folder_id``, outermost ancestor first.

    Follows ``parent_id`` links upward starting at ``folder_id`` and stops at the first id
    that is not a key of ``folder_id_to_folder``.

    Args:
        folder_id_to_folder: Lookup of folder objects (each with ``name`` and ``parent_id``)
            keyed by folder id.
        folder_id: Id of the folder whose path is wanted.

    Returns:
        The names along the path, ending with the name of ``folder_id`` itself; an empty
        list when ``folder_id`` is not in the lookup.
    """
    names_leaf_first = []
    current_id = folder_id
    while current_id in folder_id_to_folder:
        folder = folder_id_to_folder[current_id]
        names_leaf_first.append(folder.name)
        current_id = folder.parent_id
    # Names were collected from the leaf upward; flip them into root-first order.
    names_leaf_first.reverse()
    return names_leaf_first
@dataclass(frozen=True)
class LookerApiDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
    """Fetches explores and dashboards from a Looker instance and builds asset definitions.

    The fetched data is converted to and from a plain state mapping through
    ``LookerInstanceData.to_state`` / ``LookerInstanceData.from_state``.
    """

    # Resource supplying the Looker SDK client, base URL and client ID.
    looker_resource: LookerResource
    # Translator class; instantiated with the instance data when definitions are built.
    translator_cls: Type[DagsterLookerApiTranslator]
    # Controls which dashboards and explores are fetched.
    looker_filter: LookerFilter

    @property
    def defs_key(self) -> str:
        """State key for this loader: the module prefix followed by the resource's client ID."""
        return f"{LOOKER_RECONSTRUCTION_METADATA_KEY_PREFIX}/{self.looker_resource.client_id}"

    def fetch_state(self) -> Mapping[str, Any]:
        """Fetch the instance data from Looker and convert it into its state mapping."""
        looker_instance_data = self.fetch_looker_instance_data()
        return looker_instance_data.to_state(self.looker_resource.get_sdk())

    def defs_from_state(self, state: Mapping[str, Any]) -> Definitions:
        """Rebuild the instance data from ``state`` and translate it into definitions."""
        looker_instance_data = LookerInstanceData.from_state(self.looker_resource.get_sdk(), state)
        translator = self.translator_cls(looker_instance_data)
        return self._build_defs_from_looker_instance_data(looker_instance_data, translator)

    def _build_defs_from_looker_instance_data(
        self,
        looker_instance_data: LookerInstanceData,
        dagster_looker_translator: DagsterLookerApiTranslator,
    ) -> Definitions:
        """Create one asset spec per explore and per dashboard using the given translator.

        Args:
            looker_instance_data: Explores and dashboards to translate.
            dagster_looker_translator: Translator instance whose ``get_asset_spec`` is called
                for every structure.

        Returns:
            Definitions whose assets are the explore specs followed by the dashboard specs.
        """
        base_url = self.looker_resource.base_url
        explores = [
            dagster_looker_translator.get_asset_spec(
                LookerStructureData(
                    structure_type=LookerStructureType.EXPLORE,
                    data=lookml_explore,
                    base_url=base_url,
                ),
            )
            for lookml_explore in looker_instance_data.explores_by_id.values()
        ]
        dashboards = [
            dagster_looker_translator.get_asset_spec(
                LookerStructureData(
                    structure_type=LookerStructureType.DASHBOARD,
                    data=looker_dashboard,
                    base_url=base_url,
                )
            )
            for looker_dashboard in looker_instance_data.dashboards_by_id.values()
        ]
        return Definitions(assets=[*explores, *dashboards])

    def fetch_looker_instance_data(self) -> LookerInstanceData:
        """Fetches explores, dashboards and dashboard owners from the Looker instance.

        Hidden dashboards are always skipped. When ``looker_filter.dashboard_folders`` is set,
        only dashboards located in one of those folders (or a folder nested below one) are
        fetched; folder names are compared case-insensitively and dashboards without a folder
        are skipped. When ``looker_filter.only_fetch_explores_used_in_dashboards`` is set, only
        explores referenced by a dashboard filter of a fetched dashboard are fetched. Explores
        that fail to load are logged as warnings and left out of the result.

        TODO: Fetch explores in parallel using asyncio
        TODO: Get all the LookML views upstream of the explores

        Returns:
            LookerInstanceData: The fetched explores, dashboards and users, each keyed by id.
        """
        sdk = self.looker_resource.get_sdk()

        folders = sdk.all_folders()
        folder_by_id = {folder.id: folder for folder in folders if folder.id is not None}

        # Lightweight listing: only the fields needed to decide which dashboards to fetch.
        dashboards = sdk.all_dashboards(fields=",".join(["id", "hidden", "folder"]))

        folder_filter_strings = [
            "/".join(folder_filter).lower()
            for folder_filter in self.looker_filter.dashboard_folders or []
        ]

        def is_in_filtered_folder(folder_string: str) -> bool:
            # Match on whole path segments: a filter of "shared/sales" must accept
            # "shared/sales" and "shared/sales/emea" but not the sibling
            # "shared/sales archive". An empty filter path matches every folder.
            return any(
                not folder_filter_string
                or folder_string == folder_filter_string
                or folder_string.startswith(f"{folder_filter_string}/")
                for folder_filter_string in folder_filter_strings
            )

        if not folder_filter_strings:
            dashboard_ids_to_fetch = [
                dashboard.id for dashboard in dashboards if not dashboard.hidden
            ]
        else:
            dashboard_ids_to_fetch = []
            for dashboard in dashboards:
                if dashboard.hidden or dashboard.folder is None or dashboard.folder.id is None:
                    continue
                folder_string = "/".join(
                    build_folder_path(folder_by_id, dashboard.folder.id)
                ).lower()
                if is_in_filtered_folder(folder_string):
                    dashboard_ids_to_fetch.append(dashboard.id)

        # Fetch the full dashboards concurrently; executor.map preserves input order, so
        # the results can be zipped back onto their ids.
        with ThreadPoolExecutor(max_workers=None) as executor:
            dashboards_by_id = dict(
                zip(
                    dashboard_ids_to_fetch,
                    executor.map(
                        lambda dashboard_id: sdk.dashboard(dashboard_id=dashboard_id),
                        dashboard_ids_to_fetch,
                    ),
                )
            )

        # Get explore names from models
        explores_for_model = {
            model.name: [explore.name for explore in (model.explores or []) if explore.name]
            for model in sdk.all_lookml_models(fields=",".join(["name", "explores"]))
            if model.name
        }

        if self.looker_filter.only_fetch_explores_used_in_dashboards:
            used_explores = {
                (dash_filter.model, dash_filter.explore)
                for dashboard in dashboards_by_id.values()
                for dash_filter in dashboard.dashboard_filters or []
            }
            explores_for_model = {
                model_name: [
                    explore_name
                    for explore_name in explore_names
                    if (model_name, explore_name) in used_explores
                ]
                for model_name, explore_names in explores_for_model.items()
            }

        def fetch_explore(
            model_name: str, explore_name: str
        ) -> Optional[Tuple[str, "LookmlModelExplore"]]:
            # Best effort: one explore failing to load must not abort the whole fetch.
            # Catch Exception (not a bare except) so interpreter-exit signals still propagate.
            try:
                lookml_explore = sdk.lookml_model_explore(
                    lookml_model_name=model_name,
                    explore_name=explore_name,
                    fields=",".join(["id", "view_name", "sql_table_name", "joins"]),
                )
                return (check.not_none(lookml_explore.id), lookml_explore)
            except Exception:
                logger.warning(
                    f"Failed to fetch LookML explore '{explore_name}' for model '{model_name}'."
                )
                return None

        explores_to_fetch = [
            (model_name, explore_name)
            for model_name, explore_names in explores_for_model.items()
            for explore_name in explore_names
        ]
        with ThreadPoolExecutor(max_workers=None) as executor:
            explores_by_id = dict(
                cast(
                    List[Tuple[str, "LookmlModelExplore"]],
                    (
                        entry
                        for entry in executor.map(
                            lambda explore: fetch_explore(*explore), explores_to_fetch
                        )
                        if entry is not None
                    ),
                )
            )

        # Collect each owner id as a whole value. (Passing the id string to set.update()
        # would add its individual characters instead of the id itself.)
        user_ids_to_fetch = {
            dashboard.user_id for dashboard in dashboards_by_id.values() if dashboard.user_id
        }
        # Sorted so the request is the same from run to run.
        # NOTE(review): when no dashboard has an owner this sends an empty id filter --
        # confirm how the Looker API treats it.
        users = sdk.search_users(id=",".join(sorted(user_ids_to_fetch)))

        return LookerInstanceData(
            explores_by_id=explores_by_id,
            dashboards_by_id=dashboards_by_id,
            users_by_id={check.not_none(user.id): user for user in users},
        )