Ask AI

Migrating to Pythonic resources and config#

With Dagster 1.3 we added a new set of APIs to our resources and config systems based on Pydantic, rather than our own bespoke configuration schema API. This added more type safety and made the Python APIs more idiomatic. This guide provides information on how to migrate from our legacy resource and config system to this new system, even in large existing Dagster codebases.


Principles#

A critical part of any migration process is the ability to break the migration into a series of small, low-risk changes. This reduces coordination costs and increases the parallelization of work. Without an incremental process, engineers are stuck in a world where there is a one-time, high-risk migration that requires synchronized coordination with all the other stakeholders in the code base.

We designed this new API with this in mind, allowing for old code and new code to co-exist within a single Dagster code location. When needed for backwards compatibility, config schemas defined in Pydantic can easily be converted to the old configuration schema format. And your assets and ops can access Pythonic config and resources through the old APIs attached to the context, allowing you to update assets and ops separately


Using new config schema#

Let's take some existing code we have, using the old configuration system:

from dagster import AssetExecutionContext, Definitions, asset

@asset(config_schema={"conn_string": str, "port": int})
def an_asset(context: AssetExecutionContext, upstream_asset):
    assert context.op_execution_context.op_config["conn_string"]
    assert context.op_execution_context.op_config["port"]

defs = Definitions(assets=[an_asset, upstream_asset])

job_def = defs.get_implicit_global_asset_job_def()

result = job_def.execute_in_process(
    run_config={"ops": {"an_asset": {"config": {"conn_string": "foo", "port": 1}}}}
)

To port this code to use the new config schema system, let's start by writing a new class that inherits from Config. Instead of passing a dictionary of field names and types to config_schema, we can use Python typing to declare the equivalent schema. Now we can program against that object directly rather than through a dictionary attached to the context.

from dagster import Config, Definitions, asset

class AnAssetConfig(Config):
    conn_string: str
    port: int

@asset
def an_asset(upstream_asset, config: AnAssetConfig):
    assert config.conn_string
    assert config.port

defs = Definitions(assets=[an_asset, upstream_asset])

job_def = defs.get_implicit_global_asset_job_def()

# code to launch/execute jobs is unchanged
result = job_def.execute_in_process(
    run_config={"ops": {"an_asset": {"config": {"conn_string": "foo", "port": 1}}}}
)

Notice that we did not have to change the run config dictionary passed to execute_in_process. The same thing goes for RunRequest instances in sensors and schedules: you do not have to update them in lockstep order to migrate to the Pythonic config API.

However, we do enable the use of strong typing to launch runs which you can opt into if you so choose. To do this, you can use the RunConfig object.

result = job_def.execute_in_process(
    run_config=RunConfig(ops={"an_asset": AnAssetConfig(conn_string="foo", port=1)})
)

Migrating resources#

For the rest of this guide, we will be working from this example:

from dagster import (
    AssetExecutionContext,
    Definitions,
    InitResourceContext,
    asset,
    resource,
)

class FancyDbResource:
    def __init__(self, conn_string: str) -> None:
        self.conn_string = conn_string

    def execute(self, query: str) -> None: ...

@resource(config_schema={"conn_string": str})
def fancy_db_resource(context: InitResourceContext) -> FancyDbResource:
    return FancyDbResource(context.resource_config["conn_string"])

@asset(required_resource_keys={"fancy_db"})
def asset_one(context: AssetExecutionContext) -> None:
    assert context.resources.fancy_db

@asset(required_resource_keys={"fancy_db"})
def asset_two(context: AssetExecutionContext) -> None:
    assert context.resources.fancy_db

defs = Definitions(
    assets=[asset_one, asset_two],
    resources={
        "fancy_db": fancy_db_resource.configured({"conn_string": "some_value"})
    },
)

Step 1: Converting to a Pythonic resource#

The first step is to convert to a Pythonic resource. We will convert FancyDbResource to a Pythonic resource by making it inherit from ConfigurableResource . Instead of config_schema={"conn_string": str}, we can declare attributes on a class using vanilla Python typing.

from dagster import ConfigurableResource

class FancyDbResource(ConfigurableResource):
    conn_string: str

    def execute(self, query: str) -> None: ...

The attributes declared on a class inheriting from ConfigurableResource serve as the new way to declare a configuration schema. Now, however, there's a problem: You're migrating an existing codebase that contains numerous callsites to the old fancy_db_resource function annotated with @resource. You have declared the config schema twice, once on @resource and once on the class. This is fine for now as the config schema is simple, but for more complicated schemas this can be a problem.

To assist with this, you can use the old resource API and our recommended pattern for constructing the old-style resource from the new one:

from dagster import InitResourceContext, resource

@resource(config_schema=FancyDbResource.to_config_schema())
def fancy_db_resource(context: InitResourceContext) -> FancyDbResource:
    return FancyDbResource.from_resource_context(context)

# old-style resource API still works, but the Pythonic resource is the source of truth
# for schema information and implementation
defs = Definitions(
    # ...
    resources={
        "fancy_db": fancy_db_resource.configured({"conn_string": "some_value"})
    },
)

In this example, we've written a Pythonic resource while passing the old-style resource into our Definitions object. This allows us to have a single source of truth for the config schema.

Step 2: Providing the Pythonic resource to Definitions#

Next, we'll change Definitions to include the Pythonic resource. Note that we don't need to migrate our asset code at the same time, as Pythonic resources are available via the asset's context:

