# Source code for dagster_gcp.bigquery.resources

from contextlib import contextmanager, nullcontext
from datetime import datetime
from typing import Any, Iterator, Mapping, Optional, Sequence

from dagster import (
    ConfigurableResource,
    IAttachDifferentObjectToOpContext,
    _check as check,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from google.cloud import bigquery
from pydantic import Field

from dagster_gcp.bigquery.utils import setup_gcp_creds


class BigQueryResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource for interacting with Google BigQuery.

    Examples:
        .. code-block:: python

            from dagster import Definitions, asset
            from dagster_gcp import BigQueryResource

            @asset
            def my_table(bigquery: BigQueryResource):
                with bigquery.get_client() as client:
                    client.query("SELECT * FROM my_dataset.my_table")

            defs = Definitions(
                assets=[my_table],
                resources={
                    "bigquery": BigQueryResource(project="my-project")
                }
            )
    """

    project: Optional[str] = Field(
        default=None,
        description=(
            "Project ID for the project which the client acts on behalf of. Will be passed when"
            " creating a dataset / job. If not passed, falls back to the default inferred from the"
            " environment."
        ),
    )
    location: Optional[str] = Field(
        default=None,
        description="Default location for jobs / datasets / tables.",
    )
    gcp_credentials: Optional[str] = Field(
        default=None,
        description=(
            "GCP authentication credentials. If provided, a temporary file will be created"
            " with the credentials and ``GOOGLE_APPLICATION_CREDENTIALS`` will be set to the"
            " temporary file. To avoid issues with newlines in the keys, you must base64"
            " encode the key. You can retrieve the base64 encoded key with this shell"
            " command: ``cat $GOOGLE_AUTH_CREDENTIALS | base64``"
        ),
    )

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        # Marks this resource as maintained by the Dagster team (used for telemetry/UI).
        return True

    @contextmanager
    def get_client(self) -> Iterator[bigquery.Client]:
        """Context manager to create a BigQuery Client.

        The client is constructed *inside* the optional credentials context so
        that ``GOOGLE_APPLICATION_CREDENTIALS`` (set by ``setup_gcp_creds``) is
        already in place when the client reads its environment.

        Examples:
            .. code-block:: python

                from dagster import asset
                from dagster_gcp import BigQueryResource

                @asset
                def my_table(bigquery: BigQueryResource):
                    with bigquery.get_client() as client:
                        client.query("SELECT * FROM my_dataset.my_table")

        Yields:
            bigquery.Client: A client configured with this resource's project and location.
        """
        # nullcontext() is a no-op when no explicit credentials were configured;
        # this removes the duplicated Client(...) construction of an if/else form
        # while preserving the creds-before-client ordering.
        creds_ctx = (
            setup_gcp_creds(self.gcp_credentials)
            if self.gcp_credentials
            else nullcontext()
        )
        with creds_ctx:
            yield bigquery.Client(project=self.project, location=self.location)

    def get_object_to_set_on_execution_context(self) -> Any:
        # Expose the raw bigquery.Client (rather than this resource) on the op
        # context; generator form keeps the client context open for the op body.
        with self.get_client() as client:
            yield client
@dagster_maintained_resource
@resource(
    config_schema=BigQueryResource.to_config_schema(),
    description="Dagster resource for connecting to BigQuery",
)
def bigquery_resource(context):
    """Function-style Dagster resource yielding a BigQuery client.

    Thin wrapper that builds a :py:class:`BigQueryResource` from the resource
    init context and yields the client from its ``get_client`` context manager.
    """
    with BigQueryResource.from_resource_context(context).get_client() as bq_client:
        yield bq_client
def fetch_last_updated_timestamps(
    *,
    client: bigquery.Client,
    dataset_id: str,
    table_ids: Sequence[str],
) -> Mapping[str, datetime]:
    """Get the last updated timestamps of a list of BigQuery tables.

    Note that this only works on BigQuery tables, and not views.

    Args:
        client (bigquery.Client): The BigQuery client.
        dataset_id (str): The BigQuery dataset ID.
        table_ids (Sequence[str]): The table IDs to get the last updated timestamp for.

    Returns:
        Mapping[str, datetime]: A mapping of table IDs to their last updated timestamps (UTC).

    Raises:
        ValueError: If any requested table ID is not present in the dataset.
    """
    check.invariant(
        len(table_ids) > 0,
        "At least one table ID must be provided.",
    )
    # Pass the table IDs as a bound query parameter instead of interpolating
    # them into the SQL text, which would be vulnerable to injection via
    # caller-supplied IDs. Identifiers cannot be parameterized in BigQuery, so
    # dataset_id is backtick-quoted instead.
    query = f"""
        SELECT table_id, TIMESTAMP_MILLIS(last_modified_time) as last_modified_time
        FROM `{dataset_id}`.__TABLES__
        WHERE table_id IN UNNEST(@table_ids)
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("table_ids", "STRING", list(table_ids))
        ]
    )
    results = client.query(query, job_config=job_config).result()
    modified_times = {row.table_id: row.last_modified_time for row in results}
    # Surface missing tables explicitly rather than returning a partial mapping.
    for table_id in table_ids:
        if table_id not in modified_times:
            raise ValueError(f"Table {table_id} not found in dataset {dataset_id}")
    return modified_times