Build pipelines with AWS EMR on EKS
This tutorial gives a short overview of how to use Dagster Pipes with AWS EMR on EKS (the corresponding AWS API is called emr-containers).
The dagster-aws integration library provides the PipesEMRContainersClient resource, which can be used to launch EMR jobs from Dagster assets and ops. Dagster can receive regular events such as logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes to your EMR jobs.
Prerequisites
- In the Dagster environment, you'll need to:
  - Install the following packages:
    pip install dagster dagster-webserver dagster-aws
    Refer to the Dagster installation guide for more info.
  - Configure AWS authentication credentials. If you don't have these set up already, refer to the boto3 quickstart (a quick sanity check is sketched after this list).
- In AWS:
  - An existing AWS account
  - An EMR Virtual Cluster set up
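Before wiring anything up, you can sanity-check the AWS side. The following sketch (assuming your credentials are already configured for boto3) confirms the account your credentials resolve to and lists the available EMR virtual clusters so you can note the id used later as virtualClusterId:
import boto3

# Confirm which AWS account the configured credentials resolve to
print(boto3.client("sts").get_caller_identity()["Account"])

# List EMR on EKS virtual clusters; note the id of the one you plan to use,
# since it becomes the virtualClusterId in Step 3
for vc in boto3.client("emr-containers").list_virtual_clusters()["virtualClusters"]:
    print(vc["id"], vc["name"], vc["state"])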
Step 1: Install the dagster-pipes module in your EMR environment
There are a few options for deploying Python code and dependencies for PySpark jobs. In this tutorial, we are going to build a custom Docker image for this purpose.
Install the dagster-pipes, boto3, and pyspark Python packages in your image:
FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-7.5.0:latest

# Bring in the uv package manager to install a newer Python than the base image ships
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

USER root
RUN mkdir /python && chown hadoop:hadoop /python
USER hadoop

ENV UV_PYTHON_INSTALL_DIR=/python \
    UV_BREAK_SYSTEM_PACKAGES=1

# Install a standalone Python and point Spark at it
RUN uv python install --python-preference only-managed 3.9.16

ENV PATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin:${PATH}" \
    PYTHONPATH="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/lib/python3.9/site-packages" \
    UV_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
    PYSPARK_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python" \
    PYSPARK_DRIVER_PYTHON="${UV_PYTHON_INSTALL_DIR}/cpython-3.9.16-linux-x86_64-gnu/bin/python"

# Install the Python dependencies needed by the Pipes job
RUN uv pip install --system dagster-pipes boto3 pyspark

WORKDIR /app
COPY script.py .
It's also recommended to upgrade the default Python version included in the base EMR image (as is done in the Dockerfile above). The EMR job script (script.py) is copied into the image in the last step.
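Once the Dockerfile and script.py are in place, build the image and push it to a registry that your EKS cluster can pull from, typically Amazon ECR. A rough sketch (the repository name dagster-pipes-emr and the <account-id> placeholder are illustrative):
aws ecr create-repository --repository-name dagster-pipes-emr  # one-time setup
aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin <account-id>.dkr.ecr.us-west-2.amazonaws.com
docker build -t <account-id>.dkr.ecr.us-west-2.amazonaws.com/dagster-pipes-emr:latest .
docker push <account-id>.dkr.ecr.us-west-2.amazonaws.com/dagster-pipes-emr:latest
The resulting image URI is what you will pass as spark.kubernetes.container.image in Step 3.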
Step 2: Invoke dagster-pipes in the EMR job script
Call open_dagster_pipes
in the EMR script to create a context that can be used to send messages to Dagster:
import sys

import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    s3_client = boto3.client("s3")

    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=s3_client),
    ) as pipes:
        pipes.log.info("Hello from AWS EMR Containers!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        print("Hello from stdout!")
        print("Hello from stderr!", file=sys.stderr)


if __name__ == "__main__":
    main()
It's best to use the PipesS3MessageWriter with EMR on EKS, because this message writer can capture the Spark driver logs and send them to Dagster.
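Besides logs and materializations, the same Pipes context can report asset checks back to Dagster. A minimal sketch (the check name row_count_check is illustrative, and a matching check must be declared on the Dagster asset via check_specs for the report to be accepted):
# inside the with open_dagster_pipes(...) block of the script above
pipes.report_asset_check(
    check_name="row_count_check",
    passed=bool(df.count() > 0),
    metadata={"row_count": df.count()},
)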
Step 3: Create an asset using the PipesEMRContainersClient to launch the job
In the Dagster asset/op code, use the PipesEMRContainersClient resource to launch the job:
from dagster_aws.pipes import PipesEMRContainersClient

import dagster as dg


@dg.asset
def emr_containers_asset(
    context: dg.AssetExecutionContext,
    pipes_emr_containers_client: PipesEMRContainersClient,
):
    image = (
        ...
    )  # it's likely the image can be taken from context.run_tags["dagster/image"]

    return pipes_emr_containers_client.run(
        context=context,
        start_job_run_params={
            "releaseLabel": "emr-7.5.0-latest",
            "virtualClusterId": ...,
            "clientToken": context.run_id,  # idempotency identifier for the job run
            "executionRoleArn": ...,
            "jobDriver": {
                "sparkSubmitJobDriver": {
                    "entryPoint": "local:///app/script.py",
                    "sparkSubmitParameters": f"--conf spark.kubernetes.container.image={image}",
                }
            },
        },
    ).get_materialize_result()
Setting include_stdio_in_messages
to True
in the PipesS3MessageReader
will allow the driver logs to be forwarded to the Dagster process.
Materializing this asset will launch the EMR on EKS job and wait for it to complete. If the job fails, the Dagster process will raise an exception. If the Dagster process is interrupted while the job is still running, the job will be terminated.
Step 4: Create Dagster definitions
Next, add the PipesEMRContainersClient
resource to your project's Definitions
object:
import boto3
from dagster_aws.pipes import PipesEMRContainersClient, PipesS3MessageReader

from dagster import Definitions

defs = Definitions(
    assets=[emr_containers_asset],  # the asset defined in Step 3
    resources={
        "pipes_emr_containers_client": PipesEMRContainersClient(
            message_reader=PipesS3MessageReader(
                client=boto3.client("s3"),
                bucket=...,
                include_stdio_in_messages=True,
            ),
        )
    },
)
Dagster will now be able to launch the AWS EMR Containers job from the emr_containers_asset
asset, and receive logs and events from the job. If include_stdio_in_messages
is set to True
, the logs will be forwarded to the Dagster process.
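To try the pipeline end to end without the UI, you can materialize the asset directly with dagster.materialize. A minimal sketch (assumes the asset and resource from the previous steps are importable, and that the bucket and virtual cluster placeholders have been filled in):
import boto3
import dagster as dg
from dagster_aws.pipes import PipesEMRContainersClient, PipesS3MessageReader

if __name__ == "__main__":
    result = dg.materialize(
        [emr_containers_asset],  # the asset defined in Step 3
        resources={
            "pipes_emr_containers_client": PipesEMRContainersClient(
                message_reader=PipesS3MessageReader(
                    client=boto3.client("s3"),
                    bucket=...,  # same S3 bucket as in the Definitions above
                    include_stdio_in_messages=True,
                ),
            )
        },
    )
    assert result.success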