Ask AI

Spark & Dagster#

Dagster assets and ops can perform computations using Spark.

Running computations on Spark presents unique challenges, because, unlike other computations, Spark jobs typically execute on infrastructure that's specialized for Spark - i.e. that can network sets of workers into clusters that Spark can run computations against. Spark applications are typically not containerized or executed on Kubernetes. Running Spark code often requires submitting code to a Databricks or EMR cluster.

There are two approaches to writing Dagster assets and ops that invoke Spark computations:

Asset body submits Spark job#

With this approach, the code inside the asset definition submits a Spark job to an external system like Databricks or EMR, usually pointing to a jar or zip of Python files that contain the actual Spark data transformations and actions.

If you want to run a Spark job against YARN or a Spark Standalone cluster, the code inside your asset can issue a shell command that invokes spark-submit.

This is the easiest approach for migrating existing Spark jobs, and it's the only approach that works for Spark jobs written in Java or Scala. The downside is that it loses out on some of the benefits of Dagster - the implementation of each asset is bound to the execution environment, so you can't run your Spark transformations without relying on external infrastructure. Writing unit tests is cumbersome.

Asset accepts and produces DataFrames or RDDs#

With this approach, the code inside the asset definition consists of pure logical data transformations on Spark DataFrames or RDDs. The @asset-decorated function accepts DataFrames as parameters and returns DataFrames when it completes. An IO manager handles writing and reading the DataFrames to and from persistent storage. The Running PySpark code in assets example below shows what this looks like.

If you want your Spark driver to run inside a Spark cluster, you use a "step launcher" resource that informs Dagster how to launch the step. The step launcher resource is responsible for invoking spark-submit or submitting the job to Databricks or EMR. Submitting PySpark jobs on EMR shows what this looks like for EMR.

The advantage of this approach is a very clean local testing story. You can run an entire pipeline of Spark assets in a single process. You can use IO managers to abstract away IO - storing outputs on the local filesystem during local development and in the cloud in production.

The downside is that this approach only works with PySpark, and setting up a step launcher can be difficult. We currently provide an emr_pyspark_step_launcher and a databricks_pyspark_step_launcher, but if you need to submit your Spark job to a different kind of cluster, writing your own can be time consuming (here are some tips). You also need to install the Dagster library itself on the cluster.

Running PySpark code in assets#

You can find the code for this example on Github

Passing PySpark DataFrames between asset functions requires a little bit of extra care, compared to other data types, for a couple reasons:

  • Spark has a lazy execution model, which means that PySpark won't process any data until an action like write or collect is called on a DataFrame.
  • PySpark DataFrames cannot be pickled, which means that IO Managers like the FilesystemIOManager won't work for them.

In this example, we've defined an IOManager that knows how to store and retrieve PySpark DataFrames that are produced and consumed by ops.

This example assumes that all the outputs within the Dagster pipeline will be PySpark DataFrames and stored in the same way. To learn how to use different IO managers for different outputs within the same Dagster pipeline, take a look at the IO Manager concept page.

This example writes out DataFrames to the local file system, but can be tweaked to write to cloud object stores like S3 by changing to the write and read invocations.

import os

from dagster import ConfigurableIOManager, Definitions, asset
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


class LocalParquetIOManager(ConfigurableIOManager):
    def _get_path(self, context):
        return os.path.join(*context.asset_key.path)

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(self._get_path(context.upstream_output))


@asset
def people() -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(rows, schema)


@asset
def people_over_50(people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


defs = Definitions(
    assets=[people, people_over_50], resources={"io_manager": LocalParquetIOManager()}
)

Submitting PySpark jobs on EMR#

You can find the code for this example on Github

This example demonstrates how to use the emr_pyspark_step_launcher to have an asset run as a Spark step on an EMR cluster. In it, each of the two assets will be executed as a separate EMR step on the same EMR cluster.

from pathlib import Path
from typing import Any

from dagster import ConfigurableIOManager, Definitions, ResourceParam, asset
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import S3Resource
from dagster_pyspark import PySparkResource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


class ParquetIOManager(ConfigurableIOManager):
    pyspark: PySparkResource
    path_prefix: str

    def _get_path(self, context) -> str:
        return "/".join([context.resource_config["path_prefix"], *context.asset_key.path])

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        spark = self.pyspark.spark_session
        return spark.read.parquet(self._get_path(context.upstream_output))


@asset
def people(pyspark: PySparkResource, pyspark_step_launcher: ResourceParam[Any]) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return pyspark.spark_session.createDataFrame(rows, schema)


emr_pyspark = PySparkResource(spark_config={"spark.executor.memory": "2g"})


@asset
def people_over_50(pyspark_step_launcher: ResourceParam[Any], people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


defs = Definitions(
    assets=[people, people_over_50],
    resources={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": str(Path(__file__).parent),
                "deploy_local_pipeline_package": True,
                "region_name": "us-west-1",
                "staging_bucket": "my_staging_bucket",
                "wait_for_logs": True,
            }
        ),
        "pyspark": emr_pyspark,
        "s3": S3Resource(),
        "io_manager": ParquetIOManager(pyspark=emr_pyspark, path_prefix="s3://my-s3-bucket"),
    },
)

The EMR PySpark step launcher relies on S3 to shuttle config and events to and from EMR.