Resources#

Resources provide a way to manage dependencies to external components and share implementations across multiple ops in a job.

Relevant APIs#

NameDescription
@resourceThe decorator used to define resources. The decorated function is called a resource_fn. The decorator returns a ResourceDefinition.
ResourceDefinitionClass for resource definitions. You almost never want to use initialize this class directly. Instead, you should use the @resource which returns a ResourceDefinition.
InitResourceContextThe context object provided to a resource during initialization. This object contains required resource, config, and other run information.
build_init_resource_contextFunction for building an InitResourceContext outside of execution, intended to be used when testing a resource.

Overview#

You can use resources to provide access to features of the execution environment to ops. You can bind a set of resources (and other environment information) to a job so that those resources can be available to the ops within that job. You can construct different jobs for the same graph, each with different resources, to represent the execution environments that your graph will be run within.

Why Use Resources#

Representing external dependencies as resources, in conjunction with jobs, has very convenient properties:

  • Pluggable: You can map a resource to a key in one job, and then map a different resource to that same key in a different job. This is useful if there is a heavy external dependency that you want to use in production, but avoid using it in testing. You can simply provide different resource sets for each execution case: one for production with the heavy dependency (e.g., AWS) as a resource, and one for testing with something lighter (i.e., in-memory store) mapped to the same key. For more information about this capability, check out Separating Business Logic from Environments.
  • Job Scoped: Since resources are job scoped, if you provide a resource to a job, then it becomes available for use with every op in that job.
  • Configurable: Resources can be configured, using a strongly typed configuration system.
  • Dependencies: Resources can depend on other resources. This makes it possible to cleanly represent external environment objects that rely on other external environment information for initialization.

Defining a Resource#

To define a resource, use the @resource decorator. Wrap a function that takes an init_context as the first parameter, which is an instance of InitResourceContext. From this function, return or yield the object that you would like to be available as a resource.

from dagster import resource


class ExternalCerealFetcher:
    def fetch_new_cereals(self, start_ts, end_ts):
        pass


@resource
def cereal_fetcher(init_context):
    return ExternalCerealFetcher()

Accessing Resources in Ops#

Ops use resource keys to access resources, like so:

from dagster import op

CREATE_TABLE_1_QUERY = "create table_1 as select * from table_0"


@op(required_resource_keys={"database"})
def op_requires_resources(context):
    context.resources.database.execute_query(CREATE_TABLE_1_QUERY)

Providing Resources to a Job#

Jobs provide resources to the ops inside them. A job has a dictionary that maps resource keys to resource definitions. You can supply this dictionary to the resource_defs argument when using either of the ways to construct a job: GraphDefinition.to_job or @job.

Supplying resources when using GraphDefinition.to_job is especially common, because you can build multiple jobs from the same graph that are distinguished by their different resources.

from dagster import graph


@graph
def do_database_stuff():
    op_requires_resources()


do_database_stuff_prod = do_database_stuff.to_job(resource_defs={"database": database_resource_a})
do_database_stuff_dev = do_database_stuff.to_job(resource_defs={"database": database_resource_b})

Supplying resources to the @job, i.e. when there aren't multiple jobs for the same graph, is also useful. For example, if you want to use an off-the-shelf resource or supply configuration in one place instead of in every solid.

from dagster import job


@job(resource_defs={"database": database_resource})
def do_database_stuff_job():
    op_requires_resources()

Resource Configuration#

ResourceDefinitions can have a config schema, which allows you to customize behavior at runtime through run configuration.

For example, let's say we wanted to pass a connection string to our DatabaseConnection resource.

class DatabaseConnection:
    def __init__(self, connection: str):
        self.connection = connection


@resource(config_schema={"connection": str})
def db_resource(init_context):
    connection = init_context.resource_config["connection"]
    return DatabaseConnection(connection)

Context Manager Resources#

Dagster resources can serve as context managers, for scenarios where it is necessary to perform some sort of cleanup of the resource after execution. Let’s take the example of a database connection. We might want to clean up the connection once we are done using it. We can incorporate this into our resource like so:

@resource
@contextmanager
def db_connection():
    try:
        db_conn = get_db_connection()
        yield db_conn
    finally:
        cleanup_db_connection(db_conn)

At spinup time, Dagster will run the code within the try block, and be expecting a single yield. Having more than one yield will cause an error. The yielded object will be available to code that requires the resource:

@op(required_resource_keys={"db_connection"})
def use_db_connection(context):
    db_conn = context.resources.db_connection
    ...

Once execution finishes, the finally block of the resource init function will run. In the case of our db_connection resource, this will run the cleanup function.

An important nuance is that resources are initialized (and torn down) once per process. This means that if using the in-process executor, which runs all steps in a single process, resources will be initialized at the beginning of execution, and torn down after every single step is finished executing. In contrast, when using the multiprocess executor (or other out-of-process executors), where there is a single process for each step, at the beginning of each step execution, the resource will be initialized, and at the end of that step’s execution, the finally block will be run.

Testing Resource Initialization#

You can test the initialization of a resource by invoking the resource definition. This will run the underlying decorated function.

from dagster import resource


@resource
def my_resource(_):
    return "foo"


def test_my_resource():
    assert my_resource(None) == "foo"

If your resource requires other resources or config, then you can provide a InitResourceContext object by using the build_init_resource_context function.

from dagster import build_init_resource_context, resource


@resource(required_resource_keys={"foo"}, config_schema={"bar": str})
def my_resource_requires_context(init_context):
    return init_context.resources.foo, init_context.resource_config["bar"]


def test_my_resource_with_context():
    init_context = build_init_resource_context(
        resources={"foo": "foo_str"}, config={"bar": "bar_str"}
    )
    assert my_resource_requires_context(init_context) == ("foo_str", "bar_str")

If your resource is a context manager, then you can open it as one using python's with syntax.

from contextlib import contextmanager
from dagster import resource


@resource
@contextmanager
def my_cm_resource(_):
    yield "foo"


def test_cm_resource():
    with my_cm_resource(None) as initialized_resource:
        assert initialized_resource == "foo"

Resource to Resource Dependencies#

Resources can depend upon other resources. Use the required_resource_keys parameter of the @resource decorator to specify which resources to depend upon. Access the required resources through the context object provided to the wrapped function.

from dagster import resource


@resource
def credentials():
    return ("bad_username", "easy_password")


@resource(required_resource_keys={"credentials"})
def client(init_context):
    username, password = init_context.resources.credentials
    return Client(username, password)

Now, consider an op that will use the client resource.

from dagster import graph, op


@op(required_resource_keys={"client"})
def get_client(context):
    return context.resources.client

When constructing a job that includes that op, we provide the resource client, but also credentials, because client requires it.

@job(resource_defs={"credentials": credentials, "client": client})
def connect():
    get_client()