Submitting PySpark solids on EMR

You can find the code for this example on GitHub.

This example demonstrates how to run a solid as a Spark step on an EMR cluster. In it, each of the three solids is executed as a separate EMR step on the same cluster.

repo.py
from pathlib import Path

from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_intermediate_storage, s3_resource
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import (
    ModeDefinition,
    make_python_type_usable_as_dagster_type,
    pipeline,
    repository,
    solid,
)
from dagster.core.definitions.no_step_launcher import no_step_launcher

# Make pyspark.sql.DataFrame map to dagster_pyspark.DataFrame
make_python_type_usable_as_dagster_type(python_type=DataFrame, dagster_type=DagsterPySparkDataFrame)


@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@solid(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(_, people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@solid(required_resource_keys={"pyspark_step_launcher"})
def count_people(_, people: DataFrame) -> int:
    return people.count()


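# Mode that submits each solid as a step on an existing EMR cluster, staging
# code and intermediates in S3.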
emr_mode = ModeDefinition(
    name="emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": str(Path(__file__).parent),
                "deploy_local_pipeline_package": True,
                "region_name": "us-west-1",
                "staging_bucket": "my_staging_bucket",
                "wait_for_logs": True,
            }
        ),
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
        "s3": s3_resource,
    },
    intermediate_storage_defs=[
        s3_intermediate_storage.configured(
            {"s3_bucket": "my_staging_bucket", "s3_prefix": "simple-pyspark"}
        )
    ],
)

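# Mode that runs the same solids in-process against a local SparkSession.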
local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    },
)


@pipeline(mode_defs=[emr_mode, local_mode])
def my_pipeline():
    count_people(filter_over_50(make_people()))

It accomplishes this by using the emr_pyspark_step_launcher, which knows how to launch an EMR step that runs the contents of a solid. The example defines a mode that maps the resource key "pyspark_step_launcher" to the emr_pyspark_step_launcher resource definition, and each solid that should be launched remotely declares "pyspark_step_launcher" in its required_resource_keys.

The EMR PySpark step launcher relies on the presence of an s3_resource and S3 intermediate storage to shuttle config and events to and from EMR.
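You select which mode to use when executing the pipeline. The snippet below is a minimal sketch using the Python API, assuming the code above lives in repo.py; the cluster id is a placeholder and must refer to a running EMR cluster in the configured region.

import os

from dagster import execute_pipeline

from repo import my_pipeline

# Run everything in-process against a local SparkSession:
execute_pipeline(my_pipeline, mode="local")

# Run each solid as a step on an existing EMR cluster. The cluster id below is
# a placeholder; emr_pyspark_step_launcher reads it from the EMR_CLUSTER_ID
# environment variable per the configuration above.
os.environ["EMR_CLUSTER_ID"] = "j-XXXXXXXXXXXXX"
execute_pipeline(my_pipeline, mode="emr")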

More generally, a step launcher is any resource that extends the StepLauncher abstract class. Where a solid would otherwise be executed in-process, its step launcher is invoked instead and launches a remote process with the solid running inside it. To use a step launcher for a particular solid, add a required resource key on that solid that points to the step launcher resource.
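As a rough sketch of the shape of a custom step launcher (not a complete implementation; the exact method signatures and import path vary across Dagster versions, and MyRemoteStepLauncher / my_remote_step_launcher are hypothetical names):

from dagster import resource
from dagster.core.definitions.step_launcher import StepLauncher


class MyRemoteStepLauncher(StepLauncher):
    def launch_step(self, step_context, prior_attempts_count):
        # Package up the step and its run information, submit it to the remote
        # environment, and yield the Dagster events emitted by the remote step
        # back to the host process.
        raise NotImplementedError()


@resource
def my_remote_step_launcher(_):
    return MyRemoteStepLauncher()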

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/emr_pyspark
cd emr_pyspark