Dagster Pipes with Databricks (serverless compute)
Dagster Pipes lets you run code in Databricks while streaming logs and structured metadata back to Dagster in real time. With Databricks Connect, your Python code runs in the Dagster process and only Spark operations execute remotely. With Pipes, the computation runs entirely on Databricks: your Dagster asset acts as the orchestrator, not the executor.
To use Dagster Pipes with Python scripts uploaded to DBFS and run on classic (non-serverless) clusters, see Dagster Pipes with Databricks (DBFS, classic clusters).
When to use Dagster Pipes
Dagster Pipes is best for:
- Existing Databricks notebooks or scripts you want to orchestrate without rewriting
- Large batch jobs that should execute independently of the Dagster process
- Scenarios where you need real-time log streaming and structured metadata back in Dagster
- Teams where Databricks jobs are deployed and managed separately from Dagster code
Prerequisites
Install the dagster-databricks library:
Using uv:

```shell
uv add dagster-databricks
```

Or using pip:

```shell
pip install dagster-databricks
```
Configure your environment:
```shell
export DATABRICKS_HOST=https://dbc-xxxxxxx-yyyy.cloud.databricks.com/
export DATABRICKS_TOKEN=<your-personal-access-token>
```
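The asset code reads these variables with os.environ[...], so a missing value only shows up when the asset runs. The sketch below fails fast instead and assumes you want that. The helper name load_databricks_env is hypothetical and is not part of dagster-databricks. It also strips the trailing slash from the example host, so URLs built as f"{host}/..." don't contain "//".

```python
import os
from typing import Mapping, Tuple


def load_databricks_env(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (host, token) from the environment, failing fast if either is missing.

    Hypothetical helper for illustration; not part of dagster-databricks.
    """
    missing = [
        name for name in ("DATABRICKS_HOST", "DATABRICKS_TOKEN") if not env.get(name)
    ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    # Drop the trailing slash so f"{host}/..." URLs don't end up with "//"
    host = env["DATABRICKS_HOST"].rstrip("/")
    if not host.startswith("https://"):
        raise ValueError(f"DATABRICKS_HOST should start with https://, got {host!r}")
    return host, env["DATABRICKS_TOKEN"]
```

You could call it once at definition time, for example host, token = load_databricks_env(), and then pass the values to WorkspaceClient(host=host, token=token).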
For serverless compute, DBFS is unavailable. Use PipesDatabricksServerlessClient, which uses a Unity Catalog Volume as the message transport layer.
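Unity Catalog Volume paths follow the pattern /Volumes/<catalog>/<schema>/<volume>, optionally followed by subdirectories. The sketch below catches a malformed volume_path before it reaches the client. The function and class names are hypothetical. The check covers only the shape of the path, not whether the volume exists or whether your credentials can write to it.

```python
import re
from typing import NamedTuple, Optional


class VolumePath(NamedTuple):
    catalog: str
    schema: str
    volume: str
    subdir: Optional[str]  # e.g. "runs/2024", or None for the volume root


# /Volumes/<catalog>/<schema>/<volume>[/<optional/sub/dirs>][/]
_VOLUME_RE = re.compile(r"^/Volumes/([^/]+)/([^/]+)/([^/]+)(?:/(.+?))?/?$")


def parse_volume_path(path: str) -> VolumePath:
    """Split a Unity Catalog Volume path into its parts (hypothetical helper)."""
    match = _VOLUME_RE.match(path)
    if match is None:
        raise ValueError(
            f"{path!r} is not a Unity Catalog Volume path; "
            "expected /Volumes/<catalog>/<schema>/<volume>[/...]"
        )
    catalog, schema, volume, subdir = match.groups()
    return VolumePath(catalog, schema, volume, subdir)
```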
Step 1: Configure the Pipes resource
```python
import os

from dagster_databricks.pipes import PipesDatabricksServerlessClient
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

import dagster as dg


@dg.asset
def feature_engineering(
    context: dg.AssetExecutionContext,
    pipes_databricks: PipesDatabricksServerlessClient,
) -> dg.MaterializeResult:
    # Describe the notebook task to submit as a one-time serverless run
    task = jobs.SubmitTask.from_dict(
        {
            "task_key": "feature_engineering",
            "notebook_task": {
                "notebook_path": "/path/to/my/feature_engineering_notebook",
                "source": jobs.Source.WORKSPACE,
            },
        }
    )

    # Submit the run and turn the reported materialization into the asset result
    return pipes_databricks.run(
        task=task,
        context=context,
    ).get_materialize_result()


@dg.definitions
def resources():
    pipes_databricks_resource = PipesDatabricksServerlessClient(
        client=WorkspaceClient(
            host=os.environ["DATABRICKS_HOST"],
            token=os.environ["DATABRICKS_TOKEN"],
        ),
        # A Unity Catalog Volume used as the Pipes message transport
        volume_path="/Volumes/catalog/schema/dagster_pipes",
    )

    return dg.Definitions(
        assets=[feature_engineering],
        resources={"pipes_databricks": pipes_databricks_resource},
    )
```
The volume_path must point to an existing Unity Catalog Volume (for example, /Volumes/catalog/schema/dagster_pipes). The client handles context injection and message reading automatically.
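Conceptually, the transport is a shared directory. The notebook writes JSON-encoded messages into the volume, and the Dagster side polls for them and replays each one as a log line or a materialization. The sketch below imitates that polling loop, with a local directory standing in for the volume. It illustrates the idea and is not the client's actual implementation. The file layout (numbered <n>.json chunks with one JSON message per line) and the message fields (method, params, level, message) are assumptions modeled on the Pipes protocol.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple


def read_new_messages(
    messages_dir: Path, next_index: int
) -> Tuple[int, List[Dict[str, Any]]]:
    """Read chunk files <n>.json from next_index onward, stopping at the first gap.

    Assumed layout: each chunk holds one JSON message per line and is written
    in full before it appears. Returns the next unread index and the messages.
    """
    messages: List[Dict[str, Any]] = []
    while (chunk := messages_dir / f"{next_index}.json").exists():
        for line in chunk.read_text().splitlines():
            if line.strip():
                messages.append(json.loads(line))
        next_index += 1
    return next_index, messages


def describe(message: Dict[str, Any]) -> str:
    """Render one message as text (a stand-in for Dagster's real event handling)."""
    method, params = message["method"], message.get("params") or {}
    if method == "log":
        return f"[{params['level']}] {params['message']}"
    if method == "report_asset_materialization":
        return f"materialization with metadata {sorted(params.get('metadata', {}))}"
    return f"unhandled method {method!r}"
```

A caller would invoke read_new_messages in a loop and sleep between polls until the remote run finishes. It would then do one final read to drain any remaining chunks.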
Step 2: Add Pipes boilerplate to your notebook
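On the Databricks side, wrap your notebook logic with open_dagster_pipes using the Unity Catalog loaders. The dagster-pipes package must be available in the notebook environment, for example by running %pip install dagster-pipes in an earlier cell: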
```python
from dagster_pipes import (
    PipesDatabricksNotebookWidgetsParamsLoader,
    PipesUnityCatalogVolumesContextLoader,
    PipesUnityCatalogVolumesMessageWriter,
    open_dagster_pipes,
)

# `dbutils` is provided automatically by the Databricks notebook runtime
with open_dagster_pipes(
    context_loader=PipesUnityCatalogVolumesContextLoader(),
    message_writer=PipesUnityCatalogVolumesMessageWriter(),
    params_loader=PipesDatabricksNotebookWidgetsParamsLoader(dbutils.widgets),
) as pipes:
    # Access parameters passed from Dagster as extras. pipes.get_extra(...)
    # raises if the key was not provided, so use pipes.extras.get(...) for
    # optional parameters.
    some_parameter = pipes.extras.get("some_parameter")
    pipes.log.info(f"Running with some_parameter={some_parameter}")

    # ... your notebook computation ...

    # Report structured metadata back to Dagster
    pipes.report_asset_materialization(
        metadata={
            "row_count": {"raw_value": 99_000, "type": "int"},
            "output_table": {"raw_value": "catalog.schema.output", "type": "text"},
        }
    )
```
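Each pipes.log or pipes.report_asset_materialization call is serialized into a JSON message, which the message writer then ships to the volume. The stdlib sketch below builds one such message to show how the metadata dictionary is carried. The __dagster_pipes_version field name, its "0.1" value, and the params keys are assumptions based on our reading of dagster_pipes; check them against your installed version. In a real notebook, let PipesUnityCatalogVolumesMessageWriter do this.

```python
import json
from typing import Any, Dict, Optional

# Assumed protocol constants, modeled on dagster_pipes; check your installed version
PIPES_PROTOCOL_VERSION_FIELD = "__dagster_pipes_version"
PIPES_PROTOCOL_VERSION = "0.1"


def make_materialization_message(
    metadata: Dict[str, Dict[str, Any]],
    asset_key: Optional[str] = None,
    data_version: Optional[str] = None,
) -> str:
    """Serialize one report_asset_materialization call as a single JSON line (sketch)."""
    for key, entry in metadata.items():
        # Each entry uses the explicit {"raw_value": ..., "type": ...} form
        if not {"raw_value", "type"}.issubset(entry):
            raise ValueError(f"metadata[{key!r}] must have 'raw_value' and 'type'")
    message = {
        PIPES_PROTOCOL_VERSION_FIELD: PIPES_PROTOCOL_VERSION,
        "method": "report_asset_materialization",
        "params": {
            "asset_key": asset_key,
            "data_version": data_version,
            "metadata": metadata,
        },
    }
    # json.dumps escapes newlines inside strings, so the result is one line
    return json.dumps(message)
```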
The PipesDatabricksNotebookWidgetsParamsLoader reads Pipes parameters from notebook widget values, which PipesDatabricksServerlessClient injects automatically when it submits the run.
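The bootstrap parameters arrive as opaque strings. As we understand it, dagster_pipes serializes each value to JSON, compresses it with zlib, and base64-encodes the result, and the params loader reverses those steps. The stdlib round-trip sketch below illustrates this. The widget names DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES are assumptions, and FakeWidgets is a hypothetical stand-in for dbutils.widgets, which exposes get(name).

```python
import base64
import json
import zlib
from typing import Any, Dict


def encode_param(value: Any) -> str:
    """JSON -> zlib -> base64 (assumed to mirror dagster_pipes' encoding)."""
    return base64.b64encode(zlib.compress(json.dumps(value).encode("utf-8"))).decode("utf-8")


def decode_param(encoded: str) -> Any:
    """Inverse of encode_param: base64 -> zlib -> JSON."""
    return json.loads(zlib.decompress(base64.b64decode(encoded)).decode("utf-8"))


class FakeWidgets:
    """Hypothetical stand-in for dbutils.widgets (only get(name) is modeled)."""

    def __init__(self, values: Dict[str, str]) -> None:
        self._values = values

    def get(self, name: str) -> str:
        return self._values[name]


def load_params(widgets: FakeWidgets) -> Dict[str, Any]:
    # Widget names are assumptions; the real loader defines its own keys
    return {
        "context": decode_param(widgets.get("DAGSTER_PIPES_CONTEXT")),
        "messages": decode_param(widgets.get("DAGSTER_PIPES_MESSAGES")),
    }
```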
Triggering existing jobs without Pipes
If your existing Databricks notebook or job doesn't have Pipes boilerplate, you can still orchestrate it from Dagster using the Databricks SDK directly. This approach blocks downstream assets on job failure and surfaces run metadata, but doesn't support real-time log streaming.
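```python
import os
from datetime import timedelta

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

import dagster as dg


@dg.asset(
    kinds={"databricks", "notebook"},
    description="Triggers an existing Databricks job and waits for completion.",
)
def feature_engineering(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    # Strip any trailing slash so the run URL below doesn't contain "//"
    host = os.environ["DATABRICKS_HOST"].rstrip("/")
    token = os.environ["DATABRICKS_TOKEN"]
    job_id = int(os.environ["DATABRICKS_JOB_ID"])

    client = WorkspaceClient(host=host, token=token)

    context.log.info(f"Triggering Databricks job {job_id}")
    # .result() waits for the run to terminate (SDK default timeout: 20 minutes);
    # adjust the timeout to your job's expected runtime
    run = client.jobs.run_now(job_id=job_id).result(timeout=timedelta(hours=2))

    run_url = f"{host}/#job/{job_id}/run/{run.run_id}"
    result_state = run.state.result_state
    context.log.info(f"Job completed: run_id={run.run_id}, state={result_state}")

    # A run whose task fails still terminates normally, so check the outcome
    if result_state != jobs.RunResultState.SUCCESS:
        raise dg.Failure(
            description=f"Databricks run {run.run_id} finished with state {result_state}",
            metadata={"run_url": dg.MetadataValue.url(run_url)},
        )

    return dg.MaterializeResult(
        metadata={
            "job_id": dg.MetadataValue.text(str(job_id)),
            "run_id": dg.MetadataValue.text(str(run.run_id)),
            "run_url": dg.MetadataValue.url(run_url),
            "status": dg.MetadataValue.text(str(result_state)),
        }
    )
```

client.jobs.run_now(job_id=...).result() blocks until the run reaches a terminal state. It raises an exception only if the run hits an internal error or exceeds the timeout, which is 20 minutes by default in the Databricks SDK. A run whose notebook fails still terminates normally, so the asset checks result_state and raises dg.Failure. This surfaces as an asset failure in the Dagster UI and keeps downstream assets from materializing.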
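When you wait on a Databricks run, "finished" and "succeeded" are separate questions. The life-cycle state says whether the run has ended, and the result state says how it ended. The stdlib sketch below keeps the two apart in a polling loop. The get_state callable is a hypothetical stand-in for querying the Databricks API, and sleep and clock are injectable so the loop can be exercised without waiting. The state names mirror the Databricks Jobs API, but the function itself is illustrative only.

```python
import time
from typing import Callable, Optional, Tuple

# Life-cycle states after which a run will not change any more
TERMINAL_LIFE_CYCLE_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(
    get_state: Callable[[], Tuple[str, Optional[str]]],
    timeout_s: float,
    poll_interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the run ends; return "SUCCESS" or raise.

    get_state returns (life_cycle_state, result_state); it is hypothetical.
    """
    deadline = clock() + timeout_s
    while clock() < deadline:
        life_cycle, result = get_state()
        if life_cycle in TERMINAL_LIFE_CYCLE_STATES:
            # Ending is not the same as succeeding: inspect the result state
            if result != "SUCCESS":
                raise RuntimeError(f"run ended: life_cycle={life_cycle}, result={result}")
            return result
        sleep(poll_interval_s)
    raise TimeoutError(f"run did not finish within {timeout_s}s")
```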
Advanced usage
For advanced scenarios such as streaming materializations mid-run, custom context loaders, or existing job-polling logic, see the Dagster Pipes details and customization guide.