
Resource caching

In this example, we'll explore different ways to handle caching in Dagster. Caching is especially useful for assets that rely on expensive operations, such as API calls, database queries, or heavy computations, as it can dramatically improve performance, reduce costs, and make pipelines more efficient. In practice, it's usually best to implement caching within resources rather than assets, since this makes the functionality easier to share and reuse.

Problem: Expensive resource

We start by defining a simple resource, ExpensiveResource, with an addition method that includes a forced 5-second sleep. Without caching, the resource executes the method from scratch every time it's called. Because no intermediate results are stored, repeated calls with the same inputs always re-run the computation and incur the full cost.

src/project_mini/defs/resource_caching/expensive_resource.py
import time

import dagster as dg


class ExpensiveResource(dg.ConfigurableResource):
    def addition(self, num1: int, num2: int) -> int:
        time.sleep(5)
        return num1 + num2


@dg.asset
def expensive_asset(
    expensive_resource: ExpensiveResource,
) -> dg.MaterializeResult:
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})


@dg.asset(
    deps=[expensive_asset],
)
def another_expensive_asset(
    expensive_resource: ExpensiveResource,
) -> dg.MaterializeResult:
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})

                 expensive_asset   another_expensive_asset
Execution Time   15 seconds        15 seconds
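
To run this example, the assets need the resource bound to the expensive_resource key they request. Below is one possible wiring, shown as a sketch that assumes the resource is registered explicitly in a Definitions object and that the module path follows the file layout above; projects that autoload definitions from the defs/ directory may not need this step.

import dagster as dg

# Assumed import path, derived from the file location shown above
from project_mini.defs.resource_caching.expensive_resource import (
    ExpensiveResource,
    another_expensive_asset,
    expensive_asset,
)

defs = dg.Definitions(
    assets=[expensive_asset, another_expensive_asset],
    # The key must match the assets' parameter name ("expensive_resource")
    resources={"expensive_resource": ExpensiveResource()},
)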

Solution 1: Caching within the resource

The in-memory caching implementation uses Python's functools.lru_cache decorator (https://docs.python.org/3/library/functools.html#functools.lru_cache) to store results of the addition method. Once a particular set of arguments has been computed, the result is stored in memory within the resource. Subsequent calls with the same arguments return immediately from the cache instead of redoing the expensive operation.

However, because each asset initializes its own resource, cached results are not shared across asset executions.

src/project_mini/defs/assets.py
import time

from functools import lru_cache

import dagster as dg


class ExpensiveResourceCache(dg.ConfigurableResource):
    @lru_cache(maxsize=128)
    def addition(self, num1: int, num2: int) -> int:
        time.sleep(5)
        return num1 + num2


@dg.asset
def expensive_asset_cache(
    expensive_resource_cache: ExpensiveResourceCache,
) -> dg.MaterializeResult:
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})


@dg.asset(
    deps=[expensive_asset_cache],
)
def another_expensive_asset_cache(
    expensive_resource_cache: ExpensiveResourceCache,
) -> dg.MaterializeResult:
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})

                 expensive_asset_cache   another_expensive_asset_cache
Execution Time   5 seconds               5 seconds

Solution 2: External caching

The external caching implementation persists results to disk using a pickle file. At runtime, the resource loads the cache from the file, checks whether a result already exists for the given arguments, and saves new results after computation. This approach ensures that the cache survives process restarts and can be reused across assets, even if each initializes its own resource.

src/project_mini/defs/assets.py
import pickle
import time
from pathlib import Path

import dagster as dg


class ExpensiveResourcePickle(dg.ConfigurableResource):
    @property
    def cache_file(self) -> Path:
        return Path("dagster_resource_cache.pkl")

    def _load_cache(self) -> dict:
        try:
            if self.cache_file.exists():
                with open(self.cache_file, "rb") as f:
                    return pickle.load(f)
            else:
                return {}
        except Exception:
            return {}

    def _save_cache(self, cache: dict):
        try:
            with open(self.cache_file, "wb") as f:
                pickle.dump(cache, f)
        except Exception:
            pass

    def addition(self, num1: int, num2: int) -> int:
        cache_key = (num1, num2)
        cache = self._load_cache()

        # Return the stored result if these arguments were computed before
        if cache_key in cache:
            return cache[cache_key]

        time.sleep(5)
        result = num1 + num2

        # Persist the new result so later calls and runs can reuse it
        cache[cache_key] = result
        self._save_cache(cache)

        return result




@dg.asset
def expensive_asset_pickle(
    expensive_resource_pickle: ExpensiveResourcePickle,
) -> dg.MaterializeResult:
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})


@dg.asset(
    deps=[expensive_asset_pickle],
)
def another_expensive_asset_pickle(
    expensive_resource_pickle: ExpensiveResourcePickle,
) -> dg.MaterializeResult:
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})

                 expensive_asset_pickle     another_expensive_asset_pickle
Execution Time   5 seconds (on first run)   < 1 second
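
Because the cache is written to disk, it also persists across runs: the sub-second time for another_expensive_asset_pickle comes from reading results that the first asset already stored. If you want to force the expensive computation to run again, one option is simply to delete the cache file. The snippet below is a hypothetical cleanup step, assuming the default filename used above.

from pathlib import Path

# Remove the on-disk cache so the next materialization recomputes from scratch
Path("dagster_resource_cache.pkl").unlink(missing_ok=True)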
note

Using a pickle file assumes that all assets execute on the same node. Depending on how you run Dagster, you may want to use an external caching layer that your assets can access, such as Amazon DynamoDB or Redis.
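
As a rough sketch of that approach, the resource below stores results in Redis instead of a local pickle file. This is illustrative only: it assumes a Redis server is reachable at the configured host and port, that the redis-py client is installed, and the key format is made up for this example.

import pickle
import time

import dagster as dg
import redis  # assumes the redis-py package is installed


class ExpensiveResourceRedis(dg.ConfigurableResource):
    redis_host: str = "localhost"
    redis_port: int = 6379

    def _client(self) -> redis.Redis:
        return redis.Redis(host=self.redis_host, port=self.redis_port)

    def addition(self, num1: int, num2: int) -> int:
        client = self._client()
        cache_key = f"addition:{num1}:{num2}"

        # Return the cached result if any earlier run stored it
        cached = client.get(cache_key)
        if cached is not None:
            return pickle.loads(cached)

        time.sleep(5)
        result = num1 + num2

        # Store the result so other assets, processes, and nodes can reuse it
        client.set(cache_key, pickle.dumps(result))
        return result

Because every asset talks to the same Redis instance, the cache is shared even when assets run in separate processes or on separate nodes.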