Ask AI

Resources#

This guide covers using the new Pythonic resources system introduced in Dagster 1.3. If your code is still using the legacy resources system, see the Legacy resources guide. To migrate your code, refer to the Migrating to Pythonic resources and config guide.

In data engineering, resources are the external services, tools, and storage you use to do your job. For example, a simple ETL (Extract Transform Load) pipeline fetches data from an API, ingests it into a database, and updates a dashboard. External tools and services this pipeline uses could be:

  • The API the data is fetched from
  • The AWS S3 bucket where the API’s response is stored
  • The Snowflake/Databricks/BigQuery account the data is ingested into
  • The BI tool the dashboard was made in

Using Dagster resources, you can standardize connections and integrations to these tools across Dagster definitions like asset definitions, schedules, sensors, ops, and jobs.

So, why use resources?

  • Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production but avoid using in testing, you can accomplish this by providing different resources in each environment. Refer to the Separating business logic from environments section of the Testing documentation for more info about this capability.
  • Surface configuration in the Dagster UI - Resources and their configuration are surfaced in the UI, making it easy to see where resources are used and how they're configured.
  • Share configuration across multiple assets or ops - Resources are configurable and shared, so configuration can be supplied in one place instead of individually.
  • Share implementations across multiple assets or ops - When multiple assets access the same external services, resources provide a standard way to structure your code to share the implementations.

Relevant APIs#

NameDescription
ConfigurableResourceThe base class extended to define resources. Under the hood, implements ResourceDefinition.
ResourceParamAn annotation used to specify that a plain Python object parameter for an asset or op is a resource.
ResourceDefinitionClass for resource definitions. You almost never want to use initialize this class directly. Instead, you should extend the ConfigurableResource class which implements ResourceDefinition.
InitResourceContextThe context object provided to a resource during initialization. This object contains required resources, config, and other run information.
build_init_resource_contextFunction for building an InitResourceContext outside of execution, intended to be used when testing a resource.
build_resourcesFunction for initializing a set of resources outside of the context of a job's execution.
with_resourcesAdvanced API for providing resources to a specific set of asset definitions, overriding those provided to Definitions.

Defining a resource#

Typically, resources are defined by subclassing ConfigurableResource. Resources typically have a set of configuration values, which are used to specify things like account identifiers, API keys, or database names when interfacing with an external tool or service. This configuration schema is specified by attributes on the class.

The configuration system has a few advantages over plain Python parameter passing. Configured values can be:

  1. Replaced at run launch time in the Launchpad in Dagster UI, the GraphQL API, or the Python API
  2. Displayed in the Dagster UI
  3. Set dynamically using environment variables, resolved at runtime

With asset definitions#

The following example demonstrates defining a subclass of ConfigurableResource that represents a connection to an external service. The resource can be configured by constructing it in the Definitions call.

You can define methods on the resource class which depend on config values.

from dagster import asset, Definitions, ConfigurableResource
import requests
from requests import Response

class MyConnectionResource(ConfigurableResource):
    username: str

    def request(self, endpoint: str) -> Response:
        return requests.get(
            f"https://my-api.com/{endpoint}",
            headers={"user-agent": "dagster"},
        )

@asset
def data_from_service(my_conn: MyConnectionResource) -> Dict[str, Any]:
    return my_conn.request("/fetch_data").json()

defs = Definitions(
    assets=[data_from_service],
    resources={
        "my_conn": MyConnectionResource(username="my_user"),
    },
)

Assets specify resource dependencies by annotating the resource as a parameter to the asset function.

To provide resource values to assets, attach them to the Definitions objects. These resources are automatically passed to the function at runtime.

With sensors#

Sensors use resources in the same way as assets, which can be useful for querying external services for data.

To specify resource dependencies on a sensor, annotate the resource type as a parameter to the sensor's function. For more information and examples, refer to the Sensors documentation.

With schedules#

Schedules can use resources in case your schedule logic needs to interface with an external tool or to make your schedule logic more testable.

To specify resource dependencies on a schedule, annotate the resource type as a parameter to the schedule's function. Refer to the Schedule examples reference for more info.

With ops and jobs#

The following example defines a subclass of ConfigurableResource that represents a connection to an external service. The resource can be configured by constructing it in the Definitions call.

You can define methods on the resource class which depend on config values.

from dagster import Definitions, job, op, ConfigurableResource
import requests
from requests import Response

class MyConnectionResource(ConfigurableResource):
    username: str

    def request(self, endpoint: str) -> Response:
        return requests.get(
            f"https://my-api.com/{endpoint}",
            headers={"user-agent": "dagster"},
        )

@op
def update_service(my_conn: MyConnectionResource):
    my_conn.request("/update")

@job
def update_service_job():
    update_service()

defs = Definitions(
    jobs=[update_service_job],
    resources={
        "my_conn": MyConnectionResource(username="my_user"),
    },
)

Ops specify resource dependencies by annotating the resource as a parameter to the op function.

To provide resource values to ops, attach them to the Definitions objects. These resources are automatically passed to the function at runtime.

