Databricks & Dagster

Dagster can orchestrate your Databricks jobs and other Databricks API calls, making it easy to chain together multiple Databricks jobs and orchestrate Databricks alongside your other technologies.


Prerequisites

To get started, you will need to install the dagster and dagster-databricks Python packages:

pip install dagster dagster-databricks

You'll also need a Databricks workspace containing an existing project that is deployed with a Databricks job. If you don't have one, follow the Databricks quickstart to set one up.

To manage your Databricks job from Dagster, you'll need three values, which can be set as environment variables in Dagster:

  1. A host for connecting with your Databricks workspace, starting with https://, stored in an environment variable DATABRICKS_HOST,
  2. A token corresponding to a personal access token for your Databricks workspace, stored in an environment variable DATABRICKS_TOKEN, and
  3. The ID of the Databricks job you want to run, stored in an environment variable DATABRICKS_JOB_ID.

You can follow the Databricks API authentication instructions to retrieve these values.
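Before wiring these values into Dagster, it can help to check that they are set and well-formed. The sketch below is illustrative only: the helper name check_databricks_env is hypothetical, and the checks (an https:// prefix on the host, an integer job ID) are assumptions based on the list above rather than rules enforced by Dagster.

```python
import os

# The three values listed in the prerequisites.
REQUIRED_VARS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN", "DATABRICKS_JOB_ID")


def check_databricks_env(env=os.environ):
    """Return a list of problems; an empty list means the values look usable."""
    problems = []
    # Every value must be present and non-empty.
    for name in REQUIRED_VARS:
        if not env.get(name):
            problems.append(f"{name} is not set")
    # The host is expected to start with https://.
    host = env.get("DATABRICKS_HOST", "")
    if host and not host.startswith("https://"):
        problems.append("DATABRICKS_HOST should start with https://")
    # Assumption: the job ID is a plain integer.
    job_id = env.get("DATABRICKS_JOB_ID", "")
    if job_id and not job_id.isdigit():
        problems.append("DATABRICKS_JOB_ID should be an integer")
    return problems
```

Calling check_databricks_env() with no arguments inspects the current process environment; passing a dictionary makes the check easy to try without exporting anything.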


Step 1: Connecting to Databricks

The first step in using Databricks with Dagster is to tell Dagster how to connect to your Databricks workspace using a Databricks resource. This resource holds the location of your Databricks workspace and the credentials required to access it, both sourced from environment variables. Once configured, the resource gives you access to the underlying Databricks API client for communicating with your workspace.

For more information about the Databricks resource, see the API reference.

from dagster_databricks import databricks_client

databricks_client_instance = databricks_client.configured(
    {
        "host": {"env": "DATABRICKS_HOST"},
        "token": {"env": "DATABRICKS_TOKEN"},
    }
)

Step 2: Create an op/asset that connects to Databricks

In this step, we'll model a Databricks API call as either a Dagster op or the computation backing a Software-defined asset. The example below uses the create_databricks_run_now_op op factory from dagster-databricks to build an op for the Databricks job.

Afterward, we create a Dagster job that invokes the op or selects the asset to run the Databricks API call.

For guidance on deciding whether to use an op or asset, refer to the Understanding how assets relate to ops guide.

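import os

from dagster import job
from dagster_databricks import create_databricks_run_now_op

# Assumption: read the job ID from the DATABRICKS_JOB_ID environment variable.
DATABRICKS_JOB_ID = int(os.environ["DATABRICKS_JOB_ID"])

my_databricks_run_now_op = create_databricks_run_now_op(
    databricks_job_id=DATABRICKS_JOB_ID,
)

# databricks_client_instance is the resource configured in Step 1.
@job(resource_defs={"databricks": databricks_client_instance})
def my_databricks_job():
    my_databricks_run_now_op()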

Step 3: Schedule your Databricks computation

Now that your Databricks API calls are modeled within Dagster, you can schedule them to run regularly.

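In the example below, we schedule the materialize_databricks_table and my_databricks_job jobs to run daily. The example assumes that the my_databricks_table asset and the materialize_databricks_table job are already defined, in addition to my_databricks_job from Step 2: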

from dagster import (
    Definitions,
    ScheduleDefinition,
)

defs = Definitions(
    assets=[my_databricks_table],
    schedules=[
        ScheduleDefinition(
            job=materialize_databricks_table,
            cron_schedule="@daily",
        ),
        ScheduleDefinition(
            job=my_databricks_job,
            cron_schedule="@daily",
        ),
    ],
    jobs=[my_databricks_job],
    resources={"databricks": databricks_client_instance},
)
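
The "@daily" value above is a cron shorthand. As a rough illustration of what such shorthands conventionally mean, the sketch below maps common cron nicknames to their five-field equivalents. The mapping reflects standard cron conventions rather than anything read from Dagster's source, and the helper name expand_cron is hypothetical.

```python
# Conventional cron nicknames and their five-field equivalents
# (minute, hour, day of month, month, day of week).
CRON_NICKNAMES = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
}


def expand_cron(schedule: str) -> str:
    """Return the five-field form of a nickname; pass other strings through."""
    return CRON_NICKNAMES.get(schedule, schedule)
```

Under this convention, "@daily" corresponds to "0 0 * * *", that is, once a day at midnight. To run at a different time, pass an explicit five-field string as cron_schedule instead.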

What's next?

By now, you should have a working Databricks and Dagster integration!

What's next? From here, you can: