Lambda run launcher
In this example, we'll explore how to run Dagster jobs on AWS Lambda functions instead of spinning up new ECS tasks or containers. This approach is useful for lightweight, configuration-driven jobs that benefit from Lambda's instant startup time.
| Factor | Lambda | ECS/Kubernetes |
|---|---|---|
| Startup time | Sub-second | 30-60 seconds |
| Max runtime | 15 minutes | Unlimited |
| Dependencies | Pre-packaged in Lambda | Full Python environment |
| Concurrency | Thousands of parallel invocations | Limited by cluster capacity |
| Best for | API calls, webhooks, quick tasks | Heavy compute, long-running jobs |
Problem: Instant job execution for lightweight workloads
Imagine you have jobs that invoke external APIs, trigger webhooks, or perform quick data validations. These jobs spend more time waiting for containers to start than actually executing. You want sub-second startup times while maintaining full Dagster orchestration.
The key question is: How can you route certain jobs to Lambda for instant execution while keeping heavier workloads on traditional compute?
| Solution | Best for |
|---|---|
| Basic Lambda job | New Lambda functions, simple invocations, async fire-and-forget |
| Multi-agent architecture | Mixed workloads requiring both Lambda and ECS/Kubernetes compute |
| Wrapping existing Lambdas | Existing Lambda functions with specific payload formats, zero-code migration |
Solution 1: Basic Lambda job configuration
Configure a job to run on Lambda by adding tags that specify the Lambda function name and invocation type. The Lambda run launcher reads these tags and invokes the appropriate function.
Async invocation (fire-and-forget)
Set the `lambda/invocation_type` tag to `Event` for fire-and-forget execution. The job triggers the Lambda and returns immediately without waiting for a response. This is ideal for triggering webhooks, sending notifications, or kicking off background processes.
```python
import dagster as dg


@dg.op(config_schema={"message": str})
def async_op(context: dg.OpExecutionContext):
    message = context.op_config["message"]
    context.log.info(f"Message: {message}")
    return {"status": "processed", "message": message}


@dg.job(
    tags={
        "lambda/function_name": "dagster-example-handler",
        "lambda/invocation_type": "Event",
    }
)
def async_lambda_job():
    async_op()
```
Sync invocation (wait for response)
Set the `lambda/invocation_type` tag to `RequestResponse` when you need to wait for the Lambda to complete and return a result. This is useful when subsequent steps depend on the Lambda's output or when you need to capture errors.
```python
@dg.op(
    config_schema={
        "input_data": str,
        "processing_mode": str,
    }
)
def sync_op(context: dg.OpExecutionContext):
    input_data = context.op_config["input_data"]
    mode = context.op_config["processing_mode"]
    context.log.info(f"Processing {input_data} in {mode} mode")
    return {"result": "completed"}


@dg.job(
    tags={
        "lambda/function_name": "dagster-sync-handler",
        "lambda/invocation_type": "RequestResponse",
    }
)
def sync_lambda_job():
    sync_op()
```
Using Lambda ARN
Instead of a function name, you can specify the full Lambda ARN using the `lambda/function_arn` tag. This is useful when invoking Lambda functions in different AWS accounts or regions.
```python
@dg.op
def arn_based_op(context: dg.OpExecutionContext):
    context.log.info("Executing with Lambda ARN")
    return {"status": "success"}


@dg.job(
    tags={
        "lambda/function_arn": "arn:aws:lambda:us-east-1:123456789012:function:my-dagster-handler",
        "lambda/invocation_type": "Event",
    }
)
def arn_lambda_job():
    arn_based_op()
```
Multi-op ETL job
Lambda jobs can have multiple ops with dependencies. The entire job graph is passed to Lambda, which executes the ops in order. This works well for lightweight ETL pipelines where each step is configuration-driven.
```python
@dg.op(config_schema={"input_path": str})
def extract(context: dg.OpExecutionContext):
    input_path = context.op_config["input_path"]
    context.log.info(f"Extracting from {input_path}")
    return {"data": ["record1", "record2", "record3"]}


@dg.op
def transform(context: dg.OpExecutionContext, data: dict):
    records = data["data"]
    context.log.info(f"Transforming {len(records)} records")
    return {"transformed": [r.upper() for r in records]}


@dg.op(config_schema={"output_path": str})
def load(context: dg.OpExecutionContext, transformed: dict):
    output_path = context.op_config["output_path"]
    data = transformed["transformed"]
    context.log.info(f"Loading {len(data)} records to {output_path}")
    return {"loaded": len(data)}


@dg.job(
    tags={
        "lambda/function_name": "dagster-etl-handler",
        "lambda/invocation_type": "Event",
    }
)
def etl_lambda_job():
    extracted = extract()
    transformed = transform(extracted)
    load(transformed)
```
Lambda handler
The Lambda function receives a payload containing run metadata, job configuration, and environment variables. The handler extracts the ops configuration and processes each op:
```python
import json
import logging
from typing import Any

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Handle Dagster run invocation."""
    try:
        dagster_run = event.get("dagster_run", {})
        run_id = dagster_run.get("run_id", "unknown")
        job_name = dagster_run.get("job_name", "unknown")
        logger.info(f"Processing run {run_id} for job {job_name}")

        run_config = event.get("run_config", {})
        env_vars = event.get("environment_variables", {})

        # Your business logic here
        result = process_dagster_job(run_config, env_vars)

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "run_id": run_id,
                    "job_name": job_name,
                    "status": "success",
                    "result": result,
                }
            ),
        }
    except Exception as e:
        logger.error(f"Error processing Dagster run: {e!s}", exc_info=True)
        return {
            "statusCode": 500,
            "body": json.dumps(
                {
                    "run_id": event.get("dagster_run", {}).get("run_id", "unknown"),
                    "status": "error",
                    "error": str(e),
                }
            ),
        }


def process_dagster_job(config: dict[str, Any], env_vars: dict[str, str]) -> dict[str, Any]:
    """Process the Dagster job configuration."""
    ops_config = config.get("ops", {})
    results = {}

    for op_name in ops_config:
        logger.info(f"Processing op: {op_name}")
        # Your business logic here
        results[op_name] = {"status": "completed"}

    return {"ops_processed": len(ops_config), "results": results}
```
The payload includes:
- `dagster_run`: Run metadata (`run_id`, `job_name`, `deployment_name`, `location_name`)
- `run_config`: Job configuration, including the ops config
- `environment_variables`: Environment context from Dagster Cloud
- `dagster_cloud`: Organization and deployment IDs
- `metadata`: Launcher metadata (`launched_at`, `launcher_version`)
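For illustration, an invocation payload might look like the following (the values and the exact keys inside `dagster_cloud` are illustrative; inspect a real event from your launcher for the precise shape):

```json
{
  "dagster_run": {
    "run_id": "a1b2c3d4-...",
    "job_name": "async_lambda_job",
    "deployment_name": "prod",
    "location_name": "lambda_jobs"
  },
  "run_config": {
    "ops": {"async_op": {"config": {"message": "hello"}}}
  },
  "environment_variables": {"STAGE": "prod"},
  "dagster_cloud": {"organization_id": "...", "deployment_id": "..."},
  "metadata": {"launched_at": "2024-01-01T00:00:00Z", "launcher_version": "0.1.0"}
}
```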
Solution 2: Multi-agent architecture
Since each Dagster agent supports only one run launcher, use multiple agents with queue routing to handle both Lambda and traditional compute workloads. Code locations specify which agent queue they target.
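Each agent then declares which queue it serves in its own dagster.yaml. Below is a minimal sketch, assuming Dagster+ hybrid agents with agent queues enabled; the run launcher block itself is omitted because it depends on how your Lambda launcher is packaged:

```yaml
# dagster.yaml for the Lambda agent (illustrative)
agent_queues:
  include_default_queue: false
  additional_queues:
    - lambda

# dagster.yaml for the ECS agent (illustrative)
agent_queues:
  include_default_queue: true
  additional_queues:
    - ecs
```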
Lambda agent jobs
Lambda is ideal for lightweight, configuration-driven jobs that complete quickly:
- API triggers: Call external APIs, webhooks, or notifications
- Event dispatchers: Send messages to SQS, SNS, or EventBridge
- Data checks: Quick validation or existence checks
- Orchestration triggers: Kick off downstream processes
These jobs benefit from Lambda's sub-second startup time and pay-per-invocation pricing.
```python
import dagster as dg


@dg.op(config_schema={"api_endpoint": str, "api_key": str})
def call_api(context: dg.OpExecutionContext):
    endpoint = context.op_config["api_endpoint"]
    context.log.info(f"Calling API: {endpoint}")
    return {"status": "called", "endpoint": endpoint}


@dg.job(
    tags={
        "lambda/function_name": "api-caller",
        "lambda/invocation_type": "Event",
    }
)
def api_trigger_job():
    call_api()


@dg.op(config_schema={"source_bucket": str, "file_pattern": str})
def trigger_etl(context: dg.OpExecutionContext):
    bucket = context.op_config["source_bucket"]
    context.log.info(f"Triggering ETL for {bucket}")
    return {"triggered": True}


@dg.job(tags={"lambda/function_name": "etl-trigger"})
def etl_trigger_job():
    trigger_etl()


@dg.op
def check_new_data(context: dg.OpExecutionContext):
    context.log.info("Checking for new data...")
    return {"has_new_data": True, "file_count": 42}


@dg.job(tags={"lambda/function_name": "data-checker"})
def check_data_job():
    check_new_data()
```
ECS agent jobs
ECS (or Kubernetes) is better suited for jobs that need:
- Python dependencies: pandas, scikit-learn, pytorch, spark
- Long runtimes: Jobs exceeding Lambda's 15-minute limit
- Large memory: Processing datasets larger than Lambda's 10GB limit
- Custom environments: Specific Python versions or system packages
These jobs trade slower startup time for full Python environment flexibility.
```python
import dagster as dg


@dg.op
def load_large_dataset(context: dg.OpExecutionContext):
    context.log.info("Loading 10GB dataset...")
    # Your business logic here (pandas, spark, etc.)
    return {"records": 1000000}


@dg.op
def train_model(context: dg.OpExecutionContext, dataset: dict):
    context.log.info("Training model...")
    # Your business logic here (sklearn, pytorch, etc.)
    return {"model_id": "model-123", "accuracy": 0.95}


@dg.job
def ml_training_job():
    dataset = load_large_dataset()
    train_model(dataset)


@dg.op(config_schema={"query": str})
def run_complex_query(context: dg.OpExecutionContext):
    query = context.op_config["query"]
    context.log.info(f"Running long query: {query}")
    # Your business logic here (runs > 15 minutes)
    return {"rows": 1000000}


@dg.job
def long_running_job():
    run_complex_query()


@dg.op
def process_data(context: dg.OpExecutionContext):
    context.log.info("Processing large dataset...")
    # Your business logic here
    return {"processed": 1000000}


@dg.job
def process_data_job():
    process_data()
```
Queue routing configuration
Configure each code location's dagster_cloud.yaml to route to the appropriate agent:
```yaml
# For Lambda jobs
locations:
  - location_name: lambda_jobs
    code_source:
      package_name: lambda_jobs
    agent_queue: lambda # Routes to Lambda agent

# For ECS/compute-heavy jobs
locations:
  - location_name: python_jobs
    code_source:
      package_name: python_jobs
    agent_queue: ecs # Routes to ECS agent
```
Solution 3: Wrapping existing Lambda functions
If you have existing Lambda functions that expect a specific payload format, use payload modes to adapt the Dagster run configuration to match what your Lambda expects—no Lambda code changes required.
| Payload mode | What Lambda receives |
|---|---|
| full (default) | Complete payload with run metadata, config, and env vars |
| config_only | Just the run_config dictionary |
| ops_only | Just the ops config from run_config.ops |
| custom | Extract a specific path using payload_config_path |
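As a quick orientation (a sketch of the mapping, not the launcher's actual code), here is what each mode would deliver to the Lambda for the same run config:

```python
# Illustrative mapping from run config to Lambda event for each payload mode.
run_config = {"ops": {"copy_files_op": {"config": {"source_bucket": "my-bucket"}}}}

# full (default): the complete wrapper described earlier
#   {"dagster_run": {...}, "run_config": run_config, "environment_variables": {...}, ...}
# config_only: run_config itself
#   {"ops": {"copy_files_op": {"config": {"source_bucket": "my-bucket"}}}}
# ops_only: run_config["ops"]
#   {"copy_files_op": {"config": {"source_bucket": "my-bucket"}}}
# custom: the value at lambda/payload_config_path, e.g. "ops.copy_files_op.config"
#   {"source_bucket": "my-bucket"}
```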
Ops-only payload mode
Set the `lambda/payload_mode` tag to `ops_only` when your Lambda expects the ops configuration structure but not the outer `run_config` wrapper.
Suppose you have an existing Lambda function that expects a payload like:
```json
{
  "copy_files_op": {
    "config": {
      "source_bucket": "my-bucket",
      "destination_bucket": "other-bucket",
      "file_pattern": "*.csv"
    }
  }
}
```
Configure your Dagster job to send exactly this structure:
```python
@dg.op(
    config_schema={
        "source_bucket": str,
        "destination_bucket": str,
        "file_pattern": str,
    }
)
def copy_files_op(context: dg.OpExecutionContext):
    config = context.op_config
    context.log.info(f"Copying files from {config['source_bucket']}")
    return {"status": "initiated"}


@dg.job(
    tags={
        "lambda/function_name": "existing-s3-copy-function",
        "lambda/payload_mode": "ops_only",
    }
)
def copy_files_job():
    copy_files_op()
```
Here's how the existing Lambda handler receives and processes this payload:
```python
# Assumes the same imports and logger setup as the handler above;
# process_op_config is your existing business logic.
def ops_aware_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing Lambda that understands Dagster ops config structure."""
    logger.info("Ops-aware Lambda handler")

    ops_config = event
    logger.info(f"Received ops config: {json.dumps(ops_config, indent=2)}")

    results = {}
    for op_name, op_data in ops_config.items():
        logger.info(f"Processing op: {op_name}")
        config = op_data.get("config", {})
        result = process_op_config(op_name, config)
        results[op_name] = result

    return {"statusCode": 200, "body": json.dumps(results)}
```
Custom payload mode
Set the `lambda/payload_mode` tag to `custom` and use `lambda/payload_config_path` to extract a specific nested value from your run config. This lets you invoke existing Lambdas that expect a flat configuration object without any Dagster-specific structure.
If your Lambda expects a flat config like `{"api_key": "...", "endpoint": "...", "data": [...]}`, use the `lambda/payload_config_path` tag to extract just the op config:
```python
@dg.op(
    config_schema={
        "api_key": str,
        "endpoint": str,
        "data": list,
    }
)
def api_call_op(context: dg.OpExecutionContext):
    config = context.op_config
    context.log.info(f"Calling API at {config['endpoint']}")
    return {"status": "called"}


@dg.job(
    tags={
        "lambda/function_name": "existing-api-caller",
        "lambda/payload_mode": "custom",
        "lambda/payload_config_path": "ops.api_call_op.config",
    }
)
def api_call_job():
    api_call_op()
```
The Lambda receives exactly what it expects—no Dagster-specific structure:
```json
{
  "api_key": "...",
  "endpoint": "...",
  "data": [...]
}
```
Here's how an existing Lambda handler processes a flat configuration like this (the example below uses a file-copy config rather than the API config above, but the pattern is the same):
```python
# Assumes the same imports and logger setup as the handler above;
# process_files is your existing business logic.
def simple_existing_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing Lambda that expects a simple config dictionary."""
    logger.info("Simple existing Lambda handler")

    source_bucket = event.get("source_bucket")
    destination_bucket = event.get("destination_bucket")
    file_pattern = event.get("file_pattern")

    logger.info(f"Source: {source_bucket}")
    logger.info(f"Destination: {destination_bucket}")
    logger.info(f"Pattern: {file_pattern}")

    result = process_files(source_bucket, destination_bucket, file_pattern)
    return {"statusCode": 200, "body": json.dumps(result)}
```
Config-only payload mode
Set the `lambda/payload_mode` tag to `config_only` when your Lambda expects the full `run_config` dictionary but not the Dagster run metadata. This is useful when your Lambda function is already designed to work with Dagster's `run_config` structure.
```python
@dg.op(config_schema={"input_file": str, "output_file": str})
def transform_op(context: dg.OpExecutionContext):
    context.log.info("Transforming data")
    return {"transformed": True}


@dg.op(config_schema={"destination": str})
def load_op(context: dg.OpExecutionContext):
    context.log.info("Loading data")
    return {"loaded": True}


@dg.job(
    tags={
        "lambda/function_name": "existing-etl-function",
        "lambda/payload_mode": "config_only",
    }
)
def etl_job():
    transform_op()
    load_op()
```
The Lambda receives the full run_config structure:
```json
{
  "ops": {
    "transform_op": {
      "config": {"input_file": "...", "output_file": "..."}
    },
    "load_op": {
      "config": {"destination": "..."}
    }
  }
}
```
Here's how a Lambda handler processes this structure:
```python
# Assumes the same imports and logger setup as the handler above;
# process_with_resources is your existing business logic.
def run_config_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing Lambda that works with Dagster's full run_config."""
    logger.info("Run config Lambda handler")

    ops_config = event.get("ops", {})
    resources_config = event.get("resources", {})
    logger.info(f"Processing {len(ops_config)} ops")

    results = {}
    for op_name, op_data in ops_config.items():
        config = op_data.get("config", {})
        result = process_with_resources(op_name, config, resources_config)
        results[op_name] = result

    return {"statusCode": 200, "body": json.dumps({"status": "completed", "results": results})}
```
Real-world example
Here's a complete example of an existing ETL Lambda that can be orchestrated by Dagster without any code changes. The Lambda expects a specific payload format, and Dagster's custom payload mode extracts exactly what it needs:
```python
# Assumes the same imports and logger setup as the handler above;
# run_etl_pipeline is your existing business logic.
def real_world_etl_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing ETL Lambda - receives flat config, no changes needed."""
    logger.info("ETL Lambda handler (existing, unchanged)")

    input_table = event.get("input_table")
    output_table = event.get("output_table")
    date_range = event.get("date_range", {})
    filters = event.get("filters", [])

    logger.info(f"ETL: {input_table} -> {output_table}")
    logger.info(f"Date range: {date_range}")
    logger.info(f"Filters: {filters}")

    records_processed = run_etl_pipeline(input_table, output_table, date_range, filters)
    return {
        "statusCode": 200,
        "body": json.dumps({"status": "success", "records_processed": records_processed}),
    }
```
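For reference, here is a sketch of the Dagster job that could drive this Lambda (the op and function names are illustrative). It uses custom payload mode so the handler receives only the flat config it already expects:

```python
import dagster as dg


@dg.op(
    config_schema={
        "input_table": str,
        "output_table": str,
        "date_range": dict,
        "filters": list,
    }
)
def etl_request_op(context: dg.OpExecutionContext):
    context.log.info(f"Requesting ETL for {context.op_config['input_table']}")
    return {"status": "requested"}


@dg.job(
    tags={
        "lambda/function_name": "existing-etl-lambda",  # illustrative function name
        "lambda/payload_mode": "custom",
        "lambda/payload_config_path": "ops.etl_request_op.config",
    }
)
def real_world_etl_job():
    etl_request_op()
```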