Submitting PySpark solids on EMR

You can find the code for this example on GitHub.

This example demonstrates how to have a solid run as a Spark step on an EMR cluster. Each of the three solids below is executed as a separate EMR step on the same cluster.
from pathlib import Path

from dagster import (
    ModeDefinition,
    make_python_type_usable_as_dagster_type,
    pipeline,
    solid,
)
from dagster.core.definitions.no_step_launcher import no_step_launcher
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Make pyspark.sql.DataFrame map to dagster_pyspark.DataFrame
make_python_type_usable_as_dagster_type(python_type=DataFrame, dagster_type=DagsterPySparkDataFrame)

@solid(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)

@solid(required_resource_keys={"pyspark_step_launcher"})
def filter_over_50(_, people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)

@solid(required_resource_keys={"pyspark_step_launcher"})
def count_people(_, people: DataFrame) -> int:
    return people.count()

emr_mode = ModeDefinition(
    name="emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": str(Path(__file__).parent),
                "deploy_local_pipeline_package": True,
                "region_name": "us-west-1",
                "staging_bucket": "my_staging_bucket",
                "wait_for_logs": True,
            }
        ),
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
        "s3": s3_resource,
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my_staging_bucket", "s3_prefix": "simple-pyspark"}
        ),
    },
)

local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.default.parallelism": 1}}),
    },
)

@pipeline(mode_defs=[emr_mode, local_mode])
def my_pipeline():
    count_people(filter_over_50(make_people()))

It accomplishes this by using the emr_pyspark_step_launcher, which knows how to launch an EMR step that runs the contents of a solid. The example defines a mode that links the resource key "pyspark_step_launcher" to the emr_pyspark_step_launcher resource definition, and then requires that resource key on each solid it wants to launch remotely.
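
For instance, which launcher backs the "pyspark_step_launcher" resource key is determined entirely by the mode chosen at execution time. A minimal sketch, assuming the definitions above are in scope and that, for the emr mode, EMR_CLUSTER_ID names a running cluster:

from dagster import execute_pipeline

# In local mode, "pyspark_step_launcher" resolves to no_step_launcher,
# so all three solids run in-process on the local machine.
execute_pipeline(my_pipeline, mode="local")

# In emr mode, the same key resolves to emr_pyspark_step_launcher, so each
# solid is submitted as a separate step to the cluster in EMR_CLUSTER_ID.
execute_pipeline(my_pipeline, mode="emr")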

The EMR PySpark step launcher relies on S3 to shuttle config and events to and from EMR.

More generally, a step launcher is any resource that extends the StepLauncher abstract class. When a solid requires such a resource, its methods are invoked where the solid would otherwise be executed in-process, launching a remote process with the solid running inside it instead. To use a step launcher for a particular solid, set a required resource key on the solid that points to that resource.
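
For example, the skeleton of a custom step launcher might look like the following. This is a sketch, not a working implementation: MyStepLauncher and my_step_launcher are hypothetical names, and the launch_step signature assumes the StepLauncher interface in dagster.core at the time of writing.

from dagster import resource
from dagster.core.definitions.step_launcher import StepLauncher


class MyStepLauncher(StepLauncher):  # hypothetical example class
    def launch_step(self, step_context, prior_attempts_count):
        # A real implementation would serialize the step described by
        # step_context, run it in a remote process, and yield the resulting
        # DagsterEvents back to the host process.
        raise NotImplementedError


@resource
def my_step_launcher(_init_context):  # hypothetical resource definition
    return MyStepLauncher()

A solid would then opt in exactly as in the example above, by including the corresponding key in its required_resource_keys.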

To download and run this example:

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/emr_pyspark
cd emr_pyspark