Source code for dagster_spark.resources
import os
import subprocess

import dagster._check as check
from dagster import resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.log_manager import DagsterLogManager

from dagster_spark.types import SparkOpError
from dagster_spark.utils import construct_spark_shell_command
class SparkResource:
    def __init__(self, logger):
        self.logger = check.inst_param(logger, "logger", DagsterLogManager)

    def run_spark_job(self, config, main_class):
        check.dict_param(config, "config")
        check.str_param(main_class, "main_class")

        # Extract parameters from config
        (
            master_url,
            deploy_mode,
            application_jar,
            spark_conf,
            application_arguments,
            spark_home,
        ) = [
            config.get(k)
            for k in (
                "master_url",
                "deploy_mode",
                "application_jar",
                "spark_conf",
                "application_arguments",
                "spark_home",
            )
        ]

        if not os.path.exists(application_jar):
            raise SparkOpError(
                f"Application jar {application_jar} does not exist. A valid jar must be "
                "built before running this op."
            )
        # Assemble the spark-submit command line and run it through the shell;
        # a nonzero return code is surfaced as a SparkOpError.
        spark_shell_cmd = construct_spark_shell_command(
            application_jar=application_jar,
            main_class=main_class,
            master_url=master_url,
            spark_conf=spark_conf,
            deploy_mode=deploy_mode,
            application_arguments=application_arguments,
            spark_home=spark_home,
        )
        self.logger.info("Running spark-submit: " + " ".join(spark_shell_cmd))
        retcode = subprocess.call(" ".join(spark_shell_cmd), shell=True)
        if retcode != 0:
            raise SparkOpError("Spark job failed. Please consult your logs.")

@dagster_maintained_resource
@resource
def spark_resource(context):
    return SparkResource(context.log)
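
A minimal usage sketch follows, showing one way this resource can be attached to a Dagster job. It is not part of the module source; the op and job names, resource key, jar path, and config values below are illustrative assumptions, not validated against the package's config schema.

# Illustrative sketch only (not part of dagster_spark.resources); names and
# config values are placeholders.
from dagster import job, op

from dagster_spark import spark_resource


@op(required_resource_keys={"spark"})
def submit_spark_app(context):
    # SparkResource.run_spark_job shells out to spark-submit with these settings.
    context.resources.spark.run_spark_job(
        config={
            "master_url": "local[*]",
            "deploy_mode": "client",
            "application_jar": "path/to/app.jar",  # must exist, or SparkOpError is raised
            "spark_conf": {},
            "application_arguments": "",
            "spark_home": None,  # falls back to the environment if unset
        },
        main_class="com.example.Main",
    )


@job(resource_defs={"spark": spark_resource})
def spark_submit_job():
    submit_spark_app()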