Source code for dagster_spark.utils
import itertools
import os

import dagster._check as check
from dagster_spark.types import SparkOpError
def flatten_dict(d):
    def _flatten_dict(d, result, key_path=None):
        """Iterates an arbitrarily nested dictionary and yields dot-notation key:value tuples.

        {'foo': {'bar': 3, 'baz': 1}, 'other': {'key': 1}} =>
            [('foo.bar', 3), ('foo.baz', 1), ('other.key', 1)]
        """
        for k, v in d.items():
            new_key_path = (key_path or []) + [k]
            if isinstance(v, dict):
                _flatten_dict(v, result, new_key_path)
            else:
                result.append((".".join(new_key_path), v))

    result = []
    if d is not None:
        _flatten_dict(d, result)
    return result
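# A minimal usage sketch for flatten_dict; the nested config below is a hypothetical
# example, not part of the library:
# flatten_dict({"spark": {"executor": {"memory": "4g", "cores": 2}}})
# => [("spark.executor.memory", "4g"), ("spark.executor.cores", 2)]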
def parse_spark_config(spark_conf):
    """Convert a Spark conf dict to a list of CLI arguments.

    Each key-value pair in the Spark conf is passed to the CLI in the format:
        --conf "key=value"
    """
    spark_conf_list = flatten_dict(spark_conf)
    return format_for_cli(spark_conf_list)
def format_for_cli(spark_conf_list):
    return list(
        itertools.chain.from_iterable([("--conf", "{}={}".format(*c)) for c in spark_conf_list])
    )
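# A minimal usage sketch for parse_spark_config / format_for_cli; the config values
# below are hypothetical:
# parse_spark_config({"spark": {"executor": {"memory": "4g"}}, "spark.app.name": "my_job"})
# => ["--conf", "spark.executor.memory=4g", "--conf", "spark.app.name=my_job"]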
def construct_spark_shell_command(
    application_jar,
    main_class,
    master_url=None,
    spark_conf=None,
    deploy_mode=None,
    application_arguments=None,
    spark_home=None,
):
    """Constructs the spark-submit command for a Spark job."""
    check.opt_str_param(master_url, "master_url")
    check.str_param(application_jar, "application_jar")
    spark_conf = check.opt_dict_param(spark_conf, "spark_conf")
    check.opt_str_param(deploy_mode, "deploy_mode")
    check.opt_str_param(application_arguments, "application_arguments")
    check.opt_str_param(spark_home, "spark_home")

    spark_home = spark_home if spark_home else os.environ.get("SPARK_HOME")

    if spark_home is None:
        raise SparkOpError(
            "No spark home set. You must either pass spark_home in config or "
            "set $SPARK_HOME in your environment (got None)."
        )

    master_url = ["--master", master_url] if master_url else []
    deploy_mode = ["--deploy-mode", deploy_mode] if deploy_mode else []

    spark_shell_cmd = (
        [f"{spark_home}/bin/spark-submit", "--class", main_class]
        + master_url
        + deploy_mode
        + parse_spark_config(spark_conf)
        + [application_jar]
        + [application_arguments]
    )
    return spark_shell_cmd
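# A minimal usage sketch for construct_spark_shell_command; all paths, class names,
# and argument values below are hypothetical placeholders. Note that
# application_arguments is appended as a single list element, not split on spaces:
# construct_spark_shell_command(
#     application_jar="/path/to/app.jar",
#     main_class="com.example.Main",
#     master_url="local[*]",
#     spark_conf={"spark": {"executor": {"memory": "4g"}}},
#     deploy_mode="client",
#     application_arguments="--input /data/events.csv",
#     spark_home="/opt/spark",
# )
# => ["/opt/spark/bin/spark-submit", "--class", "com.example.Main",
#     "--master", "local[*]", "--deploy-mode", "client",
#     "--conf", "spark.executor.memory=4g",
#     "/path/to/app.jar", "--input /data/events.csv"]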