Databricks (dagster-databricks)
The dagster_databricks package provides these main pieces of functionality:

- A resource, databricks_pyspark_step_launcher, which will execute an op within a Databricks context on a cluster, such that the pyspark resource uses the cluster's Spark instance.
- An op factory, create_databricks_run_now_op, which creates an op that launches an existing Databricks job using the Run Now API.
- An op factory, create_databricks_submit_run_op, which creates an op that submits a one-time run of a set of tasks on Databricks using the Submit Run API.

Note that, for the databricks_pyspark_step_launcher, either S3 or Azure Data Lake Storage config must be specified for ops to succeed, and the credentials for this storage must be stored as a Databricks Secret and referenced in the resource config so that the Databricks cluster can access storage.
APIs
Resources
- dagster_databricks.DatabricksClientResource ResourceDefinition
Resource which provides a Python client for interacting with Databricks within an op or asset.
class dagster_databricks.DatabricksClient
A thin wrapper over the Databricks REST API.

property workspace_client
Retrieve a reference to the underlying Databricks Workspace client. For more information, see the Databricks SDK for Python.
Examples:

from dagster import op
from databricks.sdk import WorkspaceClient

@op(required_resource_keys={"databricks_client"})
def op1(context):
    # Retrieve the authenticated Databricks SDK WorkspaceClient
    client = context.resources.databricks_client.workspace_client

    # Example 1: Run a Databricks job with some parameters.
    client.jobs.run_now(...)

    # Example 2: Trigger a one-time run of a Databricks workload.
    client.jobs.submit(...)

    # Example 3: Get an existing run.
    client.jobs.get_run(...)

    # Example 4: Cancel a run.
    client.jobs.cancel_run(...)

Returns: The authenticated Databricks SDK Workspace Client.
Return type: WorkspaceClient
Ops
- dagster_databricks.create_databricks_run_now_op
Creates an op that launches an existing Databricks job.
As config, the op accepts a blob of the form described in Databricks' Jobs API: https://docs.databricks.com/api/workspace/jobs/runnow. The only required field is job_id, which is the ID of the job to be executed. Additional fields can be used to specify override parameters for the Databricks Job.

Parameters:
- databricks_job_id (int) – The ID of the Databricks Job to be executed.
- databricks_job_configuration (dict) – Configuration for triggering a new job run of a Databricks Job. See https://docs.databricks.com/api/workspace/jobs/runnow for the full configuration.
- poll_interval_seconds (float) – How often to poll the Databricks API to check whether the Databricks job has finished running.
- max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running before raising an error.
- name (Optional[str]) – The name of the op. If not provided, the name will be _databricks_run_now_op.
- databricks_resource_key (str) – The name of the resource key used by this op. If not provided, the resource key will be “databricks”.
Returns: An op definition to run the Databricks Job.
Return type: OpDefinition

Example:
from dagster import EnvVar, job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

DATABRICKS_JOB_ID = 1234

run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
    databricks_job_configuration={
        "python_params": [
            "--input",
            "schema.db.input_table",
            "--output",
            "schema.db.output_table",
        ],
    },
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def do_stuff():
    run_now_op()
- dagster_databricks.create_databricks_submit_run_op
Creates an op that submits a one-time run of a set of tasks on Databricks.
As config, the op accepts a blob of the form described in Databricks’ Job API: https://docs.databricks.com/api/workspace/jobs/submit.
Parameters:
- databricks_job_configuration (dict) – Configuration for submitting a one-time run of a set of tasks on Databricks. See https://docs.databricks.com/api/workspace/jobs/submit for the full configuration.
- poll_interval_seconds (float) – How often to poll the Databricks API to check whether the Databricks job has finished running.
- max_wait_time_seconds (float) – How long to wait for the Databricks job to finish running before raising an error.
- name (Optional[str]) – The name of the op. If not provided, the name will be _databricks_submit_run_op.
- databricks_resource_key (str) – The name of the resource key used by this op. If not provided, the resource key will be “databricks”.
Returns: An op definition to submit a one-time run of a set of tasks on Databricks.
Return type: OpDefinition

Example:
from dagster import EnvVar, job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource

submit_run_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "2.1.0-db3-scala2.11",
            "num_workers": 2,
        },
        "notebook_task": {
            "notebook_path": "/Users/dagster@example.com/PrepareData",
        },
    }
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def do_stuff():
    submit_run_op()
Step Launcher
- dagster_databricks.databricks_pyspark_step_launcher ResourceDefinition
- superseded
This API has been superseded. While there is no plan to remove this functionality, for new projects, we recommend using Dagster Pipes. For more information, see https://docs.dagster.io/guides/build/external-pipelines/.
Resource for running ops as a Databricks Job.
When this resource is used, the op will be executed in Databricks using the ‘Run Submit’ API. Pipeline code will be zipped up and copied to a directory in DBFS along with the op’s execution context.
Use the 'run_config' configuration to specify the details of the Databricks cluster used, and the 'storage' key to configure persistent storage on that cluster. Storage is accessed by setting the credentials in the Spark context, as described in the Databricks documentation for S3 and for Azure Data Lake Storage. A configuration sketch follows below.
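For orientation, here is a minimal sketch of attaching the step launcher to a job alongside the pyspark resource. The cluster spec, secret scope, secret key names, and package path are placeholders, and the exact config field names should be checked against the resource's config schema for the installed version of dagster-databricks.

from pathlib import Path

from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher
from dagster_pyspark import pyspark_resource

# Placeholder cluster spec; adjust the Spark version and node type for your workspace.
DATABRICKS_CLUSTER = {
    "size": {"num_workers": 1},
    "spark_version": "11.3.x-scala2.12",
    "nodes": {"node_types": {"node_type_id": "i3.xlarge"}},
}

step_launcher = databricks_pyspark_step_launcher.configured(
    {
        "run_config": {
            "run_name": "dagster-step",
            "cluster": {"new": DATABRICKS_CLUSTER},
        },
        "databricks_host": {"env": "DATABRICKS_HOST"},
        "databricks_token": {"env": "DATABRICKS_TOKEN"},
        # Storage credentials are read from a Databricks secret scope at runtime;
        # the scope and key names below are hypothetical.
        "storage": {
            "s3": {
                "secret_scope": "my-secret-scope",
                "access_key_key": "aws-access-key",
                "secret_key_key": "aws-secret-key",
            }
        },
        # Root of the local package containing this job's code
        # (may be named local_pipeline_package_path on older versions).
        "local_dagster_job_package_path": str(Path(__file__).parent),
    }
)

@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def count_rows(context):
    # Executed on the Databricks cluster; the pyspark resource points at the cluster's Spark session.
    df = context.resources.pyspark.spark_session.range(100)
    context.log.info(f"Row count: {df.count()}")

@job(resource_defs={"pyspark_step_launcher": step_launcher, "pyspark": pyspark_resource})
def databricks_step_launcher_job():
    count_rows()

Ops that require the pyspark_step_launcher resource key execute remotely on the Databricks cluster; the rest of the job runs locally.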
Pipes
class dagster_databricks.PipesDatabricksClient
Pipes client for Databricks.
Parameters:
- client (WorkspaceClient) – A databricks WorkspaceClient object.
- env (Optional[Mapping[str, str]]) – An optional dict of environment variables to pass to the databricks job.
- context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the Databricks job. Defaults to PipesDbfsContextInjector.
- message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the databricks job. Defaults to PipesDbfsMessageReader.
- poll_interval_seconds (float) – How long to sleep between checking the status of the job run. Defaults to 5.
- forward_termination (bool) – Whether to cancel the Databricks job if the orchestration process is interrupted or canceled. Defaults to True.
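For example, here is a sketch of driving a Databricks run from an asset, modeled on the Dagster Pipes guide for Databricks. The DBFS script path and cluster settings are placeholders, and the script itself is expected to report results back via dagster-pipes.

import os

from dagster import AssetExecutionContext, Definitions, asset
from dagster_databricks import PipesDatabricksClient
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

@asset
def databricks_asset(context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient):
    # One-time run spec; the DBFS script path and cluster settings are placeholders.
    task = jobs.SubmitTask.from_dict(
        {
            "new_cluster": {
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 1,
            },
            "task_key": "dagster-pipes-task",
            "spark_python_task": {"python_file": "dbfs:/my_python_script.py"},
        }
    )
    # Blocks until the run completes, relaying Pipes messages and logs back to Dagster.
    return pipes_databricks.run(task=task, context=context).get_materialize_result()

defs = Definitions(
    assets=[databricks_asset],
    resources={
        "pipes_databricks": PipesDatabricksClient(
            client=WorkspaceClient(
                host=os.environ["DATABRICKS_HOST"],
                token=os.environ["DATABRICKS_TOKEN"],
            )
        )
    },
)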
class dagster_databricks.PipesDbfsContextInjector
A context injector that injects context into a Databricks job by writing a JSON file to DBFS.
Parameters: client (WorkspaceClient) – A databricks WorkspaceClient object.
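As a minimal sketch, the injector can be passed explicitly (it is also the default used by PipesDatabricksClient):

from dagster_databricks import PipesDatabricksClient, PipesDbfsContextInjector
from databricks.sdk import WorkspaceClient

workspace = WorkspaceClient()  # credentials resolved from the environment/config
pipes_client = PipesDatabricksClient(
    client=workspace,
    # Write the Pipes context to a temporary JSON file on DBFS.
    context_injector=PipesDbfsContextInjector(client=workspace),
)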
class dagster_databricks.PipesDbfsMessageReader
Message reader that reads messages by periodically reading message chunks from an automatically-generated temporary directory on DBFS.
If log_readers is passed, this reader will also start the passed readers when the first message is received from the external process.
Parameters:
- interval (float) – interval in seconds between attempts to download a chunk
- client (WorkspaceClient) – A databricks WorkspaceClient object.
- cluster_log_root (Optional[str]) – The root path on DBFS where the cluster logs are written. If set, this will be used to read stderr/stdout logs.
- include_stdio_in_messages (bool) – Whether to send stdout/stderr to Dagster via Pipes messages. Defaults to False.
- log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers for logs on DBFS.
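As a sketch, the reader can be constructed explicitly to relay the external process's stdout/stderr through Pipes messages:

from dagster_databricks import PipesDatabricksClient, PipesDbfsMessageReader
from databricks.sdk import WorkspaceClient

workspace = WorkspaceClient()  # credentials resolved from the environment/config
pipes_client = PipesDatabricksClient(
    client=workspace,
    message_reader=PipesDbfsMessageReader(
        client=workspace,
        # Send the external process's stdout/stderr back as Pipes messages.
        include_stdio_in_messages=True,
    ),
)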
class dagster_databricks.PipesDbfsLogReader
Reader that reads a log file from DBFS.
Parameters:
- interval (float) – interval in seconds between attempts to download a log chunk
- remote_log_name (Literal["stdout", "stderr"]) – The name of the log file to read.
- target_stream (TextIO) – The stream to which to forward log chunks that have been read.
- client (WorkspaceClient) – A databricks WorkspaceClient object.
- debug_info (Optional[str]) – An optional message containing debug information about the log reader.
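A sketch of forwarding the run's stdout and stderr into the local process by attaching log readers to the message reader; this assumes cluster log delivery to DBFS is configured for the run:

import sys

from dagster_databricks import (
    PipesDatabricksClient,
    PipesDbfsLogReader,
    PipesDbfsMessageReader,
)
from databricks.sdk import WorkspaceClient

workspace = WorkspaceClient()  # credentials resolved from the environment/config
pipes_client = PipesDatabricksClient(
    client=workspace,
    message_reader=PipesDbfsMessageReader(
        client=workspace,
        log_readers=[
            # Forward driver stdout/stderr from DBFS into this process's streams.
            PipesDbfsLogReader(client=workspace, remote_log_name="stdout", target_stream=sys.stdout),
            PipesDbfsLogReader(client=workspace, remote_log_name="stderr", target_stream=sys.stderr),
        ],
    ),
)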