from dagster import AssetExecutionContext, ConfigurableResource, Definitions, asset

class FancyDbResource(ConfigurableResource):
    conn_string: str

    def execute(self, query: str) -> None: ...

@asset(required_resource_keys={"fancy_db"})
def asset_one(context: AssetExecutionContext) -> None:
    # this still works because the resource is still available on the context
    assert context.resources.fancy_db

defs = Definitions(
    assets=[asset_one],
    resources={"fancy_db": FancyDbResource(conn_string="some_value")},
)

Step 3: Using the resource in assets#

Lastly, we'll update the asset to take the resource as a parameter:

from dagster import AssetExecutionContext, asset

@asset
def asset_one(context: AssetExecutionContext, fancy_db: FancyDbResource) -> None:
    assert fancy_db

Migrating resources that use separate objects for business logic#

A common pattern is to separate the interaction with the configuration system from the object that contains the actual business logic. For example:

  • Pre-existing or third-party code that doesn't know about the configuration system - and shouldn't
  • A complex client that requires mutable state and bookkeeping

In these cases, using the old resource API, you would have written a @resource decorator that returns that object directly.

# Pre-existing code that you don't want to alter
class FancyDbClient:
    def __init__(self, conn_string: str) -> None:
        self.conn_string = conn_string

    def execute_query(self, query: str) -> None: ...

# Alternatively could have been imported from third-party library
# from fancy_db import FancyDbClient

from dagster import AssetExecutionContext, InitResourceContext, asset, resource

@resource(config_schema={"conn_string": str})
def fancy_db_resource(context: InitResourceContext) -> FancyDbClient:
    return FancyDbClient(context.resource_config["conn_string"])

@asset(required_resource_keys={"fancy_db"})
def existing_asset(context: AssetExecutionContext) -> None:
    context.resources.fancy_db.execute_query("SELECT * FROM foo")

With Pythonic-style resources you would write a class that can return that client from a method. In code that consumes that resource you would call that method to access the underlying client.

from dagster import AssetExecutionContext, ConfigurableResource, asset

class FancyDbResource(ConfigurableResource):
    conn_string: str

    def get_client(self) -> FancyDbClient:
        return FancyDbClient(self.conn_string)

@asset
def new_asset(fancy_db: FancyDbResource) -> None:
    client = fancy_db.get_client()
    client.execute_query("SELECT * FROM foo")

Resources with context managers#

In the old API, @resource functions could also be context managers to handle initialization and cleanup tasks. This context manager was called by framework code rather than user code:

from dagster import AssetExecutionContext, InitResourceContext, asset, resource

@resource(config_schema={"conn_string": str})
def fancy_db_resource(context: InitResourceContext) -> Iterator[FancyDbClient]:
    some_expensive_setup()
    try:
        # the client is yielded to the assets that require it
        yield FancyDbClient(context.resource_config["conn_string"])
    finally:
        # this is only called once the asset has finished executing
        some_expensive_teardown()

@asset(required_resource_keys={"fancy_db"})
def asset_one(context: AssetExecutionContext) -> None:
    # some_expensive_setup() has been called, but some_expensive_teardown() has not
    context.resources.fancy_db.execute_query("SELECT * FROM foo")

This could cause confusion and difficult-to-understand stack traces. With Pythonic resources, you can manage this directly in the body of the asset or op:

from contextlib import contextmanager

from dagster import ConfigurableResource, asset

class FancyDbResource(ConfigurableResource):
    conn_string: str

    @contextmanager
    def get_client(self) -> Iterator[FancyDbClient]:
        try:
            some_expensive_setup()
            yield FancyDbClient(self.conn_string)
        finally:
            some_expensive_teardown()

@asset
def asset_one(fancy_db: FancyDbResource) -> None:
    with fancy_db.get_client() as client:
        client.execute_query("SELECT * FROM foo")

Migrating code with resources with separate business objects#

While there are benefits to managing object access in a resource rather than having the @resource factory function return the object, this does present a problem with performing an at-scale migration. The old code will expect the business object on the context object while the new code will expect the enclosing resource object when it's accessed as a parameter.

@asset(required_resource_keys={"fancy_db"})
def existing_asset(context: AssetExecutionContext) -> None:
    # This code is now broken because the resource is no longer a FancyDbClient
    # but it is a FancyDbResource.
    context.resources.fancy_db.execute_query("SELECT * FROM foo")

Ultimately, we want the underlying client to reside in the context of the old code, but in the new code, have the new resource passed to the asset.

You can accomplish this by using Dagster's framework support, IAttachDifferentObjectToOpContext. Implementing this interface allows you to instruct the framework to place a different object on the context object.

This framework can be implemented while you migrate your code, so that both new and old code can co-exist:

from dagster import (
    AssetExecutionContext,
    ConfigurableResource,
    IAttachDifferentObjectToOpContext,
    asset,
)

class FancyDbResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    conn_string: str

    def get_object_to_set_on_execution_context(self) -> FancyDbClient:
        return self.get_client()

    def get_client(self) -> FancyDbClient:
        return FancyDbClient(self.conn_string)

@asset
def new_asset(fancy_db: FancyDbResource) -> None:
    client = fancy_db.get_client()
    client.execute_query("SELECT * FROM foo")

@asset(required_resource_keys={"fancy_db"})
def existing_asset(context: AssetExecutionContext) -> None:
    # This code now works because context.resources.fancy_db is now a FancyDbClient
    context.resources.fancy_db.execute_query("SELECT * FROM foo")