Ask AI

Source code for dagster_gcp.dataproc.ops

from typing import Any, Dict

from dagster import (
    Bool,
    Config,
    Field as DagsterField,
    Int,
    op,
)
from dagster._seven import json
from pydantic import Field

from .configs import define_dataproc_submit_job_config
from .resources import TWENTY_MINUTES, DataprocResource

# Dict-style (legacy) config schema passed as `config_schema=` to the context-based
# ops in this module. It is maintained alongside the newer typed config because
# "job_config" relies on the nested schema built by define_dataproc_submit_job_config().
DATAPROC_CONFIG_SCHEMA = {
    # Optional integer; falls back to TWENTY_MINUTES when not supplied.
    "job_timeout_in_seconds": DagsterField(
        Int,
        description="""Optional. Maximum time in seconds to wait for the job being
                    completed. Default is set to 1200 seconds (20 minutes).
                    """,
        is_required=False,
        default_value=TWENTY_MINUTES,
    ),
    # Nested job-submission schema; its shape is defined in .configs, not here.
    "job_config": define_dataproc_submit_job_config(),
    # Optional boolean, True by default: selects between a cluster created for
    # this job and an already-existing cluster.
    "job_scoped_cluster": DagsterField(
        Bool,
        description="whether to create a cluster or use an existing cluster",
        is_required=False,
        default_value=True,
    ),
}


class DataprocOpConfig(Config):
    # Typed configuration for ops that accept a `config: DataprocOpConfig` argument.
    # Fields with a `default` are optional; the rest must be supplied.

    # Upper bound, in seconds, on how long to wait for the submitted job.
    job_timeout_in_seconds: int = Field(
        description=(
            "Maximum time in seconds to wait for the job being completed."
            " Default is set to 1200 seconds (20 minutes)."
        ),
        default=TWENTY_MINUTES,
    )

    # True: run on a cluster created for this job; False: use an existing cluster.
    job_scoped_cluster: bool = Field(
        description="Whether to create a cluster or use an existing cluster. Defaults to True.",
        default=True,
    )

    # GCP project the job is submitted under (no default, so it is required).
    project_id: str = Field(
        description=(
            "Required. Project ID for the project which the client acts on behalf of."
            " Will be passed when creating a dataset/job."
        )
    )

    # GCP region (no default, so it is required).
    region: str = Field(description="The GCP region.")

    # Free-form job definition; its contents are not validated by this class.
    job_config: Dict[str, Any] = Field(
        description="Python dictionary containing configuration for the Dataproc Job."
    )

def _dataproc_compute(context):
    """Submit the configured job through the ``dataproc`` resource and wait for it.

    Reads ``job_config``, ``job_timeout_in_seconds`` and ``job_scoped_cluster``
    from ``context.op_config``, logs the submission, and blocks in
    ``wait_for_job`` with the configured timeout. Returns ``None``.
    """
    settings = context.op_config
    payload = settings["job_config"]
    timeout_seconds = settings["job_timeout_in_seconds"]

    context.log.info(
        "submitting job with config: %s and timeout of: %d seconds"
        % (str(json.dumps(payload)), timeout_seconds)
    )

    def _submit_and_wait(target):
        # `target` is any object exposing submit_job / wait_for_job: either the
        # dataproc resource itself or the object yielded by its cluster context manager.
        response = target.submit_job(payload)
        submitted_id = response["reference"]["jobId"]
        context.log.info(f"Submitted job ID {submitted_id}")
        target.wait_for_job(submitted_id, wait_timeout=timeout_seconds)

    if not settings["job_scoped_cluster"]:
        # Existing-cluster path: submit straight through the resource.
        _submit_and_wait(context.resources.dataproc)
        return

    # Job-scoped path: the context manager is expected to create the cluster on
    # entry and delete it on exit, so submit and wait both happen inside the block.
    with context.resources.dataproc.cluster_context_manager() as cluster:
        _submit_and_wait(cluster)


# Op requiring the "dataproc" resource and configured via DATAPROC_CONFIG_SCHEMA.
# NOTE(review): decorator and body match dataproc_op exactly; the "solid" name is
# presumably retained for backward compatibility -- confirm before removing.
@op(required_resource_keys={"dataproc"}, config_schema=DATAPROC_CONFIG_SCHEMA)
def dataproc_solid(context):
    # All work (logging, submission, waiting) is delegated to the shared helper.
    return _dataproc_compute(context)


# Op requiring the "dataproc" resource and configured via DATAPROC_CONFIG_SCHEMA.
# It submits the job described by its op config and waits for completion by
# delegating to _dataproc_compute.
@op(required_resource_keys={"dataproc"}, config_schema=DATAPROC_CONFIG_SCHEMA)
def dataproc_op(context):
    return _dataproc_compute(context)
# Op that takes its settings from a typed DataprocOpConfig and its client from a
# DataprocResource argument, instead of reading context.op_config / context.resources.
@op
def configurable_dataproc_op(context, dataproc: DataprocResource, config: DataprocOpConfig):
    # Assemble the submission payload from the flat config fields.
    job_config = {"projectId": config.project_id, "region": config.region, "job": config.job_config}
    job_timeout = config.job_timeout_in_seconds

    context.log.info(
        "submitting job with config: %s and timeout of: %d seconds"
        % (str(json.dumps(job_config)), job_timeout)
    )

    dataproc_client = dataproc.get_client()

    if config.job_scoped_cluster:
        # Cluster context manager, creates and then deletes cluster
        with dataproc_client.cluster_context_manager() as cluster:
            # Submit the job specified by this op to the cluster defined by the associated resource
            result = cluster.submit_job(job_config)

            job_id = result["reference"]["jobId"]
            context.log.info(f"Submitted job ID {job_id}")
            cluster.wait_for_job(job_id, wait_timeout=job_timeout)

    else:
        # Submit to an existing cluster
        # Submit the job specified by this op to the cluster defined by the associated resource
        result = dataproc_client.submit_job(job_config)

        job_id = result["reference"]["jobId"]
        context.log.info(f"Submitted job ID {job_id}")
        dataproc_client.wait_for_job(job_id, wait_timeout=job_timeout)