Dagster Pipes with Databricks (serverless compute)

Dagster Pipes lets you run code in Databricks while streaming logs and structured metadata back to Dagster in real time. Unlike Databricks Connect, the computation runs entirely on Databricks — your Dagster asset acts as the orchestrator, not the executor.

Note: To run Python scripts uploaded to DBFS on classic (non-serverless) clusters, see Dagster Pipes with Databricks (DBFS, classic clusters).

When to use Dagster Pipes

Dagster Pipes is best for:

  • Existing Databricks notebooks or scripts you want to orchestrate without rewriting
  • Large batch jobs that should execute independently of the Dagster process
  • Scenarios where you need real-time log streaming and structured metadata back in Dagster
  • Teams where Databricks jobs are deployed and managed separately from Dagster code

Prerequisites

Install the dagster-databricks library:

uv add dagster-databricks

Configure your environment:

export DATABRICKS_HOST=https://dbc-xxxxxxx-yyyy.cloud.databricks.com/
export DATABRICKS_TOKEN=<your-personal-access-token>
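
The code later in this guide reads both variables with os.environ[...], so an unset variable surfaces as a bare KeyError. A minimal sketch of an up-front check is shown below. The helper name require_env is hypothetical and is not part of dagster or dagster-databricks.

```python
import os


def require_env(*names: str) -> dict[str, str]:
    """Return the requested environment variables, failing fast if any are unset.

    Hypothetical helper for illustration; not part of dagster or dagster-databricks.
    """
    # Treat empty strings the same as unset variables.
    missing = [name for name in names if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}
```

Calling require_env("DATABRICKS_HOST", "DATABRICKS_TOKEN") before constructing the client reports every missing variable in one message instead of failing on the first.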

For serverless compute, DBFS is unavailable. Use PipesDatabricksServerlessClient, which uses a Unity Catalog Volume as the message transport layer.

Step 1: Configure the Pipes resource

src/<project-name>/defs/databricks-assets.py
import os

from dagster_databricks.pipes import PipesDatabricksServerlessClient
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

import dagster as dg


@dg.asset
def feature_engineering(
    context: dg.AssetExecutionContext,
    pipes_databricks: PipesDatabricksServerlessClient,
) -> dg.MaterializeResult:
    # Describe the notebook task to submit as a one-time Databricks run
    task = jobs.SubmitTask.from_dict(
        {
            "task_key": "feature_engineering",
            "notebook_task": {
                "notebook_path": "/path/to/my/feature_engineering_notebook",
                "source": jobs.Source.WORKSPACE,
            },
        }
    )

    # Submit the task, stream Pipes messages, and return the reported materialization
    return pipes_databricks.run(
        task=task,
        context=context,
    ).get_materialize_result()


@dg.definitions
def resources():
    pipes_databricks_resource = PipesDatabricksServerlessClient(
        client=WorkspaceClient(
            host=os.environ["DATABRICKS_HOST"],
            token=os.environ["DATABRICKS_TOKEN"],
        ),
        # A Unity Catalog Volume used as the Pipes message transport
        volume_path="/Volumes/catalog/schema/dagster_pipes",
    )
    return dg.Definitions(
        assets=[feature_engineering],
        resources={"pipes_databricks": pipes_databricks_resource},
    )

The volume_path must point to an existing Unity Catalog Volume (for example, /Volumes/catalog/schema/dagster_pipes). The client handles context injection and message reading automatically.
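
Because a malformed volume_path only fails once the run is submitted, it can help to check the path's shape beforehand. The sketch below assumes the /Volumes/<catalog>/<schema>/<volume> layout shown in the example above. parse_volume_path is a hypothetical helper, and it does not verify that the volume actually exists in your workspace.

```python
import re

# Unity Catalog Volume paths look like /Volumes/<catalog>/<schema>/<volume>[/subdirs]
_VOLUME_PATH = re.compile(r"^/Volumes/([^/]+)/([^/]+)/([^/]+)(?:/.*)?$")


def parse_volume_path(path: str) -> tuple[str, str, str]:
    """Split a volume path into (catalog, schema, volume).

    Hypothetical helper: validates the shape of the path only and raises
    ValueError if it does not match; it never contacts Databricks.
    """
    match = _VOLUME_PATH.match(path)
    if match is None:
        raise ValueError(f"Not a Unity Catalog Volume path: {path!r}")
    return match.group(1), match.group(2), match.group(3)
```

For example, parse_volume_path("/Volumes/catalog/schema/dagster_pipes") returns ("catalog", "schema", "dagster_pipes"), while a DBFS-style path such as "/dbfs/tmp/pipes" raises ValueError.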

Step 2: Add Pipes boilerplate to your notebook

On the Databricks side, wrap your notebook logic with open_dagster_pipes using the Unity Catalog loaders:

Databricks notebook
from dagster_pipes import (
    PipesDatabricksNotebookWidgetsParamsLoader,
    PipesUnityCatalogVolumesContextLoader,
    PipesUnityCatalogVolumesMessageWriter,
    open_dagster_pipes,
)

# dbutils is provided automatically by the Databricks notebook runtime
with open_dagster_pipes(
    context_loader=PipesUnityCatalogVolumesContextLoader(),
    message_writer=PipesUnityCatalogVolumesMessageWriter(),
    params_loader=PipesDatabricksNotebookWidgetsParamsLoader(dbutils.widgets),
) as pipes:
    # Access parameters passed from Dagster
    # (assumes the Dagster side supplies an extra named "some_parameter")
    some_parameter = pipes.get_extra("some_parameter")
    pipes.log.info(f"Running with some_parameter={some_parameter}")

    # ... your notebook computation ...

    # Report structured metadata back to Dagster
    pipes.report_asset_materialization(
        metadata={
            "row_count": {"raw_value": 99_000, "type": "int"},
            "output_table": {"raw_value": "catalog.schema.output", "type": "text"},
        }
    )

The PipesDatabricksNotebookWidgetsParamsLoader reads Pipes parameters from notebook widget values, which PipesDatabricksServerlessClient injects automatically when it submits the run.
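
Writing the {"raw_value": ..., "type": ...} dictionaries by hand gets repetitive as the number of metadata entries grows. The following sketch wraps plain Python values in that shape. pipes_metadata is a hypothetical helper, and it assumes the "bool" and "float" type tags in addition to the "int" and "text" tags used in the notebook above.

```python
def pipes_metadata(**values: object) -> dict[str, dict[str, object]]:
    """Wrap plain Python values in the {"raw_value": ..., "type": ...} shape.

    Hypothetical helper; only bool, int, float, and str values are handled.
    """
    result: dict[str, dict[str, object]] = {}
    for key, value in values.items():
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            type_name = "bool"
        elif isinstance(value, int):
            type_name = "int"
        elif isinstance(value, float):
            type_name = "float"
        elif isinstance(value, str):
            type_name = "text"
        else:
            raise TypeError(f"Unsupported metadata value for {key!r}: {type(value).__name__}")
        result[key] = {"raw_value": value, "type": type_name}
    return result
```

Inside the notebook, the result could be passed as pipes.report_asset_materialization(metadata=pipes_metadata(row_count=99_000, output_table="catalog.schema.output")).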

Triggering existing jobs without Pipes

If your existing Databricks notebook or job doesn't have Pipes boilerplate, you can still orchestrate it from Dagster using the Databricks SDK directly. This approach blocks downstream assets on job failure and surfaces run metadata, but doesn't support real-time log streaming.

src/<project-name>/defs/databricks-assets.py
import os

from databricks.sdk import WorkspaceClient

import dagster as dg


@dg.asset(
    kinds={"databricks", "notebook"},
    description="Triggers an existing Databricks job and waits for completion.",
)
def feature_engineering(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    # Strip any trailing slash so the run URL below has no double slash
    host = os.environ["DATABRICKS_HOST"].rstrip("/")
    token = os.environ["DATABRICKS_TOKEN"]
    job_id = int(os.environ["DATABRICKS_JOB_ID"])

    client = WorkspaceClient(host=host, token=token)

    context.log.info(f"Triggering Databricks job {job_id}")
    # Blocks until the run reaches a terminal state
    run = client.jobs.run_now(job_id=job_id).result()
    run_url = f"{host}/#job/{job_id}/run/{run.run_id}"

    context.log.info(
        f"Job completed: run_id={run.run_id}, state={run.state.result_state}"
    )
    return dg.MaterializeResult(
        metadata={
            "job_id": dg.MetadataValue.text(str(job_id)),
            "run_id": dg.MetadataValue.text(str(run.run_id)),
            "run_url": dg.MetadataValue.url(run_url),
            "status": dg.MetadataValue.text(str(run.state.result_state)),
        }
    )

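client.jobs.run_now(job_id=...).result() blocks until the run reaches a terminal state. When the SDK raises (for example, if the run ends in an internal error or the wait exceeds the timeout, which defaults to 20 minutes), the exception surfaces as an asset failure in the Dagster UI. For longer jobs, pass a larger timeout to result(), and consider checking run.state.result_state explicitly rather than relying on an exception for every failure mode.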
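
As a defensive measure, the asset can verify the result state itself in case the SDK returns a finished run whose result state is not SUCCESS. This is an assumption worth checking against your SDK version. The sketch below uses two hypothetical helpers, build_run_url and ensure_success, and stands in SimpleNamespace objects for the SDK's run object so that it runs without a Databricks workspace.

```python
from types import SimpleNamespace


def build_run_url(host: str, job_id: int, run_id: int) -> str:
    """Build a run URL in the /#job/<job_id>/run/<run_id> format used above."""
    # Tolerate a trailing slash on the host, as in the DATABRICKS_HOST example.
    return f"{host.rstrip('/')}/#job/{job_id}/run/{run_id}"


def ensure_success(run) -> None:
    """Raise RuntimeError unless the finished run's result state is SUCCESS."""
    # str() of an enum member looks like "RunResultState.SUCCESS"; keep the last part.
    result_state = str(run.state.result_state).rsplit(".", 1)[-1]
    if result_state != "SUCCESS":
        raise RuntimeError(
            f"Databricks run {run.run_id} finished with state {result_state}"
        )


# Stand-in for the object returned by .result(); real runs come from the SDK.
fake_run = SimpleNamespace(run_id=42, state=SimpleNamespace(result_state="SUCCESS"))
ensure_success(fake_run)  # passes silently for a successful run
```

In the asset, calling ensure_success(run) right after .result() would turn a non-success result state into an exception, which Dagster reports as an asset failure.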

Advanced usage

For advanced scenarios such as streaming materializations mid-run, custom context loaders, or existing job-polling logic, see the Dagster Pipes details and customization guide.