import re
from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional, Sequence, Union
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr
from dagster._serdes import whitelist_for_serdes
# Matches any single character that is not an ASCII letter, digit, or underscore;
# parse_asset_key_string splits its input on every such character.
ASSET_KEY_SPLIT_REGEX = re.compile("[^a-zA-Z0-9_]")
# Separator written between path components by AssetKey.to_user_string and
# split on by AssetKey.from_user_string.
ASSET_KEY_DELIMITER = "/"
if TYPE_CHECKING:
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.source_asset import SourceAsset
def parse_asset_key_string(s: str) -> Sequence[str]:
    """Split ``s`` into its non-empty pieces.

    The string is cut at every character matched by ``ASSET_KEY_SPLIT_REGEX``;
    empty fragments (produced by leading, trailing, or adjacent separators) are
    dropped and the remaining pieces are returned in their original order.
    """
    return [piece for piece in ASSET_KEY_SPLIT_REGEX.split(s) if piece]
@whitelist_for_serdes
class AssetKey(NamedTuple("_AssetKey", [("path", PublicAttr[Sequence[str]])])):
    """Object representing the structure of an asset key.  Takes in a sanitized string, list of
    strings, or tuple of strings.

    Example usage:

    .. code-block:: python

        from dagster import AssetKey

        AssetKey("asset1")
        AssetKey(["asset1"])  # same as the above
        AssetKey(["prefix", "asset1"])
        AssetKey(["prefix", "subprefix", "asset1"])

    Args:
        path (Union[str, Sequence[str]]): String, list of strings, or tuple of strings.  A list of
            strings represents the hierarchical structure of the asset_key.
    """

    def __new__(cls, path: Union[str, Sequence[str]]):
        # A bare string is a single-component key; any other sequence is checked
        # to contain only strings and copied into a fresh list.
        if isinstance(path, str):
            path = [path]
        else:
            path = list(check.sequence_param(path, "path", of_type=str))

        return super(AssetKey, cls).__new__(cls, path=path)

    def __str__(self) -> str:
        return f"AssetKey({self.path})"

    def __repr__(self) -> str:
        return f"AssetKey({self.path})"

    def __hash__(self) -> int:
        # ``path`` is a list (unhashable), so hash its tuple form instead.
        return hash(tuple(self.path))

    def __eq__(self, other) -> bool:
        """Two keys are equal when ``other`` is an AssetKey with the same components in order."""
        if not isinstance(other, AssetKey):
            return False
        # Compare as tuples: element-wise, order-sensitive, and in line with __hash__.
        return tuple(self.path) == tuple(other.path)

    def to_string(self) -> str:
        """E.g. '["first_component", "second_component"]'."""
        return seven.json.dumps(self.path)

    def to_user_string(self) -> str:
        """E.g. "first_component/second_component"."""
        return ASSET_KEY_DELIMITER.join(self.path)

    def to_python_identifier(self, suffix: Optional[str] = None) -> str:
        """Build a valid Python identifier based on the asset key that can be used for
        operation names or I/O manager keys.

        The path components (plus ``suffix``, when given) are joined with a double
        underscore and every ``-`` is replaced with ``_``.
        """
        path = list(self.path)  # copy so appending the suffix does not mutate the key
        if suffix is not None:
            path.append(suffix)

        return "__".join(path).replace("-", "_")

    @staticmethod
    def from_user_string(asset_key_string: str) -> "AssetKey":
        """Build a key by splitting ``asset_key_string`` on ``ASSET_KEY_DELIMITER``."""
        return AssetKey(asset_key_string.split(ASSET_KEY_DELIMITER))

    @staticmethod
    def from_db_string(asset_key_string: Optional[str]) -> Optional["AssetKey"]:
        """Build a key from a stored string, or return ``None`` for ``None``/empty input.

        A string starting with ``[`` is first decoded as JSON; if decoding fails, or the
        string does not start with ``[``, it is split with ``parse_asset_key_string``.
        """
        if not asset_key_string:
            return None
        if asset_key_string[0] == "[":
            # is a json string
            try:
                path = seven.json.loads(asset_key_string)
            except seven.JSONDecodeError:
                path = parse_asset_key_string(asset_key_string)
        else:
            path = parse_asset_key_string(asset_key_string)
        return AssetKey(path)

    @staticmethod
    def get_db_prefix(path: Sequence[str]) -> str:
        """Return the JSON encoding of ``path`` with its last two characters removed."""
        check.sequence_param(path, "path", of_type=str)
        return seven.json.dumps(path)[:-2]  # strip trailing '"]' from json string

    @staticmethod
    def from_graphql_input(graphql_input_asset_key: Mapping[str, Sequence[str]]) -> "AssetKey":
        """Build a key from a mapping's ``"path"`` entry."""
        return AssetKey(graphql_input_asset_key["path"])

    def to_graphql_input(self) -> Mapping[str, Sequence[str]]:
        """Return ``{"path": <path components>}``."""
        return {"path": self.path}

    @staticmethod
    def from_coercible(arg: "CoercibleToAssetKey") -> "AssetKey":
        """Return ``arg`` itself if it is an AssetKey, otherwise build one from it.

        Accepts a string (single-component key), a list of strings, or a tuple of
        strings; any other type is reported through ``check.failed``.
        """
        if isinstance(arg, AssetKey):
            return arg
        elif isinstance(arg, str):
            return AssetKey([arg])
        elif isinstance(arg, list):
            check.list_param(arg, "arg", of_type=str)
            return AssetKey(arg)
        elif isinstance(arg, tuple):
            check.tuple_param(arg, "arg", of_type=str)
            return AssetKey(arg)
        else:
            check.failed(f"Unexpected type for AssetKey: {type(arg)}")

    @staticmethod
    def from_coercible_or_definition(
        arg: Union["CoercibleToAssetKey", "AssetsDefinition", "SourceAsset"],
    ) -> "AssetKey":
        """Return ``arg.key`` for an AssetsDefinition or SourceAsset, else coerce ``arg``."""
        # Imported inside the function; at module level these names are only
        # imported under TYPE_CHECKING.
        from dagster._core.definitions.assets import AssetsDefinition
        from dagster._core.definitions.source_asset import SourceAsset

        if isinstance(arg, (AssetsDefinition, SourceAsset)):
            return arg.key
        return AssetKey.from_coercible(arg)

    def has_prefix(self, prefix: Sequence[str]) -> bool:
        """Return True when the key's path starts with the components in ``prefix``.

        ``prefix`` may be any sequence of strings (list or tuple); a bare string is
        treated as a one-component prefix.
        """
        # Normalize to a list first: a list never compares equal to a tuple, so
        # comparing the raw slice against a tuple prefix would always be False.
        wanted = [prefix] if isinstance(prefix, str) else list(prefix)
        return list(self.path[: len(wanted)]) == wanted

    def with_prefix(self, prefix: "CoercibleToAssetKeyPrefix") -> "AssetKey":
        """Return a new key whose path is ``prefix`` followed by this key's path."""
        prefix = key_prefix_from_coercible(prefix)
        return AssetKey(list(prefix) + list(self.path))
