Resources (Legacy)

This guide covers using the legacy Dagster resource system. For docs on the new Pythonic resource system introduced in Dagster 1.3, see the updated resources guide. To migrate your code, refer to the migrating to Pythonic resources and config guide.

Resources are objects that are shared across the implementations of multiple software-defined assets and ops and that can be plugged in after defining those ops and assets.

Resources typically model external components that assets and ops interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.

So, why use resources?

  • Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production, but avoid using in testing, you can accomplish this by providing different resources in each environment. Check out Separating Business Logic from Environments for more info about this capability.
  • Share configuration across multiple ops or assets - Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually.
  • Share implementations across multiple ops or assets - When multiple ops access the same external services, resources provide a standard way to structure your code to share the implementations.

Relevant APIs

| Name | Description |
| --- | --- |
| @resource | The decorator used to define resources. The decorated function is called a resource_fn. The decorator returns a ResourceDefinition. |
| ResourceDefinition | Class for resource definitions. You almost never want to initialize this class directly. Instead, use the @resource decorator, which returns a ResourceDefinition. |
| InitResourceContext | The context object provided to a resource during initialization. This object contains required resources, config, and other run information. |
| build_init_resource_context | Function for building an InitResourceContext outside of execution, intended to be used when testing a resource. |
| build_resources | Function for initializing a set of resources outside of the context of a job's execution. |
| with_resources | Function for providing resources to software-defined assets and source assets. |

Defining a resource

To define a resource, use the @resource decorator. Wrap a function that takes an init_context as the first parameter, which is an instance of InitResourceContext. From this function, return or yield the object that you would like to be available as a resource.

from dagster import resource


class ExternalCerealFetcher:
    def fetch_new_cereals(self, start_ts, end_ts):
        pass


@resource
def cereal_fetcher(init_context):
    return ExternalCerealFetcher()

Using resources

Using resources with software-defined assets

Accessing resources in software-defined assets

Software-defined assets use resource keys to access resources:

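from dagster import AssetExecutionContext, asset


def do_something_with_resource(resource_value):
    # Placeholder for your own logic that uses the resource
    ...


@asset(required_resource_keys={"foo"})
def asset_requires_resource(context: AssetExecutionContext):
    do_something_with_resource(context.resources.foo)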

Providing resources to software-defined assets

How resources are provided to assets depends on how you're organizing your code definitions in Dagster.

Resources can be provided to software-defined assets by passing them to a Definitions object. The resources provided to Definitions are automatically bound to the assets.

from dagster import Definitions


defs = Definitions(
    assets=[asset_requires_resource],
    resources={"foo": foo_resource},
)

When defining asset jobs (using define_asset_job), you don't need to provide resources to the job directly. The job will make use of the resources provided to the assets.


Using resources with ops

Accessing resources in ops

Like software-defined assets, ops use resource keys to access resources:

from dagster import OpExecutionContext, op

CREATE_TABLE_1_QUERY = "create table_1 as select * from table_0"


@op(required_resource_keys={"database"})
def op_requires_resources(context: OpExecutionContext):
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)

Providing resources to ops

Jobs provide resources to the ops inside them. A job has a dictionary that maps resource keys to resource definitions. You can supply this dictionary to the resource_defs argument when using either of the ways to construct a job: GraphDefinition.to_job or @job.

Supplying resources when using GraphDefinition.to_job is especially common, because you can build multiple jobs from the same graph that are distinguished by their different resources.

from dagster import graph


@graph
def do_database_stuff():
    op_requires_resources()


do_database_stuff_prod = do_database_stuff.to_job(
    resource_defs={"database": database_resource_a}
)
do_database_stuff_dev = do_database_stuff.to_job(
    resource_defs={"database": database_resource_b}
)

Supplying resources directly to @job is also useful when there aren't multiple jobs for the same graph: for example, when you want to use an off-the-shelf resource or supply configuration in one place instead of in every op.

from dagster import job


@job(resource_defs={"database": database_resource})
def do_database_stuff_job():
    op_requires_resources()

Resource configuration

ResourceDefinitions can have a config schema, which allows you to customize behavior at runtime through run configuration.

For example, let's say we want to pass a connection string to a resource that creates a DatabaseConnection object.

from dagster import resource


class DatabaseConnection:
    def __init__(self, connection: str):
        self.connection = connection


@resource(config_schema={"connection": str})
def db_resource(init_context):
    # Values from run configuration are exposed as init_context.resource_config
    connection = init_context.resource_config["connection"]
    return DatabaseConnection(connection)

Context manager resources

Dagster resources can serve as context managers, for scenarios where it is necessary to perform some sort of cleanup of the resource after execution. Let’s take the example of a database connection. We might want to clean up the connection once we are done using it. We can incorporate this into our resource like so:

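from contextlib import contextmanager

from dagster import resource


@resource
@contextmanager
def db_connection():
    # get_db_connection and cleanup_db_connection stand in for your own client code.
    # Opening the connection before `try` keeps `finally` from referencing an unbound name.
    db_conn = get_db_connection()
    try:
        yield db_conn
    finally:
        cleanup_db_connection(db_conn)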

At spin-up time, Dagster runs the resource function up to its yield and expects exactly one yield; having more than one yield will cause an error. The yielded object is available to code that requires the resource:

from dagster import OpExecutionContext, op


@op(required_resource_keys={"db_connection"})
def use_db_connection(context: OpExecutionContext):
    db_conn = context.resources.db_connection
    ...

Once execution finishes, the finally block of the resource init function will run. In the case of our db_connection resource, this will run the cleanup function.

An important nuance is that resources are initialized (and torn down) once per process. With the in-process executor, which runs all steps in a single process, resources are initialized at the beginning of execution and torn down after all steps have finished executing. In contrast, with the multiprocess executor (or other out-of-process executors), which runs each step in its own process, the resource is initialized at the beginning of each step's execution, and its finally block runs at the end of that step's execution.


Testing resource initialization

You can test the initialization of a resource by invoking the resource definition. This will run the underlying decorated function.

from dagster import resource


@resource
def my_resource(_):
    return "foo"


def test_my_resource():
    assert my_resource(None) == "foo"

If your resource requires other resources or config, you can provide an InitResourceContext object by using the build_init_resource_context function.

from dagster import build_init_resource_context, resource


@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
    return init_context.resources.foo, init_context.resource_config["bar"]


def test_my_resource_with_context():
    init_context = build_init_resource_context(
        resources={"foo": "foo_str"}, config={"bar": "bar_str"}
    )
    assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")

If your resource is a context manager, you can open it as one using Python's with syntax.

from contextlib import contextmanager
from dagster import resource


@resource
@contextmanager
def my_cm_resource(_):
    yield "foo"


def test_cm_resource():
    with my_cm_resource(None) as initialized_resource:
        assert initialized_resource == "foo"

Initializing resources outside of execution

Sometimes you might want to reuse the code written within your resources outside of execution. For example, suppose you have a db_connection resource that you want to use outside of a job run. You can use the build_resources API to initialize this resource outside of execution.

from dagster import build_resources, resource


def get_the_db_connection(credentials):
    # Placeholder for your own connection logic
    ...


@resource
def the_credentials(): ...


@resource(required_resource_keys={"credentials"})
def the_db_connection(init_context):
    # Return the connection so it becomes the value of the "db_connection" resource
    return get_the_db_connection(init_context.resources.credentials)


def uses_db_connection():
    with build_resources(
        {"db_connection": the_db_connection, "credentials": the_credentials}
    ) as resources:
        conn = resources.db_connection
        ...

Resource-to-resource dependencies

Resources can depend upon other resources. Use the required_resource_keys parameter of the @resource decorator to specify which resources to depend upon. Access the required resources through the context object provided to the wrapped function.

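from dagster import resource


class Client:
    # Hypothetical client class standing in for a real service client
    def __init__(self, username, password):
        self.username = username
        self.password = password


@resource
def credentials():
    return ("bad_username", "easy_password")


@resource(required_resource_keys={"credentials"})
def client(init_context):
    username, password = init_context.resources.credentials
    return Client(username, password)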

Now, consider an op that will use the client resource:

from dagster import OpExecutionContext, job, op


@op(required_resource_keys={"client"})
def get_client(context: OpExecutionContext):
    return context.resources.client

When constructing a job that includes that op, we provide the resource client, but also credentials, because client requires it.

@job(resource_defs={"credentials": credentials, "client": client})
def connect():
    get_client()

See it in action

For more examples of resources, check out the following in our Hacker News example: