# Multi-workspace Databricks
In this example, you'll explore a Dagster project that demonstrates an enterprise data mesh architecture. The project shows how a single Dagster deployment can orchestrate data pipelines across multiple Databricks workspaces while integrating Kafka for real-time event ingestion and Fivetran for SaaS data syncs.
The core challenge this project addresses is common in large organizations: data is distributed across many systems — event streams, SaaS tools, legacy databases, and business-unit-specific Databricks workspaces — but the organization needs unified lineage tracking, consistent scheduling, and cross-domain analytics. Dagster acts as the control plane, maintaining a single asset graph that spans all of these systems.
The pipeline:
- Ingests real-time ERP and CRM events from Kafka topics
- Syncs SaaS data from Salesforce and HubSpot via Fivetran
- Accesses legacy SQL Server and PostgreSQL databases through Databricks Unity Catalog Lakehouse Federation
- Runs workspace-specific Databricks jobs across four business domains (customer analytics, sales analytics, finance reporting, and marketing attribution)
- Tracks full asset lineage from raw sources through workspace-specific transformations to cross-domain analytics
## Prerequisites
To follow the steps in this guide, you'll need:
- Python 3.10+ and `uv` installed. For more information, see the Installation guide.
- Familiarity with Python and asset-based pipelines.
- Active Databricks workspaces with job IDs and access tokens.
- A Kafka broker reachable from your Dagster deployment.
## Step 1: Set up your Dagster environment
1. Clone the Dagster repo and navigate to the project:

   ```shell
   cd examples/docs_projects/project_multi_workspace_databricks
   ```

2. Install the required dependencies with `uv`:

   ```shell
   uv sync
   ```

3. Activate the virtual environment:

   - MacOS/Unix: `source .venv/bin/activate`
   - Windows: `.venv\Scripts\activate`

4. Configure credentials for your Databricks workspaces and Kafka broker (see Configuration below).
## Step 2: Launch the Dagster webserver
Start the Dagster webserver from the project root:
```shell
dg dev
```
Open http://localhost:3000 in your browser. You'll see 21 assets across seven groups — the full data mesh from raw ingestion through analytics.
## Understanding the asset model
A key design choice in this project is that many assets have `pass` bodies rather than executing computation directly. This is intentional and reflects a fundamental Dagster pattern: an asset represents data that exists somewhere, not necessarily computation that runs inside Dagster.
The project has two categories of assets:
**External-system assets** are `@dg.asset` functions with empty bodies. They appear in the lineage graph so other assets can declare dependencies on them, but the actual data production happens outside Dagster:
- `raw_erp_events` and `raw_crm_events` are materialized by Kafka sensors, not by running the asset function directly. The sensor detects new messages, batches them, and triggers a run that marks the asset as materialized.
- `salesforce_data` and `hubspot_data` represent Fivetran connector syncs. Fivetran manages the sync schedule; Dagster tracks that these assets exist and what depends on them.
- `legacy_customer_data` and `legacy_transaction_data` represent tables accessed via Databricks Unity Catalog Lakehouse Federation. The data lives in on-premises SQL Server and PostgreSQL — Dagster provides lineage tracking for downstream consumers.
**Orchestrated assets** — the four workspace groups — are fully implemented by the `DatabricksJobOrchestrator` component. When materialized, they run a real Databricks job in the configured workspace and emit a `MaterializeResult` for each asset produced.
This hybrid approach gives you complete lineage — from raw Kafka events through workspace transformations to cross-domain analytics — while letting each system own its own execution.
## Architecture
The project is organized into two layers:
### Core assets (`defs/core_assets/`)

Defined as standard `@asset` functions, these represent the shared data foundation:
| Group | Assets |
|---|---|
| ingestion | `raw_erp_events`, `raw_crm_events`, `salesforce_data`, `hubspot_data`, `legacy_customer_data`, `legacy_transaction_data` |
| transformation | `enriched_customer_profiles`, `order_fulfillment_status` |
| analytics | `customer_360_view`, `marketing_attribution` |
### Workspace assets (`defs/*/defs.yaml`)

Each of the four business domains is configured via a `defs.yaml` file using the reusable `DatabricksJobOrchestrator` component:
| Workspace | Databricks job | Assets produced |
|---|---|---|
| customer_analytics | Job 12345 | `enriched_profiles`, `customer_segments` |
| sales_analytics | Job 12346 | `order_fulfillment`, `revenue_forecasting` |
| finance_reporting | Job 12347 | `invoice_reconciliation`, `payment_tracking`, `financial_statements`, `customer_segments_aggregation` |
| marketing_attribution | Job 12348 | `campaign_performance`, `attribution_model`, `customer_lifetime_value` |
Data flows from the core ingestion assets (Kafka, Fivetran, legacy databases) into workspace-specific Databricks jobs, which produce domain-specific assets. Those workspace assets are then consumed by the transformation and analytics core assets to produce the final cross-domain outputs (`customer_360_view`, `marketing_attribution`).
## Kafka ingestion
Real-time ERP and CRM events flow through Kafka. Rather than polling Kafka on a fixed schedule from inside an asset, the project uses a sensor-driven pattern that maps naturally to event-driven ingestion.
### The `KafkaResource`

A `ConfigurableResource` wraps the Kafka connection and is injected into sensors at runtime:
```python
import dagster as dg
from kafka import KafkaConsumer  # from the kafka-python package
from pydantic import Field


class KafkaResource(dg.ConfigurableResource):
    """Resource for connecting to Kafka."""

    bootstrap_servers: str = Field(
        default="localhost:9092",
        description="Comma-separated list of Kafka bootstrap servers",
    )
    group_id: str = Field(
        default="dagster-consumer-group",
        description="Consumer group ID",
    )
    topics: list[str] = Field(
        default_factory=lambda: ["events"],
        description="List of Kafka topics to consume",
    )

    def create_consumer(self) -> KafkaConsumer:
        """Create a Kafka consumer instance."""
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers.split(","),
            group_id=self.group_id,
            auto_offset_reset="earliest",
            enable_auto_commit=False,
        )
```
The resource is registered in `defs/core_assets/__init__.py` with a default configuration pointing at `localhost:9092`. Update `bootstrap_servers` and `topics` for your deployment.
### The Kafka sensor
`kafka_sensor_factory` is a factory function that creates a `SensorDefinition` for a given replica ID and target asset key. Running two sensor instances (replica 0 for ERP, replica 1 for CRM) allows the project to consume from both topics in parallel without coordination:
```python
import time
from collections.abc import Iterator

import dagster as dg

MAX_BATCH_SIZE = 50  # messages collected per tick
MAX_SENSOR_TICK_RUNTIME = 30  # seconds of polling per tick
TIME_BETWEEN_SENSOR_TICKS = 30  # minimum interval between ticks (value assumed)


@dg.sensor(
    name=f"watch_kafka_replica_{replica_id}",
    minimum_interval_seconds=TIME_BETWEEN_SENSOR_TICKS,
    default_status=dg.DefaultSensorStatus.RUNNING,
)
def watch_kafka(
    context: dg.SensorEvaluationContext,
    kafka_resource: KafkaResource,
) -> Iterator[dg.RunRequest]:
    """Sensor that polls Kafka for new messages and triggers runs."""
    start_time = time.time()
    consumer = kafka_resource.create_consumer()
    try:
        batch_data: list[dict] = []
        max_offset = -1
        while time.time() - start_time < MAX_SENSOR_TICK_RUNTIME:
            message_batch = consumer.poll(
                timeout_ms=2000, max_records=MAX_BATCH_SIZE
            )
            if not message_batch:
                break
            for _topic_partition, messages in message_batch.items():
                for message in messages:
                    batch_data.append(
                        {
                            "topic": message.topic,
                            "partition": message.partition,
                            "offset": message.offset,
                            "timestamp": message.timestamp,
                            "key": (
                                message.key.decode("utf-8") if message.key else None
                            ),
                            "value": message.value.decode("utf-8"),
                        }
                    )
                    max_offset = max(max_offset, message.offset)
            if len(batch_data) >= MAX_BATCH_SIZE:
                break
        if batch_data:
            context.log.info(
                f"Consumed {len(batch_data)} messages, max_offset={max_offset}"
            )
            yield dg.RunRequest(
                run_key=f"max_offset_{max_offset}",
                run_config={
                    "ops": {
                        "process_kafka_events": {
                            "config": {
                                "batch_data": batch_data,
                                "max_offset": max_offset,
                            }
                        }
                    }
                },
                asset_selection=[target_asset_key] if target_asset_key else None,
            )
            consumer.commit()
    finally:
        consumer.close()
```
Each sensor tick:
- Opens a Kafka consumer using the `KafkaResource`
- Polls for up to 30 seconds, collecting a batch of up to 50 messages
- If messages were received, yields a `RunRequest` with the batch serialized into run config
- Commits Kafka offsets only after the run request is yielded successfully
The `run_key` is set to `max_offset_{n}`, which means Dagster deduplicates runs — if the sensor ticks before the previous run completes, a new `RunRequest` with the same offset won't trigger a duplicate run.
### Kafka asset definitions
The `raw_erp_events` and `raw_crm_events` assets themselves have `pass` bodies because the Kafka sensor — not the asset function — is what processes and materializes the data:
```python
import dagster as dg


@dg.asset(
    kinds={"kafka", "raw_data"},
    group_name="ingestion",
)
def raw_erp_events(context: dg.AssetExecutionContext) -> None:
    """Raw ERP events ingested from Kafka (invoices, orders, payments).

    Consumes event-based data from Kafka topics populated by ERP systems.
    Events include invoice creation, payment processing, and order updates.
    """
    # This is a placeholder - actual execution happens via the Kafka sensor
    # The sensor triggers runs that populate this asset
    pass


@dg.asset(
    kinds={"kafka", "raw_data"},
    group_name="ingestion",
)
def raw_crm_events(context: dg.AssetExecutionContext) -> None:
    """Raw CRM events ingested from Kafka (customer interactions, balances).

    Consumes event-based data from Kafka topics populated by CRM systems.
    Events include customer balance updates, support interactions, and account changes.
    """
    pass
```
Other assets that depend on `raw_erp_events` or `raw_crm_events` will see them as materialized once the sensor triggers a run. Their presence in the asset graph makes the dependency explicit and enables downstream scheduling.
## The `DatabricksJobOrchestrator` component
The core reusable abstraction in this project is `DatabricksJobOrchestrator` — a custom `Component` that maps a Databricks job in a specific workspace to one or more Dagster assets.

Rather than writing a new `@asset` function for each workspace and job, you configure one YAML file per domain. Dagster's component system loads the YAML, instantiates the component, and calls `build_defs()` to produce the asset graph:
```yaml
type: project_multi_workspace_databricks.components.databricks_job_orchestrator.DatabricksJobOrchestrator

attributes:
  job_id: 12345
  workspace_config:
    host: "https://workspace-customer-analytics.cloud.databricks.com"
    token: "demo-token-not-real"
  assets:
    - key: "customer_analytics/enriched_profiles"
      description: "Customer profiles enriched with transaction history and behavior patterns"
      group_name: "customer_analytics"
      deps:
        - "raw_erp_events"
        - "raw_crm_events"
        - "legacy_customer_data"
    - key: "customer_analytics/customer_segments"
      description: "Customer segmentation for targeted marketing campaigns"
      group_name: "customer_analytics"
      deps:
        - "customer_analytics/enriched_profiles"
```
### How `build_defs` works
The `build_defs` method converts the component's configuration into a `Definitions` object containing a single `@dg.multi_asset`. Each entry in `assets:` becomes an `AssetSpec`, carrying the asset key, group, dependencies, and Databricks metadata:
```python
def build_defs(self, context: dg.ComponentLoadContext) -> dg.Definitions:
    """Build Dagster definitions for this component."""
    enriched_specs = []
    for asset_def in self.assets:
        key_parts = asset_def.key.split("/")
        asset_key = dg.AssetKey(key_parts)
        deps = [dg.AssetKey(dep.split("/")) for dep in asset_def.deps]
        description = asset_def.description or (
            f"Asset {asset_def.key} produced by Databricks job {self.job_id}"
        )
        metadata: dict[str, Any] = {
            "databricks_job_id": self.job_id,
            "databricks_host": self.workspace_config.host,
            "job_parameters": self.job_parameters or {},
        }
        spec = dg.AssetSpec(
            key=asset_key,
            description=description,
            metadata=metadata,
            kinds={"databricks"},
            group_name=asset_def.group_name,
            deps=deps,
        )
        enriched_specs.append(spec)

    component = self

    @dg.multi_asset(
        name=f"databricks_job_{self.job_id}",
        specs=enriched_specs,
    )
    def databricks_job_asset(context: dg.AssetExecutionContext):
        """Execute the Databricks job and materialize all assets."""
        context.log.info(f"Executing Databricks job {component.job_id}")
        result = component.execute()
        context.log.info(f"Job completed: {result}")
        for spec in enriched_specs:
            yield dg.MaterializeResult(
                asset_key=spec.key,
                metadata={
                    "execution_start_time": result["start_time"],
                    "execution_end_time": result["end_time"],
                    "databricks_run_id": result["run_id"],
                },
            )

    return dg.Definitions(assets=[databricks_job_asset])
```
When the multi-asset executes, it calls `execute()`, which uses `DatabricksClient` to trigger `jobs.run_now()` in the configured workspace and waits up to one hour for completion. On success, it yields a `MaterializeResult` for each asset spec, attaching execution metadata (run ID, start/end times).
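The project's `execute()` implementation isn't shown here. As a hedged sketch of the same idea using the official `databricks-sdk` package (the function name, timeout, and returned fields below are assumptions, not the project's code), it might look like:

```python
from datetime import datetime, timedelta, timezone


def execute_databricks_job(host: str, token: str, job_id: int) -> dict:
    """Trigger a Databricks job run and block until it finishes (sketch)."""
    # Imported lazily so the module loads without the SDK installed.
    from databricks.sdk import WorkspaceClient  # pip install databricks-sdk

    client = WorkspaceClient(host=host, token=token)
    start_time = datetime.now(timezone.utc).isoformat()

    # run_now returns a waiter; .result() blocks until the run reaches a
    # terminal state or the timeout elapses.
    run = client.jobs.run_now(job_id=job_id).result(timeout=timedelta(hours=1))

    return {
        "run_id": run.run_id,
        "start_time": start_time,
        "end_time": datetime.now(timezone.utc).isoformat(),
    }
```

The multi-asset can then spread the returned dict into the per-asset `MaterializeResult` metadata, as `build_defs` does above.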
### Adding a new workspace
To add a new business domain, scaffold a new component instance:
```shell
dg scaffold defs \
  project_multi_workspace_databricks.components.databricks_job_orchestrator.DatabricksJobOrchestrator \
  new_workspace_name
```
This creates `defs/new_workspace_name/defs.yaml` pre-populated with the component schema. Fill in the `job_id`, `workspace_config`, and `assets` list — no Python code required.
## Configuration
### Databricks workspaces

Update `workspace_config` in each workspace's `defs.yaml` with real credentials:
```yaml
workspace_config:
  host: 'https://your-workspace.cloud.databricks.com'
  token: '${DATABRICKS_TOKEN}'
```
### Kafka

Update `KafkaResource` in `defs/core_assets/__init__.py`:
```python
KafkaResource(
    bootstrap_servers="your-kafka-broker:9092",
    topics=["erp-events", "crm-events"],
)
```
### Fivetran

The `salesforce_data` and `hubspot_data` assets are placeholder nodes — their materialization is driven externally by Fivetran connectors. To wire them to a real Fivetran instance, replace the pass-body assets with assets loaded from the Fivetran integration:
```python
import os

from dagster_fivetran import FivetranResource, load_assets_from_fivetran_instance

fivetran_resource = FivetranResource(
    api_key=os.getenv("FIVETRAN_API_KEY"),
    api_secret=os.getenv("FIVETRAN_API_SECRET"),
)

# Load one Dagster asset per table synced by the Fivetran instance
fivetran_assets = load_assets_from_fivetran_instance(fivetran_resource)
```