There are many supported config types that can be used when defining resources. Refer to the advanced config types documentation for a more comprehensive overview of the available config types.


Configuring resources#

Using environment variables with resources#

Resources can be configured using environment variables, which is useful for secrets or other environment-specific configuration. If you're using Dagster+, environment variables can be configured directly in the UI.

To use environment variables, pass an EnvVar when constructing the resource. EnvVar inherits from str and can be used to populate any string config field on a resource. The value of the environment variable will be evaluated when a run is launched.

from dagster import EnvVar, Definitions, ConfigurableResource

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

defs = Definitions(
    assets=...,
    resources={
        "credentials": CredentialsResource(
            username=EnvVar("MY_USERNAME"),
            password=EnvVar("MY_PASSWORD"),
        )
    },
)

What about os.getenv()? When os.getenv() is used, the value of the variable is retrieved when Dagster loads the code location. Using EnvVar not only tells Dagster to retrieve the value at runtime, but also not to display the value in the UI.

Using the EnvVar approach has a few unique benefits:

  • Improved observability. The UI will display information about configuration values sourced from environment variables.
  • Secret values are hidden in the UI. Secret values are hidden in the Launchpad, Resources page, and other places where configuration is displayed.
  • Simplified testing. Because you can provide string values directly to configuration rather than environment variables, testing may be easier.

For more information on using environment variables with Dagster, refer to the Environment variables guide.

Configuring resources at launch time#

In some cases, you may want to specify configuration for a resource at launch time, in the Launchpad or in a RunRequest for a schedule or sensor. For example, you may want a sensor-triggered run to specify a different target table in a database resource for each run.

You can use the configure_at_launch() method to defer the construction of a configurable resource until launch time:

from dagster import ConfigurableResource, Definitions, asset

class DatabaseResource(ConfigurableResource):
    table: str

    def read(self): ...

@asset
def data_from_database(db_conn: DatabaseResource):
    return db_conn.read()

defs = Definitions(
    assets=[data_from_database],
    resources={"db_conn": DatabaseResource.configure_at_launch()},
)

Providing resource launch time configuration in Python code#

Then, configuration for the resource can be provided at launch time in the Launchpad or in Python code using the config parameter of the RunRequest:

from dagster import sensor, define_asset_job, RunRequest, RunConfig

update_data_job = define_asset_job(
    name="update_data_job", selection=[data_from_database]
)

@sensor(job=update_data_job)
def table_update_sensor():
    tables = ...
    for table_name in tables:
        yield RunRequest(
            run_config=RunConfig(
                resources={
                    "db_conn": DatabaseResource(table=table_name),
                },
            ),
        )

Resources that depend on other resources#

In some situations, you may want to define a resource that depends on other resources. This is useful for common configuration. For example, separate resources for a database and for a filestore may both depend on credentials for a particular cloud provider. Defining these credentials as a separate, nested resource allows you to specify configuration in a single place. It also makes it easier to test resources, since the nested resource can be mocked.

In this case, you can list that nested resource as an attribute of the resource class:

from dagster import Definitions, ConfigurableResource, ResourceDependency

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

class FileStoreBucket(ConfigurableResource):
    credentials: ResourceDependency[CredentialsResource]
    region: str

    def write(self, data: str):
        # We can access the credentials resource via `self.credentials`,
        # which will be an initialized instance of `CredentialsResource`
        get_filestore_client(
            username=self.credentials.username,
            password=self.credentials.password,
            region=self.region,
        ).write(data)

defs = Definitions(
    assets=[my_asset],
    resources={
        "bucket": FileStoreBucket(
            credentials=CredentialsResource(
                username="my_user", password="my_password"
            ),
            region="us-east-1",
        ),
    },
)

If it's preferred to provide the configuration for credentials at launch time, use the configure_at_launch() method to defer the construction of the CredentialsResource until launch time.

Because credentials requires launch time configuration through the launchpad, it must also be passed to the Definitions object, so that configuration can be provided at launch time. Nested resources only need to be passed to the Definitions object if they require launch time configuration.

credentials = CredentialsResource.configure_at_launch()

defs = Definitions(
    assets=[my_asset],
    resources={
        "credentials": credentials,
        "bucket": FileStoreBucket(
            credentials=credentials,
            region="us-east-1",
        ),
    },
)

Resource lifecycle#

Once a resource reaches a certain complexity, you may want to manage the state of the resource over its lifetime. This is useful for resources that require special initialization or cleanup. ConfigurableResource is a data class meant to encapsulate config, but also provides lifecycle hooks to manage the state of the resource.

You can mark any private state attributes using Pydantic's PrivateAttr. These attributes, which must start with an underscore, won't be included in the resource's config.

Lifecycle hooks#

When a resource is initialized during a Dagster run, the setup_for_execution method is called. This method is passed an InitResourceContext object, which contains the resource's config and other run information. The resource can use this context to initialize any state it needs for the duration of the run.

