Source code for dagster_looker.api.assets
from typing import Sequence, Type, cast
from dagster import AssetExecutionContext, AssetsDefinition, Failure, multi_asset
from dagster._annotations import experimental
from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
LookerStructureData,
LookerStructureType,
LookmlView,
RequestStartPdtBuild,
)
from dagster_looker.api.resource import LookerResource
[docs]
@experimental
def build_looker_pdt_assets_definitions(
resource_key: str,
request_start_pdt_builds: Sequence[RequestStartPdtBuild],
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
) -> Sequence[AssetsDefinition]:
"""Returns the AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
Args:
resource_key (str): The resource key to use for the Looker resource.
request_start_pdt_builds (Optional[Sequence[RequestStartPdtBuild]]): A list of requests to start PDT builds.
See https://developers.looker.com/api/explorer/4.0/types/DerivedTable/RequestStartPdtBuild?sdk=py
for documentation on all available fields.
dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to
use to convert Looker structures into assets. Defaults to DagsterLookerApiTranslator.
Returns:
AssetsDefinition: The AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
"""
translator = dagster_looker_translator(None)
result = []
for request_start_pdt_build in request_start_pdt_builds:
@multi_asset(
specs=[
translator.get_asset_spec(
LookerStructureData(
structure_type=LookerStructureType.VIEW,
data=LookmlView(
view_name=request_start_pdt_build.view_name,
sql_table_name=None,
),
)
)
],
name=f"{request_start_pdt_build.model_name}_{request_start_pdt_build.view_name}",
required_resource_keys={resource_key},
)
def pdts(context: AssetExecutionContext):
looker = cast(LookerResource, getattr(context.resources, resource_key))
context.log.info(
f"Starting pdt build for Looker view `{request_start_pdt_build.view_name}` "
f"in Looker model `{request_start_pdt_build.model_name}`."
)
materialize_pdt = looker.get_sdk().start_pdt_build(
model_name=request_start_pdt_build.model_name,
view_name=request_start_pdt_build.view_name,
force_rebuild=request_start_pdt_build.force_rebuild,
force_full_incremental=request_start_pdt_build.force_full_incremental,
workspace=request_start_pdt_build.workspace,
source=f"Dagster run {context.run_id}"
if context.run_id
else request_start_pdt_build.source,
)
if not materialize_pdt.materialization_id:
raise Failure("No materialization id was returned from Looker API.")
check_pdt = looker.get_sdk().check_pdt_build(
materialization_id=materialize_pdt.materialization_id
)
context.log.info(
f"Materialization id: {check_pdt.materialization_id}, "
f"response text: {check_pdt.resp_text}"
)
result.append(pdts)
return result