import hashlib
from dagster import (
    In,
    List,
    Nothing,
    Out,
    _check as check,
    op,
)
from dagster._core.storage.tags import COMPUTE_KIND_TAG
from dagster_pandas import DataFrame
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.job import LoadJobConfig, QueryJobConfig
from google.cloud.bigquery.table import TimePartitioning

from dagster_gcp.bigquery.configs import (
    define_bigquery_create_dataset_config,
    define_bigquery_delete_dataset_config,
    define_bigquery_load_config,
    define_bigquery_query_config,
)
from dagster_gcp.bigquery.types import BigQueryLoadSource

_START = "start"


def _preprocess_config(cfg):
    destination_encryption_configuration = cfg.get("destination_encryption_configuration")
    time_partitioning = cfg.get("time_partitioning")

    if destination_encryption_configuration is not None:
        cfg["destination_encryption_configuration"] = EncryptionConfiguration(
            kms_key_name=destination_encryption_configuration
        )

    if time_partitioning is not None:
        cfg["time_partitioning"] = TimePartitioning(**time_partitioning)

    return cfg
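
# A minimal sketch (values hypothetical) of what _preprocess_config does to a
# user-supplied config dict: raw config values are swapped for the
# google-cloud-bigquery objects that QueryJobConfig/LoadJobConfig expect.
#
#   cfg = {
#       "destination_encryption_configuration": "projects/p/locations/us/keyRings/kr/cryptoKeys/k",
#       "time_partitioning": {"type_": "DAY", "field": "created_at"},
#   }
#   cfg = _preprocess_config(cfg)
#   assert isinstance(cfg["destination_encryption_configuration"], EncryptionConfiguration)
#   assert isinstance(cfg["time_partitioning"], TimePartitioning)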
def bq_op_for_queries(sql_queries):
    """Executes BigQuery SQL queries.

    Expects a BQ client to be provisioned in resources as context.resources.bigquery.
    """
    sql_queries = check.list_param(sql_queries, "sql_queries", of_type=str)

    # Derive a deterministic op name from the queries so the same query list
    # always produces the same op.
    m = hashlib.sha1()
    for query in sql_queries:
        m.update(query.encode("utf-8"))
    hash_str = m.hexdigest()[:10]
    name = f"bq_op_{hash_str}"

    @op(
        name=name,
        ins={_START: In(Nothing)},
        out=Out(List[DataFrame]),
        config_schema=define_bigquery_query_config(),
        required_resource_keys={"bigquery"},
        tags={COMPUTE_KIND_TAG: "sql", "sql": "\n".join(sql_queries)},
    )
    def _bq_fn(context):
        query_job_config = _preprocess_config(context.op_config.get("query_job_config", {}))

        # Retrieve results as pandas DataFrames
        results = []
        for sql_query in sql_queries:
            # We need to construct a new QueryJobConfig for each query.
            # See: https://bit.ly/2VjD6sl
            cfg = QueryJobConfig(**query_job_config) if query_job_config else None
            context.log.info(
                "executing query %s with config: %s"
                % (sql_query, cfg.to_api_repr() if cfg else "(no config provided)")
            )
            results.append(
                context.resources.bigquery.query(sql_query, job_config=cfg).to_dataframe()
            )

        return results

    return _bq_fn
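
# Usage sketch (hedged; the job below is illustrative, not part of this
# module). bq_op_for_queries returns an op whose name is derived from a hash
# of the queries, so the same query list always yields the same op name:
#
#   from dagster import job
#   from dagster_gcp import bigquery_resource
#
#   @job(resource_defs={"bigquery": bigquery_resource})
#   def bq_query_job():
#       bq_op_for_queries(["SELECT 1 AS x", "SELECT 2 AS y"])()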
BIGQUERY_LOAD_CONFIG = define_bigquery_load_config()
@op(
    ins={"paths": In(List[str])},
    out=Out(Nothing),
    config_schema=BIGQUERY_LOAD_CONFIG,
    required_resource_keys={"bigquery"},
)
def import_gcs_paths_to_bq(context, paths):
    """Loads data from a list of GCS URIs into a BigQuery table.

    Expects a BQ client to be provisioned in resources as context.resources.bigquery.
    """
    return _execute_load_in_source(context, paths, BigQueryLoadSource.GCS)
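
# Run-config sketch for import_gcs_paths_to_bq (all values hypothetical; the
# exact schema is defined by define_bigquery_load_config):
#
#   {
#       "ops": {
#           "import_gcs_paths_to_bq": {
#               "config": {
#                   "destination": "my_project.my_dataset.my_table",
#                   "load_job_config": {"source_format": "CSV", "skip_leading_rows": 1},
#               }
#           }
#       }
#   }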
@op(
    ins={"df": In(DataFrame)},
    out=Out(Nothing),
    config_schema=BIGQUERY_LOAD_CONFIG,
    required_resource_keys={"bigquery"},
)
def import_df_to_bq(context, df):
    """Loads a pandas DataFrame into a BigQuery table.

    Expects a BQ client to be provisioned in resources as context.resources.bigquery.
    """
    return _execute_load_in_source(context, df, BigQueryLoadSource.DataFrame)
@op(
    ins={"path": In(str)},
    out=Out(Nothing),
    config_schema=BIGQUERY_LOAD_CONFIG,
    required_resource_keys={"bigquery"},
)
def import_file_to_bq(context, path):
    """Loads data from a local file into a BigQuery table.

    Expects a BQ client to be provisioned in resources as context.resources.bigquery.
    """
    return _execute_load_in_source(context, path, BigQueryLoadSource.File)

def _execute_load_in_source(context, source, source_name):
    destination = context.op_config.get("destination")
    load_job_config = _preprocess_config(context.op_config.get("load_job_config", {}))
    cfg = LoadJobConfig(**load_job_config) if load_job_config else None

    context.log.info(
        "executing BQ load with config: %s for source %s"
        % (cfg.to_api_repr() if cfg else "(no config provided)", source)
    )

    # Load from a pandas DataFrame.
    if source_name == BigQueryLoadSource.DataFrame:
        context.resources.bigquery.load_table_from_dataframe(
            source, destination, job_config=cfg
        ).result()

    # Load from file. See: https://cloud.google.com/bigquery/docs/loading-data-local
    elif source_name == BigQueryLoadSource.File:
        with open(source, "rb") as file_obj:
            context.resources.bigquery.load_table_from_file(
                file_obj, destination, job_config=cfg
            ).result()

    # Load from GCS. See: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage
    elif source_name == BigQueryLoadSource.GCS:
        context.resources.bigquery.load_table_from_uri(
            source, destination, job_config=cfg
        ).result()
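
# Hedged example of the DataFrame branch above: running import_df_to_bq with
# run config
#
#   {"ops": {"import_df_to_bq": {"config": {"destination": "my_project.my_dataset.my_table"}}}}
#
# calls load_table_from_dataframe with job_config=None, in which case BigQuery
# infers the table schema from the frame.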

@op(
    ins={_START: In(Nothing)},
    config_schema=define_bigquery_create_dataset_config(),
    required_resource_keys={"bigquery"},
)
def bq_create_dataset(context):
    """BigQuery Create Dataset.

    This op encapsulates creating a BigQuery dataset.

    Expects a BQ client to be provisioned in resources as context.resources.bigquery.
    """
    (dataset, exists_ok) = [context.op_config.get(k) for k in ("dataset", "exists_ok")]
    context.log.info("executing BQ create_dataset for dataset %s" % dataset)
    context.resources.bigquery.create_dataset(dataset, exists_ok=exists_ok)
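
# Run-config sketch for bq_create_dataset (dataset id hypothetical):
#
#   {"ops": {"bq_create_dataset": {"config": {"dataset": "my_project.scratch", "exists_ok": True}}}}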

@op(
    ins={_START: In(Nothing)},
    config_schema=define_bigquery_delete_dataset_config(),
    required_resource_keys={"bigquery"},
)
def bq_delete_dataset(context):
    """BigQuery Delete Dataset.

    This op encapsulates deleting a BigQuery dataset.

    Expects a BQ client to be provisioned in resources as context.resources.bigquery.
    """
    (dataset, delete_contents, not_found_ok) = [
        context.op_config.get(k) for k in ("dataset", "delete_contents", "not_found_ok")
    ]
    context.log.info("executing BQ delete_dataset for dataset %s" % dataset)
    context.resources.bigquery.delete_dataset(
        dataset, delete_contents=delete_contents, not_found_ok=not_found_ok
    )
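
# Run-config sketch for bq_delete_dataset (dataset id hypothetical):
#
#   {
#       "ops": {
#           "bq_delete_dataset": {
#               "config": {
#                   "dataset": "my_project.scratch",
#                   "delete_contents": True,
#                   "not_found_ok": True,
#               }
#           }
#       }
#   }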