In data engineering, resources are the external services, tools, and storage you use to do your job. For example, a simple ETL (extract, transform, load) pipeline fetches data from an API, ingests it into a database, and updates a dashboard. The external tools and services this pipeline uses could be:
The API the data is fetched from
The AWS S3 bucket where the API’s response is stored
The Snowflake/Databricks/BigQuery account the data is ingested into
Using resources, you can:
Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production but avoid in testing, you can provide a different resource in each environment. Refer to the Separating business logic from environments section of the Testing documentation for more info about this capability.
Surface configuration in the Dagster UI - Resources and their configuration are surfaced in the UI, making it easy to see where resources are used and how they're configured.
Share configuration across multiple assets or ops - Resources are configurable and shared, so configuration can be supplied in one place instead of separately for each asset or op.
Share implementations across multiple assets or ops - When multiple assets access the same external services, resources provide a standard way to structure your code to share the implementations.
ResourceDefinition is the core class for resource definitions. You almost never want to initialize this class directly. Instead, you should extend the ConfigurableResource class, which implements ResourceDefinition.
Resources are typically defined by subclassing ConfigurableResource. A resource usually has a set of configuration values, which specify things like account identifiers, API keys, or database names when interfacing with an external tool or service. This configuration schema is specified by attributes on the class.
The configuration system has a few advantages over plain Python parameter passing. Configured values can be:
Replaced at run launch time in the Launchpad in the Dagster UI, through the GraphQL API, or through the Python API
Displayed in the Dagster UI
Set dynamically using environment variables, resolved at runtime
The following example demonstrates defining a subclass of ConfigurableResource that represents a connection to an external service. The resource can be configured by constructing it in the Definitions call.
You can define methods on the resource class which depend on config values.
Sensors use resources in the same way as assets, which can be useful for querying external services for data.
To specify resource dependencies on a sensor, annotate the resource type as a parameter to the sensor's function. For more information and examples, refer to the Sensors documentation.
Schedules can use resources when your schedule logic needs to interface with an external tool, or when you want to make that logic more testable.
To specify resource dependencies on a schedule, annotate the resource type as a parameter to the schedule's function. Refer to the Schedule examples reference for more info.
The following example defines a subclass of ConfigurableResource that represents a connection to an external service. The resource can be configured by constructing it in the Definitions call.
You can define methods on the resource class which depend on config values.
Ops specify resource dependencies by annotating the resource as a parameter to the op function.
To provide resource values to ops, attach them to the Definitions object. These resources are automatically passed to the function at runtime.
There are many supported config types that can be used when defining resources. Refer to the advanced config types documentation for a more comprehensive overview of the available config types.
Resources can be configured using environment variables, which is useful for secrets or other environment-specific configuration. If you're using Dagster+, environment variables can be configured directly in the UI.
To use environment variables, pass an EnvVar when constructing the resource. EnvVar inherits from str and can be used to populate any string config field on a resource. The value of the environment variable will be evaluated when a run is launched.
What about os.getenv()? When os.getenv() is used, the value of the variable is retrieved when Dagster loads the code location. Using EnvVar not only tells Dagster to retrieve the value at runtime, but also not to display the value in the UI.
Using the EnvVar approach has a few unique benefits:
Improved observability. The UI will display information about configuration values sourced from environment variables.
Secret values are hidden in the UI. Secret values are hidden in the Launchpad, Resources page, and other places where configuration is displayed.
Simplified testing. Because you can provide string values directly to configuration rather than environment variables, testing may be easier.
In some cases, you may want to specify configuration for a resource at launch time, in the Launchpad or in a RunRequest for a schedule or sensor. For example, you may want a sensor-triggered run to specify a different target table in a database resource for each run.
You can use the configure_at_launch() method to defer the construction of a configurable resource until launch time:
In some situations, you may want to define a resource that depends on other resources. This is useful for common configuration. For example, separate resources for a database and for a filestore may both depend on credentials for a particular cloud provider. Defining these credentials as a separate, nested resource allows you to specify configuration in a single place. It also makes it easier to test resources, since the nested resource can be mocked.
In this case, you can list that nested resource as an attribute of the resource class:
from dagster import Definitions, ConfigurableResource, ResourceDependency

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

class FileStoreBucket(ConfigurableResource):
    credentials: ResourceDependency[CredentialsResource]
    region: str

    def write(self, data: str):
        # We can access the credentials resource via `self.credentials`,
        # which will be an initialized instance of `CredentialsResource`
        get_filestore_client(
            username=self.credentials.username,
            password=self.credentials.password,
            region=self.region,
        ).write(data)

