Lambda run launcher

In this example, we'll explore how to run Dagster jobs on AWS Lambda functions instead of spinning up new ECS tasks or containers. This approach is useful for lightweight, configuration-driven jobs that benefit from Lambda's instant startup time.

| Factor | Lambda | ECS/Kubernetes |
| --- | --- | --- |
| Startup time | Sub-second | 30-60 seconds |
| Max runtime | 15 minutes | Unlimited |
| Dependencies | Pre-packaged in Lambda | Full Python environment |
| Concurrency | Thousands of parallel invocations | Limited by cluster capacity |
| Best for | API calls, webhooks, quick tasks | Heavy compute, long-running jobs |

Problem: Instant job execution for lightweight workloads

Imagine you have jobs that invoke external APIs, trigger webhooks, or perform quick data validations. These jobs spend more time waiting for containers to start than actually executing. You want sub-second startup times while maintaining full Dagster orchestration.

The key question is: How can you route certain jobs to Lambda for instant execution while keeping heavier workloads on traditional compute?

| Solution | Best for |
| --- | --- |
| Basic Lambda job | New Lambda functions, simple invocations, async fire-and-forget |
| Multi-agent architecture | Mixed workloads requiring both Lambda and ECS/Kubernetes compute |
| Wrapping existing Lambdas | Existing Lambda functions with specific payload formats, zero-code migration |

Solution 1: Basic Lambda job configuration

Configure a job to run on Lambda by adding tags that specify the Lambda function name and invocation type. The Lambda run launcher reads these tags and invokes the appropriate function.

Async invocation (fire and forget)

Use invocation_type: "Event" for fire-and-forget execution. The job triggers the Lambda and returns immediately without waiting for a response. This is ideal for triggering webhooks, sending notifications, or kicking off background processes.

Async Lambda invocation
import dagster as dg


@dg.op(config_schema={"message": str})
def async_op(context: dg.OpExecutionContext):
    message = context.op_config["message"]
    context.log.info(f"Message: {message}")
    return {"status": "processed", "message": message}


@dg.job(
    tags={
        "lambda/function_name": "dagster-example-handler",
        "lambda/invocation_type": "Event",
    }
)
def async_lambda_job():
    async_op()

Sync invocation (wait for response)

Use invocation_type: "RequestResponse" when you need to wait for the Lambda to complete and return a result. This is useful when subsequent steps depend on the Lambda's output or when you need to capture errors.

Sync Lambda invocation
@dg.op(
    config_schema={
        "input_data": str,
        "processing_mode": str,
    }
)
def sync_op(context: dg.OpExecutionContext):
    input_data = context.op_config["input_data"]
    mode = context.op_config["processing_mode"]
    context.log.info(f"Processing {input_data} in {mode} mode")
    return {"result": "completed"}


@dg.job(
    tags={
        "lambda/function_name": "dagster-sync-handler",
        "lambda/invocation_type": "RequestResponse",
    }
)
def sync_lambda_job():
    sync_op()

Using Lambda ARN

Instead of a function name, you can specify the full Lambda ARN using the lambda/function_arn tag. This is useful when invoking Lambda functions in different AWS accounts or regions.

Lambda ARN-based job
@dg.op
def arn_based_op(context: dg.OpExecutionContext):
    context.log.info("Executing with Lambda ARN")
    return {"status": "success"}


@dg.job(
    tags={
        "lambda/function_arn": "arn:aws:lambda:us-east-1:123456789012:function:my-dagster-handler",
        "lambda/invocation_type": "Event",
    }
)
def arn_lambda_job():
    arn_based_op()

Multi-op ETL job

Lambda jobs can have multiple ops with dependencies. The run configuration for every op is included in the payload sent to Lambda, and the handler processes each op's config. This works well for lightweight ETL pipelines where each step is configuration-driven.

Multi-op ETL job
@dg.op(config_schema={"input_path": str})
def extract(context: dg.OpExecutionContext):
    input_path = context.op_config["input_path"]
    context.log.info(f"Extracting from {input_path}")
    return {"data": ["record1", "record2", "record3"]}


@dg.op
def transform(context: dg.OpExecutionContext, data: dict):
    records = data["data"]
    context.log.info(f"Transforming {len(records)} records")
    return {"transformed": [r.upper() for r in records]}


@dg.op(config_schema={"output_path": str})
def load(context: dg.OpExecutionContext, transformed: dict):
    output_path = context.op_config["output_path"]
    data = transformed["transformed"]
    context.log.info(f"Loading {len(data)} records to {output_path}")
    return {"loaded": len(data)}


@dg.job(
    tags={
        "lambda/function_name": "dagster-etl-handler",
        "lambda/invocation_type": "Event",
    }
)
def etl_lambda_job():
    extracted = extract()
    transformed = transform(extracted)
    load(transformed)

Lambda handler

The Lambda function receives a payload containing run metadata, job configuration, and environment variables. The handler extracts the ops configuration and processes each op:

Lambda handler for Dagster jobs
import json
import logging
from typing import Any

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Handle Dagster run invocation."""
    try:
        dagster_run = event.get("dagster_run", {})
        run_id = dagster_run.get("run_id", "unknown")
        job_name = dagster_run.get("job_name", "unknown")

        logger.info(f"Processing run {run_id} for job {job_name}")

        run_config = event.get("run_config", {})
        env_vars = event.get("environment_variables", {})

        # Your business logic here
        result = process_dagster_job(run_config, env_vars)

        return {
            "statusCode": 200,
            "body": json.dumps(
                {
                    "run_id": run_id,
                    "job_name": job_name,
                    "status": "success",
                    "result": result,
                }
            ),
        }

    except Exception as e:
        logger.error(f"Error processing Dagster run: {e!s}", exc_info=True)
        return {
            "statusCode": 500,
            "body": json.dumps(
                {
                    "run_id": event.get("dagster_run", {}).get("run_id", "unknown"),
                    "status": "error",
                    "error": str(e),
                }
            ),
        }


def process_dagster_job(config: dict[str, Any], env_vars: dict[str, str]) -> dict[str, Any]:
    """Process the Dagster job configuration."""
    ops_config = config.get("ops", {})

    results = {}
    for op_name in ops_config:
        logger.info(f"Processing op: {op_name}")

        # Your business logic here
        results[op_name] = {"status": "completed"}

    return {"ops_processed": len(ops_config), "results": results}

The payload includes:

  • dagster_run: Run metadata (run_id, job_name, deployment_name, location_name)
  • run_config: Job configuration including ops config
  • environment_variables: Environment context from Dagster Cloud
  • dagster_cloud: Organization and deployment IDs
  • metadata: Launcher metadata (launched_at, launcher_version)
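
For reference, a full-mode payload might look roughly like the sketch below. The field names follow the list above; the values, and the exact nesting of dagster_cloud and metadata, are illustrative rather than exact:

Example full-mode payload (illustrative)
{
  "dagster_run": {
    "run_id": "a1b2c3d4-5678-90ab-cdef-1234567890ab",
    "job_name": "async_lambda_job",
    "deployment_name": "prod",
    "location_name": "lambda_jobs"
  },
  "run_config": {
    "ops": {
      "async_op": {"config": {"message": "hello"}}
    }
  },
  "environment_variables": {"MY_ENV_VAR": "value"},
  "dagster_cloud": {"organization_id": "...", "deployment_id": "..."},
  "metadata": {"launched_at": "2024-01-01T00:00:00Z", "launcher_version": "0.1.0"}
}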

Solution 2: Multi-agent architecture

Since each Dagster agent supports only one run launcher, use multiple agents with queue routing to handle both Lambda and traditional compute workloads. Code locations specify which agent queue they target.

Lambda agent jobs

Lambda is ideal for lightweight, configuration-driven jobs that complete quickly:

  • API triggers: Call external APIs, webhooks, or notifications
  • Event dispatchers: Send messages to SQS, SNS, or EventBridge
  • Data checks: Quick validation or existence checks
  • Orchestration triggers: Kick off downstream processes

These jobs benefit from Lambda's sub-second startup time and pay-per-invocation pricing.

Lambda agent jobs
import dagster as dg


@dg.op(config_schema={"api_endpoint": str, "api_key": str})
def call_api(context: dg.OpExecutionContext):
    endpoint = context.op_config["api_endpoint"]
    context.log.info(f"Calling API: {endpoint}")
    return {"status": "called", "endpoint": endpoint}


@dg.job(
    tags={
        "lambda/function_name": "api-caller",
        "lambda/invocation_type": "Event",
    }
)
def api_trigger_job():
    call_api()


@dg.op(config_schema={"source_bucket": str, "file_pattern": str})
def trigger_etl(context: dg.OpExecutionContext):
    bucket = context.op_config["source_bucket"]
    context.log.info(f"Triggering ETL for {bucket}")
    return {"triggered": True}


@dg.job(tags={"lambda/function_name": "etl-trigger"})
def etl_trigger_job():
    trigger_etl()


@dg.op
def check_new_data(context: dg.OpExecutionContext):
    context.log.info("Checking for new data...")
    return {"has_new_data": True, "file_count": 42}


@dg.job(tags={"lambda/function_name": "data-checker"})
def check_data_job():
    check_new_data()

ECS agent jobs

ECS (or Kubernetes) is better suited for jobs that need:

  • Python dependencies: pandas, scikit-learn, pytorch, spark
  • Long runtimes: Jobs exceeding Lambda's 15-minute limit
  • Large memory: Processing datasets larger than Lambda's 10GB limit
  • Custom environments: Specific Python versions or system packages

These jobs trade slower startup time for full Python environment flexibility.

ECS agent jobs
import dagster as dg


@dg.op
def load_large_dataset(context: dg.OpExecutionContext):
    context.log.info("Loading 10GB dataset...")
    # Your business logic here (pandas, spark, etc.)
    return {"records": 1000000}


@dg.op
def train_model(context: dg.OpExecutionContext, dataset: dict):
    context.log.info("Training model...")
    # Your business logic here (sklearn, pytorch, etc.)
    return {"model_id": "model-123", "accuracy": 0.95}


@dg.job
def ml_training_job():
    dataset = load_large_dataset()
    train_model(dataset)


@dg.op(config_schema={"query": str})
def run_complex_query(context: dg.OpExecutionContext):
    query = context.op_config["query"]
    context.log.info(f"Running long query: {query}")
    # Your business logic here (runs > 15 minutes)
    return {"rows": 1000000}


@dg.job
def long_running_job():
    run_complex_query()


@dg.op
def process_data(context: dg.OpExecutionContext):
    context.log.info("Processing large dataset...")
    # Your business logic here
    return {"processed": 1000000}


@dg.job
def process_data_job():
    process_data()

Queue routing configuration

Configure each code location's dagster_cloud.yaml to route to the appropriate agent:

# For Lambda jobs
locations:
  - location_name: lambda_jobs
    code_source:
      package_name: lambda_jobs
    agent_queue: lambda # Routes to Lambda agent

# For ECS/compute-heavy jobs
locations:
  - location_name: python_jobs
    code_source:
      package_name: python_jobs
    agent_queue: ecs # Routes to ECS agent
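
Each agent must also be configured to serve its queue. For a Dagster+ Hybrid agent this is typically set in the agent's dagster.yaml; the snippet below is a sketch that assumes the standard agent_queues setting, with a queue name matching the locations above:

# dagster.yaml for the Lambda agent (sketch)
agent_queues:
  # Only serve the "lambda" queue; don't pick up runs from the default queue
  include_default_queue: false
  additional_queues:
    - lambda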

Solution 3: Wrapping existing Lambda functions

If you have existing Lambda functions that expect a specific payload format, use payload modes to adapt the Dagster run configuration to match what your Lambda expects—no Lambda code changes required.

| Payload mode | What Lambda receives |
| --- | --- |
| full (default) | Complete payload with run metadata, config, and env vars |
| config_only | Just the run_config dictionary |
| ops_only | Just the ops config from run_config.ops |
| custom | A specific value extracted using payload_config_path |

Ops-only payload mode

Use payload_mode='ops_only' when your Lambda expects the ops configuration structure but not the outer run_config wrapper.

Suppose you have an existing Lambda function that expects a payload like:

{
  "copy_files_op": {
    "config": {
      "source_bucket": "my-bucket",
      "destination_bucket": "other-bucket",
      "file_pattern": "*.csv"
    }
  }
}

Configure your Dagster job to send exactly this structure:

Ops-only payload mode
@dg.op(
    config_schema={
        "source_bucket": str,
        "destination_bucket": str,
        "file_pattern": str,
    }
)
def copy_files_op(context: dg.OpExecutionContext):
    config = context.op_config
    context.log.info(f"Copying files from {config['source_bucket']}")
    return {"status": "initiated"}


@dg.job(
    tags={
        "lambda/function_name": "existing-s3-copy-function",
        "lambda/payload_mode": "ops_only",
    }
)
def copy_files_job():
    copy_files_op()
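
When you launch copy_files_job with op config such as the following (for example, in the launchpad), the ops_only payload delivered to the Lambda matches the JSON shown above (bucket names here are example values):

ops:
  copy_files_op:
    config:
      source_bucket: my-bucket
      destination_bucket: other-bucket
      file_pattern: "*.csv"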

Here's how the existing Lambda handler receives and processes this payload:

Lambda handler for ops-only payload
def ops_aware_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing Lambda that understands Dagster ops config structure."""
    logger.info("Ops-aware Lambda handler")

    ops_config = event
    logger.info(f"Received ops config: {json.dumps(ops_config, indent=2)}")

    results = {}
    for op_name, op_data in ops_config.items():
        logger.info(f"Processing op: {op_name}")
        config = op_data.get("config", {})
        result = process_op_config(op_name, config)
        results[op_name] = result

    return {"statusCode": 200, "body": json.dumps(results)}

Custom payload mode

Use payload_mode='custom' with payload_config_path to extract a specific nested value from your run config. This lets you invoke existing Lambdas that expect a flat configuration object without any Dagster-specific structure.

If your Lambda expects a flat config like {"api_key": "...", "endpoint": "...", "data": [...]}, use the payload_config_path tag to extract just the op config:

Custom payload mode
@dg.op(
    config_schema={
        "api_key": str,
        "endpoint": str,
        "data": list,
    }
)
def api_call_op(context: dg.OpExecutionContext):
    config = context.op_config
    context.log.info(f"Calling API at {config['endpoint']}")
    return {"status": "called"}


@dg.job(
    tags={
        "lambda/function_name": "existing-api-caller",
        "lambda/payload_mode": "custom",
        "lambda/payload_config_path": "ops.api_call_op.config",
    }
)
def api_call_job():
    api_call_op()

The Lambda receives exactly what it expects—no Dagster-specific structure:

{
  "api_key": "...",
  "endpoint": "...",
  "data": [...]
}

Here's an example of an existing Lambda handler that works with a flat configuration of this kind (in this case, an S3 copy function):

Lambda handler for custom payload
def simple_existing_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing Lambda that expects a simple config dictionary."""
    logger.info("Simple existing Lambda handler")

    source_bucket = event.get("source_bucket")
    destination_bucket = event.get("destination_bucket")
    file_pattern = event.get("file_pattern")

    logger.info(f"Source: {source_bucket}")
    logger.info(f"Destination: {destination_bucket}")
    logger.info(f"Pattern: {file_pattern}")

    result = process_files(source_bucket, destination_bucket, file_pattern)

    return {"statusCode": 200, "body": json.dumps(result)}

Config-only payload mode

Use payload_mode='config_only' when your Lambda expects the full run_config dictionary but not the Dagster run metadata. This is useful when your Lambda function is already designed to work with Dagster's run_config structure.

Config-only payload mode
@dg.op(config_schema={"input_file": str, "output_file": str})
def transform_op(context: dg.OpExecutionContext):
context.log.info("Transforming data")
return {"transformed": True}


@dg.op(config_schema={"destination": str})
def load_op(context: dg.OpExecutionContext):
context.log.info("Loading data")
return {"loaded": True}


@dg.job(
tags={
"lambda/function_name": "existing-etl-function",
"lambda/payload_mode": "config_only",
}
)
def etl_job():
transform_op()
load_op()

The Lambda receives the full run_config structure:

{
  "ops": {
    "transform_op": {
      "config": {"input_file": "...", "output_file": "..."}
    },
    "load_op": {
      "config": {"destination": "..."}
    }
  }
}

Here's how a Lambda handler processes this structure:

Lambda handler for config-only payload
def run_config_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing Lambda that works with Dagster's full run_config."""
    logger.info("Run config Lambda handler")

    ops_config = event.get("ops", {})
    resources_config = event.get("resources", {})

    logger.info(f"Processing {len(ops_config)} ops")

    results = {}
    for op_name, op_data in ops_config.items():
        config = op_data.get("config", {})
        result = process_with_resources(op_name, config, resources_config)
        results[op_name] = result

    return {"statusCode": 200, "body": json.dumps({"status": "completed", "results": results})}

Real-world example

Here's a complete example of an existing ETL Lambda that can be orchestrated by Dagster without any code changes. The Lambda expects a specific payload format, and Dagster's custom payload mode extracts exactly what it needs:

Real-world ETL Lambda
def real_world_etl_lambda(event: dict[str, Any], context: Any) -> dict[str, Any]:
    """Existing ETL Lambda - receives flat config, no changes needed."""
    logger.info("ETL Lambda handler (existing, unchanged)")

    input_table = event.get("input_table")
    output_table = event.get("output_table")
    date_range = event.get("date_range", {})
    filters = event.get("filters", [])

    logger.info(f"ETL: {input_table} -> {output_table}")
    logger.info(f"Date range: {date_range}")
    logger.info(f"Filters: {filters}")

    records_processed = run_etl_pipeline(input_table, output_table, date_range, filters)

    return {
        "statusCode": 200,
        "body": json.dumps({"status": "success", "records_processed": records_processed}),
    }
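
On the Dagster side, a job driving this Lambda could use custom payload mode to send exactly that flat structure. The sketch below is illustrative: the op name, config fields, and payload_config_path are assumptions that must match your own run config.

Dagster job for the real-world ETL Lambda (sketch)
@dg.op(
    config_schema={
        "input_table": str,
        "output_table": str,
        "date_range": dict,
        "filters": list,
    }
)
def etl_config_op(context: dg.OpExecutionContext):
    # The op only records the dispatch; the existing Lambda does the ETL work
    context.log.info(f"Dispatching ETL for {context.op_config['input_table']}")
    return {"status": "dispatched"}


@dg.job(
    tags={
        "lambda/function_name": "real-world-etl-function",
        "lambda/payload_mode": "custom",
        # Extract just this op's config so the Lambda receives the flat payload it expects
        "lambda/payload_config_path": "ops.etl_config_op.config",
    }
)
def real_world_etl_job():
    etl_config_op()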