# Argument type annotated on AssetKey.from_coercible: an existing AssetKey,
# a single string, or a sequence of strings.
CoercibleToAssetKey = Union[AssetKey, str, Sequence[str]]
# Argument type annotated on key_prefix_from_coercible and AssetKey.with_prefix:
# a single string or a sequence of strings.
CoercibleToAssetKeyPrefix = Union[str, Sequence[str]]
def check_opt_coercible_to_asset_key_prefix_param(
    prefix: Optional[CoercibleToAssetKeyPrefix], param_name: str
) -> Optional[Sequence[str]]:
    """Normalize an optional key prefix, reporting failures against ``param_name``.

    Args:
        prefix: The prefix to normalize, or ``None``.
        param_name: Parameter name to mention in the error message.

    Returns:
        ``None`` when ``prefix`` is ``None``; otherwise whatever
        ``key_prefix_from_coercible(prefix)`` returns.

    Raises:
        check.ParameterCheckError: when ``key_prefix_from_coercible`` raises
            ``check.CheckError`` for ``prefix``.
    """
    if prefix is None:
        return None

    try:
        return key_prefix_from_coercible(prefix)
    except check.CheckError:
        # Re-raise as a parameter error that names the caller's parameter.
        raise check.ParameterCheckError(
            f'Param "{param_name}" is not a string or a sequence of strings'
        )
def key_prefix_from_coercible(key_prefix: CoercibleToAssetKeyPrefix) -> Sequence[str]:
    """Normalize a key prefix into a list of path components.

    Args:
        key_prefix: A single string (treated as a one-component prefix), a list of
            strings, or a tuple of strings.

    Returns:
        ``[key_prefix]`` for a string, the same list object for a list, and a new
        list holding the same elements for a tuple. Elements are not type-checked
        here.

    Raises:
        Whatever ``check.failed`` raises when ``key_prefix`` is any other type.
    """
    if isinstance(key_prefix, str):
        return [key_prefix]
    if isinstance(key_prefix, list):
        return key_prefix
    if isinstance(key_prefix, tuple):
        # Tuples satisfy the Sequence[str] annotation and are accepted by
        # AssetKey.from_coercible; return a list so callers that concatenate the
        # result with other lists keep working.
        return list(key_prefix)
    check.failed(f"Unexpected type for key_prefix: {type(key_prefix)}")