
Build pipelines with AWS EMR on EKS

This tutorial gives a short overview of how to use Dagster Pipes with AWS EMR on EKS (the corresponding AWS API is called emr-containers).

The dagster-aws integration library provides the pipes.PipesEMRContainersClient resource, which can be used to launch EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.

Prerequisites

  • In the Dagster environment, you'll need to:

    • Install the following packages:

      pip install dagster dagster-webserver dagster-aws

      Refer to the Dagster installation guide for more info.

    • Configure AWS authentication credentials: If you don't have these set up already, refer to the boto3 quickstart. (A quick way to check your credentials is sketched right after this list.)

  • In AWS, you'll need:

    • An existing AWS account with permissions to run EMR on EKS jobs.

    • A running EMR on EKS virtual cluster, a job execution role, and an S3 bucket the job can write Pipes messages to.
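
If you want to confirm that boto3 can find your credentials before wiring up the pipeline, a minimal check like the one below can help. This is an optional sketch, not part of the tutorial code; it only assumes that credentials are discoverable through boto3's usual lookup chain (environment variables, ~/.aws/credentials, or an instance profile):

import boto3

# Ask STS who the current credentials belong to; this fails fast
# if boto3 cannot find valid credentials.
identity = boto3.client("sts").get_caller_identity()
print(f"Authenticated as {identity['Arn']} (account {identity['Account']})")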

Step 1: Install the dagster-pipes module in your EMR environment

There are a few options for deploying Python code & dependencies for PySpark jobs. In this tutorial, we are going to build a custom Docker image for this purpose.

Install dagster-pipes, dagster-aws and boto3 Python packages in your image:

FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-7.5.0:latest

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

USER root
RUN mkdir /python && chown hadoop:hadoop /python

USER hadoop
ENV UV_PYTHON_INSTALL_DIR=/python \
    UV_BREAK_SYSTEM_PACKAGES=1

RUN uv python install --python-preference only-managed 3.9.16

ENV PATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin:${PATH}" \
    PYTHONPATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/lib/python3.9/site-packages" \
    UV_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
    PYSPARK_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
    PYSPARK_DRIVER_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python"

RUN uv pip install --system dagster-pipes boto3 pyspark

WORKDIR /app
COPY script.py .
note

It's also recommended to upgrade the default Python version included in the base EMR image (as done in the Dockerfile above).

The EMR job script (script.py) is copied into the image in the final step of the Dockerfile.

Step 2: Invoke dagster-pipes in the EMR job script

Call open_dagster_pipes in the EMR script to create a context that can be used to send messages to Dagster:

import sys

import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    s3_client = boto3.client("s3")
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=s3_client),
    ) as pipes:
        pipes.log.info("Hello from AWS EMR Containers!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        print("Hello from stdout!")
        print("Hello from stderr!", file=sys.stderr)


if __name__ == "__main__":
    main()

note

It's best to use the PipesS3MessageWriter with EMR on EKS, because this message writer can capture the Spark driver logs and send them to Dagster.

Step 3: Create an asset using the PipesEMRContainersClient to launch the job

In the Dagster asset/op code, use the PipesEMRContainersClient resource to launch the job:


from dagster_aws.pipes import PipesEMRContainersClient

import dagster as dg


@dg.asset
def emr_containers_asset(
    context: dg.AssetExecutionContext,
    pipes_emr_containers_client: PipesEMRContainersClient,
):
    image = (
        ...
    )  # it's likely the image can be taken from context.run_tags["dagster/image"]

    return pipes_emr_containers_client.run(
        context=context,
        start_job_run_params={
            "releaseLabel": "emr-7.5.0-latest",
            "virtualClusterId": ...,
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "executionRoleArn": ...,
            "jobDriver": {
                "sparkSubmitJobDriver": {
                    "entryPoint": "local:///app/script.py",
                    "sparkSubmitParameters": f"--conf spark.kubernetes.container.image={image}",
                }
            },
        },
    ).get_materialize_result()

note

Setting include_stdio_in_messages to True in the PipesS3MessageReader will allow the driver logs to be forwarded to the Dagster process.

Materializing this asset will launch the EMR on EKS job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
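
If you don't have the virtual cluster ID handy, one way to look it up is through the emr-containers API. The snippet below is an optional sketch (not part of the tutorial code) that simply lists running virtual clusters in the configured region:

import boto3

# List running EMR on EKS virtual clusters so the right ID can be
# passed as virtualClusterId in start_job_run_params above.
emr_containers = boto3.client("emr-containers")
response = emr_containers.list_virtual_clusters(states=["RUNNING"])
for cluster in response["virtualClusters"]:
    print(cluster["id"], cluster["name"])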

Step 4: Create Dagster definitions

Next, add the PipesEMRContainersClient resource to your project's Definitions object:

import boto3
from dagster_aws.pipes import PipesEMRContainersClient, PipesS3MessageReader

from dagster import Definitions

defs = Definitions(
    assets=[emr_containers_asset],
    resources={
        "pipes_emr_containers_client": PipesEMRContainersClient(
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"),
                bucket=...,
                include_stdio_in_messages=True,
            ),
        )
    },
)

Dagster will now be able to launch the AWS EMR Containers job from the emr_containers_asset asset, and receive logs and events from the job. If include_stdio_in_messages is set to True, the logs will be forwarded to the Dagster process.
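
To try everything end to end, you can materialize the asset from the Dagster UI (dagster dev) or, for a quick scripted test, with materialize. The snippet below is a minimal sketch that assumes the asset defined in Step 3 is importable from your project; the bucket placeholder is the same S3 bucket used for Pipes messages:

import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRContainersClient, PipesS3MessageReader

# from my_project.defs import emr_containers_asset  # hypothetical import path for the asset above

# A one-off scripted materialization of the asset; in practice you would
# usually launch it from the Dagster UI, a schedule, or a sensor.
result = dg.materialize(
    [emr_containers_asset],
    resources={
        "pipes_emr_containers_client": PipesEMRContainersClient(
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"),
                bucket=...,  # same S3 bucket used for Pipes messages
                include_stdio_in_messages=True,
            ),
        )
    },
)
assert result.success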