Running PySpark code in solids

You can find the code for this example on GitHub.
repo.py
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from dagster import (
    ModeDefinition,
    make_python_type_usable_as_dagster_type,
    pipeline,
    repository,
    solid,
)

# Make pyspark.sql.DataFrame map to dagster_pyspark.DataFrame
make_python_type_usable_as_dagster_type(python_type=DataFrame, dagster_type=DagsterPySparkDataFrame)


@solid(required_resource_keys={"pyspark"})
def make_people(context) -> DataFrame:
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)


@solid
def filter_over_50(_, people: DataFrame) -> DataFrame:
    return people.filter(people["age"] > 50)


@solid
def count_people(_, people: DataFrame) -> int:
    return people.count()


@pipeline(mode_defs=[ModeDefinition(resource_defs={"pyspark": pyspark_resource})])
def my_pipeline():
    count_people(filter_over_50(make_people()))

The pyspark resource is how Spark configuration is passed to the pipeline. Solid bodies can access the SparkSession it manages through its spark_session property.
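
For example, Spark configuration can be supplied through the resource's config in the run config for a pipeline execution. This is a minimal sketch, assuming the resource accepts a spark_conf dictionary and that your Dagster version's execute_pipeline takes a run_config argument (older versions use environment_dict); the Spark settings shown are placeholders.

from dagster import execute_pipeline

result = execute_pipeline(
    my_pipeline,
    run_config={
        "resources": {
            "pyspark": {
                "config": {
                    "spark_conf": {
                        # Placeholder settings; substitute whatever your job needs
                        "spark.executor.memory": "2g",
                        "spark.sql.shuffle.partitions": "8",
                    }
                }
            }
        }
    },
)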

Because PySpark has a lazy execution model, when the PySpark jobs actually execute depends on which intermediate store is configured. With the default in-memory intermediate store, the solids that yield a DataFrame don't trigger execution; nothing runs until an action, such as the count in this example, is called on one of the DataFrames.

Alternatively, with a local filesystem or S3 intermediate store, any solid output that is a DataFrame is persisted to Parquet files. With these intermediate stores, each PySpark job therefore executes as soon as the solid that produces it completes.
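
As a rough sketch, switching to a persistent intermediate store is a run-config change rather than a code change. The storage key below is an assumption; depending on your Dagster version it may be spelled intermediate_storage, and the S3 variant requires additional configuration.

from dagster import execute_pipeline

# Persist solid outputs to the local filesystem (DataFrames land as Parquet files),
# so each PySpark job runs as soon as its solid completes.
result = execute_pipeline(
    my_pipeline,
    run_config={"storage": {"filesystem": {}}},
)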

dagster_pyspark.DataFrame is a DagsterType that knows how to save and load PySpark DataFrames. make_python_type_usable_as_dagster_type tells Dagster that when a solid is annotated with pyspark.sql.DataFrame, Dagster should use dagster_pyspark.DataFrame to save and load the value.
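
If you would rather not register a global mapping, the Dagster type can also be attached to an individual solid. The sketch below does this with an explicit OutputDefinition; the solid name is invented for illustration.

from dagster import OutputDefinition, solid
from dagster_pyspark import DataFrame as DagsterPySparkDataFrame
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


@solid(
    required_resource_keys={"pyspark"},
    output_defs=[OutputDefinition(DagsterPySparkDataFrame)],
)
def make_people_explicit(context):
    # Same body as make_people, but the output type is declared on the solid
    # rather than inferred from a pyspark.sql.DataFrame annotation.
    schema = StructType([StructField("name", StringType()), StructField("age", IntegerType())])
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48), Row(name="Nigel", age=49)]
    return context.resources.pyspark.spark_session.createDataFrame(rows, schema)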

Download

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/basic_pyspark
cd basic_pyspark