defs = Definitions(
    assets=[my_asset],
    resources={
        "bucket": FileStoreBucket(
            credentials=CredentialsResource(
                username="my_user", password="my_password"
            ),
            region="us-east-1",
        ),
    },
)
If you prefer to provide the configuration for credentials at launch time, use the configure_at_launch() method to defer the construction of the CredentialsResource until launch time.
Because credentials then requires launch-time configuration through the Launchpad, it must also be passed to the Definitions object so that the configuration can be provided at launch time. Nested resources only need to be passed to the Definitions object if they require launch-time configuration.
Once a resource reaches a certain complexity, you may want to manage the state of the resource over its lifetime. This is useful for resources that require special initialization or cleanup. ConfigurableResource is a data class meant to encapsulate config, but also provides lifecycle hooks to manage the state of the resource.
You can mark any private state attributes using Pydantic's PrivateAttr. These attributes, which must start with an underscore, won't be included in the resource's config.
When a resource is initialized during a Dagster run, the setup_for_execution method is called. This method is passed an InitResourceContext object, which contains the resource's config and other run information. The resource can use this context to initialize any state it needs for the duration of the run.
Once a resource is no longer needed, the teardown_after_execution method is called. This method is passed the same context object as setup_for_execution. This method can be useful for cleaning up any state that was initialized in setup_for_execution.
setup_for_execution and teardown_after_execution are each called once per run, per process. When using the in-process executor, this means that they will be called once per run. When using the multiprocess executor, each process's instance of the resource will be initialized and torn down.
In the following example, we set up an API token for a client resource based on the username and password provided in the config. The API token can then be used to query an API in the asset body.
from dagster import ConfigurableResource, InitResourceContext, asset
import requests
from pydantic import PrivateAttr

class MyClientResource(ConfigurableResource):
    username: str
    password: str
    _api_token: str = PrivateAttr()

    def setup_for_execution(self, context: InitResourceContext) -> None:
        # Fetch and set up an API token based on the username and password
        self._api_token = requests.get("https://my-api.com/token", auth=(self.username, self.password)).text

    def get_all_users(self):
        return requests.get("https://my-api.com/users", headers={"Authorization": self._api_token})

@asset
def my_asset(client: MyClientResource):
    return client.get_all_users()
For more complex use cases, you can override the yield_for_execution method. By default, this context manager calls setup_for_execution, yields the resource, and then calls teardown_after_execution, but you can override it to provide any custom behavior. This is useful for resources that require a context to be open for the duration of a run, such as database connections or file handles.
from dagster import ConfigurableResource, asset, InitResourceContext
from contextlib import contextmanager
from pydantic import PrivateAttr

class DBConnection:
    ...

    def query(self, body: str): ...

@contextmanager
def get_database_connection(username: str, password: str): ...

class MyClientResource(ConfigurableResource):
    username: str
    password: str
    _db_connection: DBConnection = PrivateAttr()

    @contextmanager
    def yield_for_execution(self, context: InitResourceContext):
        # keep connection open for the duration of the execution
        with get_database_connection(self.username, self.password) as conn:
            # set up the connection attribute so it can be used in the execution
            self._db_connection = conn
            # yield, allowing execution to occur
            yield self

    def query(self, body: str):
        return self._db_connection.query(body)

@asset
def my_asset(client: MyClientResource):
    client.query("SELECT * FROM my_table")
When starting to build a set of assets or jobs, you may want to use a bare Python object without configuration as a resource, such as a third-party API client.
Dagster supports passing plain Python objects as resources. This follows a similar pattern to using a ConfigurableResource subclass; however, assets and ops that use these resources must annotate them with ResourceParam. This annotation lets Dagster know that the parameter is a resource and not an upstream input.
from dagster import Definitions, asset, ResourceParam

# `ResourceParam[GitHub]` is treated exactly like `GitHub` for type checking purposes,
# and the runtime type of the github parameter is `GitHub`. The purpose of the
# `ResourceParam` wrapper is to let Dagster know that `github` is a resource and not an
# upstream asset.
@asset
def public_github_repos(github: ResourceParam[GitHub]):
    return github.organization("dagster-io").repositories()

defs = Definitions(
    assets=[public_github_repos],
    resources={"github": GitHub(...)},
)
If a resource uses the resource initialization context, you can use the build_init_resource_context utility alongside the with_resource_context helper on the resource class:
Resources are a powerful way to encapsulate reusable logic in your assets and ops. For more information on the supported config types for resources, see the advanced config types documentation. For information on the Dagster config system, which you can use to parameterize assets and ops, refer to the run configuration documentation.