# Resource caching
In this example, we'll explore different ways to handle caching in Dagster. Caching is especially useful for assets that rely on expensive operations, such as API calls, database queries, or heavy computations, as it can dramatically improve performance, reduce costs, and make pipelines more efficient. In practice, it's usually best to implement caching within resources rather than assets, since this makes the functionality easier to share and reuse.
## Problem: Expensive resource
To illustrate the problem, we define a simple resource, `ExpensiveResource`, with an `addition` method that includes a forced 5-second sleep. Without caching, the resource executes the method from scratch every time it's called. Because no intermediate results are stored, repeated calls with the same inputs always re-run the computation and incur the full cost.
```python
import time

import dagster as dg


class ExpensiveResource(dg.ConfigurableResource):
    def addition(self, num1: int, num2: int) -> int:
        time.sleep(5)
        return num1 + num2


@dg.asset
def expensive_asset(
    expensive_resource: ExpensiveResource,
) -> dg.MaterializeResult:
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})


@dg.asset(
    deps=[expensive_asset],
)
def another_expensive_asset(
    expensive_resource: ExpensiveResource,
) -> dg.MaterializeResult:
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    value = expensive_resource.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})
```
| | expensive_asset | another_expensive_asset |
| --- | --- | --- |
| Execution Time | 15 seconds | 15 seconds |
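
To materialize these assets, the resource also needs to be registered with your Dagster definitions. The example above omits that wiring; a minimal sketch might look like the following, assuming the assets and resource are defined in the same module and noting that the resource key must match the parameter name used in the asset signatures:

```python
import dagster as dg

# Minimal wiring sketch (not part of the example above): the resource key
# "expensive_resource" must match the parameter name in the asset functions.
defs = dg.Definitions(
    assets=[expensive_asset, another_expensive_asset],
    resources={"expensive_resource": ExpensiveResource()},
)
```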
## Solution 1: Caching within the resource
The in-memory caching implementation uses Python's [`@functools.lru_cache`](https://docs.python.org/3/library/functools.html#functools.lru_cache) decorator to store results of the `addition` method. Once a particular set of arguments has been computed, the result is stored in memory within the resource. Subsequent calls with the same arguments return immediately from the cache instead of redoing the expensive operation.
However, because each asset initializes its own resource, cached results are not shared across asset executions.
```python
import time
from functools import lru_cache

import dagster as dg


class ExpensiveResourceCache(dg.ConfigurableResource):
    @lru_cache(maxsize=128)
    def addition(self, num1: int, num2: int) -> int:
        time.sleep(5)
        return num1 + num2


@dg.asset
def expensive_asset_cache(
    expensive_resource_cache: ExpensiveResourceCache,
) -> dg.MaterializeResult:
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})


@dg.asset(
    deps=[expensive_asset_cache],
)
def another_expensive_asset_cache(
    expensive_resource_cache: ExpensiveResourceCache,
) -> dg.MaterializeResult:
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    value = expensive_resource_cache.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})
```
| | expensive_asset_cache | another_expensive_asset_cache |
| --- | --- | --- |
| Execution Time | 5 seconds | 5 seconds |
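
To see the in-memory cache at work outside of an asset run, you can instantiate the resource directly and time repeated calls. This is a quick local check rather than part of the example above, and it assumes `ExpensiveResourceCache` is importable:

```python
import time

resource = ExpensiveResourceCache()

start = time.perf_counter()
resource.addition(1, 2)  # first call is computed and takes roughly 5 seconds
print(f"first call: {time.perf_counter() - start:.1f}s")

start = time.perf_counter()
resource.addition(1, 2)  # same arguments, so the cached result returns immediately
print(f"second call: {time.perf_counter() - start:.1f}s")
```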
## Solution 2: External caching
The external caching implementation persists results to disk using a pickle file. At runtime, the resource loads the cache from the file, checks whether a result already exists for the given arguments, and saves new results after computation. This approach ensures that the cache survives process restarts and can be reused across assets, even if each initializes its own resource.
```python
import pickle
import time
from pathlib import Path

import dagster as dg


class ExpensiveResourcePickle(dg.ConfigurableResource):
    @property
    def cache_file(self) -> Path:
        # Return a Path so callers can check existence with .exists()
        return Path("dagster_resource_cache.pkl")

    def _load_cache(self) -> dict:
        try:
            if self.cache_file.exists():
                with open(self.cache_file, "rb") as f:
                    return pickle.load(f)
            else:
                return {}
        except Exception:
            return {}

    def _save_cache(self, cache: dict):
        try:
            with open(self.cache_file, "wb") as f:
                pickle.dump(cache, f)
        except Exception:
            pass

    def addition(self, num1: int, num2: int) -> int:
        cache_key = (num1, num2)
        cache = self._load_cache()
        if cache_key in cache:
            return cache[cache_key]
        time.sleep(5)
        result = num1 + num2
        cache[cache_key] = result
        self._save_cache(cache)
        return result


@dg.asset
def expensive_asset_pickle(
    expensive_resource_pickle: ExpensiveResourcePickle,
) -> dg.MaterializeResult:
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})


@dg.asset(
    deps=[expensive_asset_pickle],
)
def another_expensive_asset_pickle(
    expensive_resource_pickle: ExpensiveResourcePickle,
) -> dg.MaterializeResult:
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    value = expensive_resource_pickle.addition(1, 2)
    return dg.MaterializeResult(metadata={"addition": value})
```
| | expensive_asset_pickle | another_expensive_asset_pickle |
| --- | --- | --- |
| Execution Time | 5 seconds (on first run) | < 1 second |
Using a pickle file assumes that all assets execute on the same node. Depending on how you run Dagster, you might want to use an external caching layer that your assets can access, such as Amazon DynamoDB or Redis.
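
As a rough illustration, a Redis-backed variant of the resource might look like the sketch below. This is not part of the example above: it assumes a reachable Redis server, the `redis` Python client, and an illustrative key scheme.

```python
import time

import dagster as dg
import redis  # assumed dependency: the redis Python client


class ExpensiveResourceRedis(dg.ConfigurableResource):
    """Hypothetical variant that caches results in Redis instead of a local file."""

    redis_host: str = "localhost"
    redis_port: int = 6379

    def addition(self, num1: int, num2: int) -> int:
        client = redis.Redis(host=self.redis_host, port=self.redis_port)
        cache_key = f"addition:{num1}:{num2}"  # illustrative key scheme

        cached = client.get(cache_key)
        if cached is not None:
            return int(cached)

        time.sleep(5)
        result = num1 + num2
        client.set(cache_key, result)
        return result
```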