Ask AI

Source code for dagster._core.definitions.version_strategy

import hashlib
import inspect
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, NamedTuple, Optional

from dagster._annotations import public

if TYPE_CHECKING:
    from .op_definition import OpDefinition
    from .resource_definition import ResourceDefinition


class OpVersionContext(NamedTuple):
    """Execution-time information used when computing an op's version.

    Attributes:
        op_def (OpDefinition): Definition of the op whose version is being computed.
        op_config (Any): Parsed config that will be handed to the op when it executes.
    """

    op_def: "OpDefinition"
    op_config: Any


class ResourceVersionContext(NamedTuple):
    """Execution-time information used when computing a resource's version.

    Attributes:
        resource_def (ResourceDefinition): Definition of the resource whose version is being computed.
        resource_config (Any): Parsed config that will be handed to the resource when it executes.
    """

    resource_def: "ResourceDefinition"
    resource_config: Any


class VersionStrategy(ABC):
    """Abstract base describing how ops and resources are assigned versions.

    Subclasses must implement `get_op_version`; overriding
    `get_resource_version` is optional. `get_op_version` receives an
    OpVersionContext and `get_resource_version` receives a
    ResourceVersionContext. From its context, each method should produce a
    unique string, called a `version`, which will be tagged to the outputs of
    that op in the job.

    Passing a `VersionStrategy` instance to a job enables memoization for that
    job: only steps whose outputs do not have an up-to-date version will run.
    """

    @public
    @abstractmethod
    def get_op_version(self, context: OpVersionContext) -> str:
        """Produce the version string for an op.

        Args:
            context (OpVersionContext): Information about the op being versioned.

        Returns:
            str: The op's version.
        """
        raise NotImplementedError()

    @public
    def get_resource_version(self, context: ResourceVersionContext) -> Optional[str]:
        """Produce the version string for a resource, if there is one.

        The base implementation ignores `context` and always returns None,
        i.e. resources are unversioned unless a subclass overrides this.

        Args:
            context (ResourceVersionContext): Information about the resource being versioned.

        Returns:
            Optional[str]: The resource's version. If None, the resource will not be memoized.
        """
        return None


class SourceHashVersionStrategy(VersionStrategy):
    """VersionStrategy that derives versions from the source code of ops and resources.

    Only the immediate body of the op/resource's decorated function is
    inspected (or its compute function, when the op/resource was built
    directly from a definition). Because only that one function's source is
    hashed, edits to helpers it calls do not change the version.
    """

    def _get_source_hash(self, fn):
        # Fingerprint `fn` by the hex SHA-1 digest of its source text.
        # inspect.getsource raises (OSError/TypeError) when the source
        # cannot be retrieved; that error propagates to the caller.
        source_text = inspect.getsource(fn)
        fingerprint = hashlib.sha1(source_text.encode("utf-8"))
        return fingerprint.hexdigest()

    @public
    def get_op_version(self, context: OpVersionContext) -> str:
        """Derive an op's version by hashing its source code.

        Args:
            context (OpVersionContext): Information about the op being versioned.

        Returns:
            str: The op's version.
        """
        compute_fn = context.op_def.compute_fn
        # A callable compute_fn is hashed directly; otherwise the object is
        # expected to expose the user-written function as `decorated_fn`.
        target_fn = compute_fn if callable(compute_fn) else compute_fn.decorated_fn
        return self._get_source_hash(target_fn)

    @public
    def get_resource_version(self, context: ResourceVersionContext) -> Optional[str]:
        """Derive a resource's version by hashing its source code.

        Args:
            context (ResourceVersionContext): Information about the resource being versioned.

        Returns:
            Optional[str]: The resource's version. If None, the resource will not be memoized.
        """
        resource_fn = context.resource_def.resource_fn
        return self._get_source_hash(resource_fn)