Once a resource is no longer needed, the teardown_after_execution method is called. This method is passed the same context object as setup_for_execution. This method can be useful for cleaning up any state that was initialized in setup_for_execution.

setup_for_execution and teardown_after_execution are each called once per run, per process. When using the in-process executor, this means that they will be called once per run. When using the multiprocess executor, each process's instance of the resource will be initialized and torn down.

In the following example, we set up an API token for a client resource based on the username and password provided in the config. The API token can then be used to query an API in the asset body.

from dagster import ConfigurableResource, InitResourceContext, asset
import requests

from pydantic import PrivateAttr

class MyClientResource(ConfigurableResource):
    username: str
    password: str

    _api_token: str = PrivateAttr()

    def setup_for_execution(self, context: InitResourceContext) -> None:
        # Fetch and set up an API token based on the username and password
        self._api_token = requests.get(
            "https://my-api.com/token", auth=(self.username, self.password)
        ).text

    def get_all_users(self):
        return requests.get(
            "https://my-api.com/users",
            headers={"Authorization": self._api_token},
        )

@asset
def my_asset(client: MyClientResource):
    return client.get_all_users()

For more complex use cases, you can override the yield_for_execution. By default, this context manager calls setup_for_execution, yields the resource, and then calls teardown_after_execution, but you can override it to provide any custom behavior. This is useful for resources that require a context to be open for the duration of a run, such as database connections or file handles.

from dagster import ConfigurableResource, asset, InitResourceContext
from contextlib import contextmanager
from pydantic import PrivateAttr

class DBConnection:
    ...

    def query(self, body: str): ...

@contextmanager
def get_database_connection(username: str, password: str): ...

class MyClientResource(ConfigurableResource):
    username: str
    password: str

    _db_connection: DBConnection = PrivateAttr()

    @contextmanager
    def yield_for_execution(self, context: InitResourceContext):
        # keep connection open for the duration of the execution
        with get_database_connection(self.username, self.password) as conn:
            # set up the connection attribute so it can be used in the execution
            self._db_connection = conn

            # yield, allowing execution to occur
            yield self

    def query(self, body: str):
        return self._db_connection.query(body)

@asset
def my_asset(client: MyClientResource):
    client.query("SELECT * FROM my_table")

Using bare Python objects as resources#

When starting to build a set of assets or jobs, you may want to use a bare Python object without configuration as a resource, such as a third-party API client.

Dagster supports passing plain Python objects as resources. This follows a similar pattern to using a ConfigurableResource subclass; however, assets and ops that use these resources must annotate them with ResourceParam. This annotation lets Dagster know that the parameter is a resource and not an upstream input.

from dagster import Definitions, asset, ResourceParam

# `ResourceParam[GitHub]` is treated exactly like `GitHub` for type checking purposes,
# and the runtime type of the github parameter is `GitHub`. The purpose of the
# `ResourceParam` wrapper is to let Dagster know that `github` is a resource and not an
# upstream asset.

@asset
def public_github_repos(github: ResourceParam[GitHub]):
    return github.organization("dagster-io").repositories()

defs = Definitions(
    assets=[public_github_repos],
    resources={"github": GitHub(...)},
)

Testing configurable resources#

You can test the initialization of a ConfigurableResource by constructing it manually. In most cases, the resource can be constructed directly:

from dagster import ConfigurableResource

class MyResource(ConfigurableResource):
    value: str

    def get_value(self) -> str:
        return self.value

def test_my_resource():
    assert MyResource(value="foo").get_value() == "foo"

If the resource requires other resources, you can pass them as constructor arguments:

from dagster import ConfigurableResource

class StringHolderResource(ConfigurableResource):
    value: str

class MyResourceRequiresAnother(ConfigurableResource):
    foo: StringHolderResource
    bar: str

def test_my_resource_with_nesting():
    string_holder = StringHolderResource(value="foo")
    resource = MyResourceRequiresAnother(foo=string_holder, bar="bar")
    assert resource.foo.value == "foo"
    assert resource.bar == "bar"

Testing with resource context#

In the case that a resource uses the resource initialization context, you can use the build_init_resource_context utility alongside the with_init_resource_context helper on the resource class:

from dagster import (
    ConfigurableResource,
    build_init_resource_context,
    DagsterInstance,
)
from typing import Optional

class MyContextResource(ConfigurableResource[GitHub]):
    base_path: Optional[str] = None

    def effective_base_path(self) -> str:
        if self.base_path:
            return self.base_path
        instance = self.get_resource_context().instance
        assert instance
        return instance.storage_directory()

def test_my_context_resource():
    with DagsterInstance.ephemeral() as instance:
        context = build_init_resource_context(instance=instance)
        assert (
            MyContextResource(base_path=None)
            .with_resource_context(context)
            .effective_base_path()
            == instance.storage_directory()
        )

Next steps#

Resources are a powerful way to encapsulate reusable logic in your assets and ops. For more information on the supported config types for resources, see the advanced config types documentation. For information on the Dagster config system, which you can use to parameterize assets and ops, refer to the run configuration documentation.