import os
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Callable, Generic, Mapping, NamedTuple, Optional, Sequence, Union
from typing_extensions import Self, TypeVar
import dagster._check as check
import dagster._seven as seven
from dagster._annotations import PublicAttr, experimental, public
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.metadata.table import (
TableColumn as TableColumn,
TableColumnConstraints as TableColumnConstraints,
TableColumnDep as TableColumnDep,
TableColumnLineage as TableColumnLineage,
TableConstraints as TableConstraints,
TableRecord as TableRecord,
TableSchema as TableSchema,
)
from dagster._core.errors import DagsterInvalidMetadata
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import PackableValue
T_Packable = TypeVar("T_Packable", bound=PackableValue, default=PackableValue, covariant=True)
from dagster._serdes import pack_value
from dagster._serdes.serdes import (
FieldSerializer,
JsonSerializableValue,
PackableValue,
UnpackContext,
WhitelistMap,
)
# ########################
# ##### METADATA VALUE
# ########################
[docs]
class MetadataValue(ABC, Generic[T_Packable]):
"""Utility class to wrap metadata values passed into Dagster events so that they can be
displayed in the Dagster UI and other tooling.
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"my_text_label": "hello",
"dashboard_url": MetadataValue.url("http://mycoolsite.com/my_dashboard"),
"num_rows": 0,
},
)
"""
@public
@property
@abstractmethod
def value(self) -> T_Packable:
"""The wrapped value."""
raise NotImplementedError()
[docs]
@public
@staticmethod
def text(text: str) -> "TextMetadataValue":
"""Static constructor for a metadata value wrapping text as
:py:class:`TextMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"my_text_label": MetadataValue.text("hello")
},
)
Args:
text (str): The text string for a metadata entry.
"""
return TextMetadataValue(text)
[docs]
@public
@staticmethod
def url(url: str) -> "UrlMetadataValue":
"""Static constructor for a metadata value wrapping a URL as
:py:class:`UrlMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context):
yield AssetMaterialization(
asset_key="my_dashboard",
metadata={
"dashboard_url": MetadataValue.url("http://mycoolsite.com/my_dashboard"),
}
)
Args:
url (str): The URL for a metadata entry.
"""
return UrlMetadataValue(url)
[docs]
@public
@staticmethod
def path(path: Union[str, os.PathLike]) -> "PathMetadataValue":
"""Static constructor for a metadata value wrapping a path as
:py:class:`PathMetadataValue`.
Example:
.. code-block:: python
@op
def emit_metadata(context):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"filepath": MetadataValue.path("path/to/file"),
}
)
Args:
path (str): The path for a metadata entry.
"""
return PathMetadataValue(path)
[docs]
@public
@staticmethod
def notebook(path: Union[str, os.PathLike]) -> "NotebookMetadataValue":
"""Static constructor for a metadata value wrapping a notebook path as
:py:class:`NotebookMetadataValue`.
Example:
.. code-block:: python
@op
def emit_metadata(context):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"notebook_path": MetadataValue.notebook("path/to/notebook.ipynb"),
}
)
Args:
path (str): The path to a notebook for a metadata entry.
"""
return NotebookMetadataValue(path)
[docs]
@public
@staticmethod
def json(data: Union[Sequence[Any], Mapping[str, Any]]) -> "JsonMetadataValue":
"""Static constructor for a metadata value wrapping a json-serializable list or dict
as :py:class:`JsonMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context):
yield ExpectationResult(
success=not missing_things,
label="is_present",
metadata={
"about my dataset": MetadataValue.json({"missing_columns": missing_things})
},
)
Args:
data (Union[Sequence[Any], Mapping[str, Any]]): The JSON data for a metadata entry.
"""
return JsonMetadataValue(data)
[docs]
@public
@staticmethod
def md(data: str) -> "MarkdownMetadataValue":
"""Static constructor for a metadata value wrapping markdown data as
:py:class:`MarkdownMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context, md_str):
yield AssetMaterialization(
asset_key="info",
metadata={
'Details': MetadataValue.md(md_str)
},
)
Args:
md_str (str): The markdown for a metadata entry.
"""
return MarkdownMetadataValue(data)
[docs]
@public
@staticmethod
def python_artifact(python_artifact: Callable[..., Any]) -> "PythonArtifactMetadataValue":
"""Static constructor for a metadata value wrapping a python artifact as
:py:class:`PythonArtifactMetadataValue`. Can be used as the value type for the
`metadata` parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"class": MetadataValue.python_artifact(MyClass),
"function": MetadataValue.python_artifact(my_function),
}
)
Args:
value (Callable): The python class or function for a metadata entry.
"""
check.callable_param(python_artifact, "python_artifact")
return PythonArtifactMetadataValue(python_artifact.__module__, python_artifact.__name__)
[docs]
@public
@staticmethod
def float(value: float) -> "FloatMetadataValue":
"""Static constructor for a metadata value wrapping a float as
:py:class:`FloatMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"size (bytes)": MetadataValue.float(calculate_bytes(df)),
}
)
Args:
value (float): The float value for a metadata entry.
"""
return FloatMetadataValue(value)
[docs]
@public
@staticmethod
def int(value: int) -> "IntMetadataValue":
"""Static constructor for a metadata value wrapping an int as
:py:class:`IntMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"number of rows": MetadataValue.int(len(df)),
},
)
Args:
value (int): The int value for a metadata entry.
"""
return IntMetadataValue(value)
[docs]
@public
@staticmethod
def bool(value: bool) -> "BoolMetadataValue":
"""Static constructor for a metadata value wrapping a bool as
:py:class:`BoolMetadataValuye`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset",
metadata={
"num rows > 1000": MetadataValue.bool(len(df) > 1000),
},
)
Args:
value (bool): The bool value for a metadata entry.
"""
return BoolMetadataValue(value)
[docs]
@public
@staticmethod
def timestamp(value: Union["float", datetime]) -> "TimestampMetadataValue":
"""Static constructor for a metadata value wrapping a UNIX timestamp as a
:py:class:`TimestampMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Args:
value (Union[float, datetime]): The unix timestamp value for a metadata entry. If a
datetime is provided, the timestamp will be extracted. datetimes without timezones
are not accepted, because their timestamps can be ambiguous.
"""
if isinstance(value, float):
return TimestampMetadataValue(value)
elif isinstance(value, datetime):
if value.tzinfo is None:
check.failed(
"Datetime values provided to MetadataValue.timestamp must have timezones, "
f"but {value.isoformat()} does not"
)
return TimestampMetadataValue(value.timestamp())
else:
check.failed(f"Expected either a float or a datetime, but received a {type(value)}")
[docs]
@public
@staticmethod
def dagster_run(run_id: str) -> "DagsterRunMetadataValue":
"""Static constructor for a metadata value wrapping a reference to a Dagster run.
Args:
run_id (str): The ID of the run.
"""
return DagsterRunMetadataValue(run_id)
[docs]
@public
@staticmethod
def asset(asset_key: AssetKey) -> "DagsterAssetMetadataValue":
"""Static constructor for a metadata value referencing a Dagster asset, by key.
For example:
.. code-block:: python
@op
def validate_table(context, df):
yield AssetMaterialization(
asset_key=AssetKey("my_table"),
metadata={
"Related asset": MetadataValue.asset(AssetKey('my_other_table')),
},
)
Args:
asset_key (AssetKey): The asset key referencing the asset.
"""
from dagster._core.definitions.events import AssetKey
check.inst_param(asset_key, "asset_key", AssetKey)
return DagsterAssetMetadataValue(asset_key)
[docs]
@public
@staticmethod
def job(
job_name: str,
location_name: str,
*,
repository_name: Optional[str] = None,
) -> "DagsterJobMetadataValue":
"""Static constructor for a metadata value referencing a Dagster job, by name.
For example:
.. code-block:: python
@op
def emit_metadata(context, df):
yield AssetMaterialization(
asset_key="my_dataset"
metadata={
"Producing job": MetadataValue.job('my_other_job'),
},
)
Args:
job_name (str): The name of the job.
location_name (Optional[str]): The code location name for the job.
repository_name (Optional[str]): The repository name of the job, if different from the
default.
"""
return DagsterJobMetadataValue(
job_name=check.str_param(job_name, "job_name"),
location_name=check.str_param(location_name, "location_name"),
repository_name=check.opt_str_param(repository_name, "repository_name"),
)
[docs]
@public
@staticmethod
@experimental
def table(
records: Sequence[TableRecord], schema: Optional[TableSchema] = None
) -> "TableMetadataValue":
"""Static constructor for a metadata value wrapping arbitrary tabular data as
:py:class:`TableMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Example:
.. code-block:: python
@op
def emit_metadata(context):
yield ExpectationResult(
success=not has_errors,
label="is_valid",
metadata={
"errors": MetadataValue.table(
records=[
TableRecord(code="invalid-data-type", row=2, col="name"),
],
schema=TableSchema(
columns=[
TableColumn(name="code", type="string"),
TableColumn(name="row", type="int"),
TableColumn(name="col", type="string"),
]
)
),
},
)
"""
return TableMetadataValue(records, schema)
[docs]
@public
@staticmethod
def table_schema(
schema: TableSchema,
) -> "TableSchemaMetadataValue":
"""Static constructor for a metadata value wrapping a table schema as
:py:class:`TableSchemaMetadataValue`. Can be used as the value type
for the `metadata` parameter for supported events.
Example:
.. code-block:: python
schema = TableSchema(
columns = [
TableColumn(name="id", type="int"),
TableColumn(name="status", type="bool"),
]
)
DagsterType(
type_check_fn=some_validation_fn,
name='MyTable',
metadata={
'my_table_schema': MetadataValue.table_schema(schema),
}
)
Args:
schema (TableSchema): The table schema for a metadata entry.
"""
return TableSchemaMetadataValue(schema)
[docs]
@public
@staticmethod
def column_lineage(
lineage: TableColumnLineage,
) -> "TableColumnLineageMetadataValue":
"""Static constructor for a metadata value wrapping a column lineage as
:py:class:`TableColumnLineageMetadataValue`. Can be used as the value type
for the `metadata` parameter for supported events.
Args:
lineage (TableColumnLineage): The column lineage for a metadata entry.
"""
return TableColumnLineageMetadataValue(lineage)
[docs]
@public
@staticmethod
def null() -> "NullMetadataValue":
"""Static constructor for a metadata value representing null. Can be used as the value type
for the `metadata` parameter for supported events.
"""
return NullMetadataValue()
# not public because rest of code location metadata API is not public
@staticmethod
def code_location_reconstruction(data: str) -> "CodeLocationReconstructionMetadataValue":
"""Static constructor for a metadata value wrapping arbitrary code location data useful during reconstruction as
:py:class:`CodeLocationReconstructionMetadataValue`. Can be used as the value type for the `metadata`
parameter for supported events.
Args:
data (str): The serialized code location state for a metadata entry.
"""
return CodeLocationReconstructionMetadataValue(data)
# ########################
# ##### METADATA VALUE TYPES
# ########################
# NOTE: We have `type: ignore` in a few places below because mypy complains about an instance method
# (e.g. `text`) overriding a static method on the superclass of the same name. This is not a concern
# for us because these static methods should never be called on instances.
# NOTE: `XMetadataValue` classes are serialized with a storage name of `XMetadataEntryData` to
# maintain backward compatibility. See docstring of `whitelist_for_serdes` for more info.
[docs]
@whitelist_for_serdes(storage_name="TextMetadataEntryData")
class TextMetadataValue(
NamedTuple(
"_TextMetadataValue",
[
("text", PublicAttr[Optional[str]]),
],
),
MetadataValue[str],
):
"""Container class for text metadata entry data.
Args:
text (Optional[str]): The text data.
"""
def __new__(cls, text: Optional[str]):
return super(TextMetadataValue, cls).__new__(
cls, check.opt_str_param(text, "text", default="")
)
@public
@property
def value(self) -> Optional[str]:
"""Optional[str]: The wrapped text data."""
return self.text
[docs]
@whitelist_for_serdes(storage_name="UrlMetadataEntryData")
class UrlMetadataValue(
NamedTuple(
"_UrlMetadataValue",
[
("url", PublicAttr[Optional[str]]),
],
),
MetadataValue[str],
):
"""Container class for URL metadata entry data.
Args:
url (Optional[str]): The URL as a string.
"""
def __new__(cls, url: Optional[str]):
return super(UrlMetadataValue, cls).__new__(
cls, check.opt_str_param(url, "url", default="")
)
@public
@property
def value(self) -> Optional[str]:
"""Optional[str]: The wrapped URL."""
return self.url
[docs]
@whitelist_for_serdes(storage_name="PathMetadataEntryData")
class PathMetadataValue(
NamedTuple("_PathMetadataValue", [("path", PublicAttr[Optional[str]])]), MetadataValue[str]
):
"""Container class for path metadata entry data.
Args:
path (Optional[str]): The path as a string or conforming to os.PathLike.
"""
def __new__(cls, path: Optional[Union[str, os.PathLike]]):
return super(PathMetadataValue, cls).__new__(
cls, check.opt_path_param(path, "path", default="")
)
@public
@property
def value(self) -> Optional[str]:
"""Optional[str]: The wrapped path."""
return self.path
[docs]
@whitelist_for_serdes(storage_name="NotebookMetadataEntryData")
class NotebookMetadataValue(
NamedTuple("_NotebookMetadataValue", [("path", PublicAttr[Optional[str]])]), MetadataValue[str]
):
"""Container class for notebook metadata entry data.
Args:
path (Optional[str]): The path to the notebook as a string or conforming to os.PathLike.
"""
def __new__(cls, path: Optional[Union[str, os.PathLike]]):
return super(NotebookMetadataValue, cls).__new__(
cls, check.opt_path_param(path, "path", default="")
)
@public
@property
def value(self) -> Optional[str]:
"""Optional[str]: The wrapped path to the notebook as a string."""
return self.path
class JsonDataFieldSerializer(FieldSerializer):
def pack(
self,
mapping: JsonSerializableValue,
whitelist_map: WhitelistMap,
descent_path: str,
) -> JsonSerializableValue:
# return the json serializable data field as is
return mapping
def unpack(
self,
unpacked_value: JsonSerializableValue,
whitelist_map: WhitelistMap,
context: UnpackContext,
) -> PackableValue:
# erase any serdes objects that were stored here in earlier versions and
# return plain json serializable data
return pack_value(unpacked_value, whitelist_map=whitelist_map)
[docs]
@whitelist_for_serdes(
storage_name="JsonMetadataEntryData",
field_serializers={"data": JsonDataFieldSerializer},
)
class JsonMetadataValue(
NamedTuple(
"_JsonMetadataValue",
[
("data", PublicAttr[Optional[Union[Sequence[Any], Mapping[str, Any]]]]),
],
),
MetadataValue[Union[Sequence[Any], Mapping[str, Any]]],
):
"""Container class for JSON metadata entry data.
Args:
data (Union[Sequence[Any], Dict[str, Any]]): The JSON data.
"""
def __new__(cls, data: Optional[Union[Sequence[Any], Mapping[str, Any]]]):
data = check.opt_inst_param(data, "data", (Sequence, Mapping))
try:
# check that the value is JSON serializable
seven.dumps(data)
except TypeError:
raise DagsterInvalidMetadata("Value is not JSON serializable.")
return super(JsonMetadataValue, cls).__new__(cls, data)
@public
@property
def value(self) -> Optional[Union[Sequence[Any], Mapping[str, Any]]]:
"""Optional[Union[Sequence[Any], Dict[str, Any]]]: The wrapped JSON data."""
return self.data
[docs]
@whitelist_for_serdes(storage_name="MarkdownMetadataEntryData")
class MarkdownMetadataValue(
NamedTuple(
"_MarkdownMetadataValue",
[
("md_str", PublicAttr[Optional[str]]),
],
),
MetadataValue[str],
):
"""Container class for markdown metadata entry data.
Args:
md_str (Optional[str]): The markdown as a string.
"""
def __new__(cls, md_str: Optional[str]):
return super(MarkdownMetadataValue, cls).__new__(
cls, check.opt_str_param(md_str, "md_str", default="")
)
@public
@property
def value(self) -> Optional[str]:
"""Optional[str]: The wrapped markdown as a string."""
return self.md_str
# This should be deprecated or fixed so that `value` does not return itself.
[docs]
@whitelist_for_serdes(storage_name="PythonArtifactMetadataEntryData")
class PythonArtifactMetadataValue(
NamedTuple(
"_PythonArtifactMetadataValue",
[
("module", PublicAttr[str]),
("name", PublicAttr[str]),
],
),
MetadataValue["PythonArtifactMetadataValue"],
):
"""Container class for python artifact metadata entry data.
Args:
module (str): The module where the python artifact can be found
name (str): The name of the python artifact
"""
def __new__(cls, module: str, name: str):
return super(PythonArtifactMetadataValue, cls).__new__(
cls, check.str_param(module, "module"), check.str_param(name, "name")
)
@public
@property
def value(self) -> Self:
"""PythonArtifactMetadataValue: Identity function."""
return self
[docs]
@whitelist_for_serdes(storage_name="FloatMetadataEntryData")
class FloatMetadataValue(
NamedTuple(
"_FloatMetadataValue",
[
("value", PublicAttr[Optional[float]]),
],
),
MetadataValue[float],
):
"""Container class for float metadata entry data.
Args:
value (Optional[float]): The float value.
"""
def __new__(cls, value: Optional[float]):
return super(FloatMetadataValue, cls).__new__(cls, check.opt_float_param(value, "value"))
[docs]
@whitelist_for_serdes(storage_name="IntMetadataEntryData")
class IntMetadataValue(
NamedTuple(
"_IntMetadataValue",
[
("value", PublicAttr[Optional[int]]),
],
),
MetadataValue[int],
):
"""Container class for int metadata entry data.
Args:
value (Optional[int]): The int value.
"""
def __new__(cls, value: Optional[int]):
return super(IntMetadataValue, cls).__new__(cls, check.opt_int_param(value, "value"))
@whitelist_for_serdes(storage_name="BoolMetadataEntryData")
class BoolMetadataValue(
NamedTuple("_BoolMetadataValue", [("value", PublicAttr[Optional[bool]])]),
MetadataValue[bool],
):
"""Container class for bool metadata entry data.
Args:
value (Optional[bool]): The bool value.
"""
def __new__(cls, value: Optional[bool]):
return super(BoolMetadataValue, cls).__new__(cls, check.opt_bool_param(value, "value"))
[docs]
@whitelist_for_serdes
class TimestampMetadataValue(
NamedTuple(
"_DateTimeMetadataValue",
[("value", PublicAttr[float])],
),
MetadataValue[float],
):
"""Container class for metadata value that's a unix timestamp.
Args:
value (float): Seconds since the unix epoch.
"""
def __new__(cls, value: float):
return super(TimestampMetadataValue, cls).__new__(cls, check.float_param(value, "value"))
[docs]
@whitelist_for_serdes(storage_name="DagsterPipelineRunMetadataEntryData")
class DagsterRunMetadataValue(
NamedTuple(
"_DagsterRunMetadataValue",
[
("run_id", PublicAttr[str]),
],
),
MetadataValue[str],
):
"""Representation of a dagster run.
Args:
run_id (str): The run id
"""
def __new__(cls, run_id: str):
return super(DagsterRunMetadataValue, cls).__new__(cls, check.str_param(run_id, "run_id"))
@public
@property
def value(self) -> str:
"""str: The wrapped run id."""
return self.run_id
@whitelist_for_serdes
class DagsterJobMetadataValue(
NamedTuple(
"_DagsterJobMetadataValue",
[
("job_name", PublicAttr[str]),
("location_name", PublicAttr[str]),
("repository_name", PublicAttr[Optional[str]]),
],
),
MetadataValue["DagsterJobMetadataValue"],
):
"""Representation of a dagster run.
Args:
job_name (str): The job's name
location_name (str): The job's code location name
repository_name (Optional[str]): The job's repository name. If not provided, the job is
assumed to be in the same repository as this object.
"""
def __new__(
cls,
job_name: str,
location_name: str,
repository_name: Optional[str] = None,
):
return super(DagsterJobMetadataValue, cls).__new__(
cls,
check.str_param(job_name, "job_name"),
check.str_param(location_name, "location_name"),
check.opt_str_param(repository_name, "repository_name"),
)
@public
@property
def value(self) -> Self:
return self
[docs]
@whitelist_for_serdes(storage_name="DagsterAssetMetadataEntryData")
class DagsterAssetMetadataValue(
NamedTuple("_DagsterAssetMetadataValue", [("asset_key", PublicAttr[AssetKey])]),
MetadataValue[AssetKey],
):
"""Representation of a dagster asset.
Args:
asset_key (AssetKey): The dagster asset key
"""
def __new__(cls, asset_key: AssetKey):
from dagster._core.definitions.events import AssetKey
return super(DagsterAssetMetadataValue, cls).__new__(
cls, check.inst_param(asset_key, "asset_key", AssetKey)
)
@public
@property
def value(self) -> AssetKey:
"""AssetKey: The wrapped :py:class:`AssetKey`."""
return self.asset_key
# This should be deprecated or fixed so that `value` does not return itself.
[docs]
@experimental
@whitelist_for_serdes(storage_name="TableMetadataEntryData")
class TableMetadataValue(
NamedTuple(
"_TableMetadataValue",
[
("records", PublicAttr[Sequence[TableRecord]]),
("schema", PublicAttr[TableSchema]),
],
),
MetadataValue["TableMetadataValue"],
):
"""Container class for table metadata entry data.
Args:
records (TableRecord): The data as a list of records (i.e. rows).
schema (Optional[TableSchema]): A schema for the table.
Example:
.. code-block:: python
from dagster import TableMetadataValue, TableRecord
TableMetadataValue(
schema=None,
records=[
TableRecord({"column1": 5, "column2": "x"}),
TableRecord({"column1": 7, "column2": "y"}),
]
)
"""
[docs]
@public
@staticmethod
def infer_column_type(value: object) -> str:
"""str: Infer the :py:class:`TableSchema` column type that will be used for a value."""
if isinstance(value, bool):
return "bool"
elif isinstance(value, int):
return "int"
elif isinstance(value, float):
return "float"
else:
return "string"
def __new__(cls, records: Sequence[TableRecord], schema: Optional[TableSchema]):
check.sequence_param(records, "records", of_type=TableRecord)
check.opt_inst_param(schema, "schema", TableSchema)
if len(records) == 0:
schema = check.not_none(schema, "schema must be provided if records is empty")
else:
columns = set(records[0].data.keys())
for record in records[1:]:
check.invariant(
set(record.data.keys()) == columns, "All records must have the same fields"
)
schema = schema or TableSchema(
columns=[
TableColumn(name=k, type=TableMetadataValue.infer_column_type(v))
for k, v in records[0].data.items()
]
)
return super(TableMetadataValue, cls).__new__(
cls,
records,
schema,
)
@public
@property
def value(self) -> Self:
"""TableMetadataValue: Identity function."""
return self
[docs]
@whitelist_for_serdes(storage_name="TableSchemaMetadataEntryData")
class TableSchemaMetadataValue(
NamedTuple("_TableSchemaMetadataValue", [("schema", PublicAttr[TableSchema])]),
MetadataValue[TableSchema],
):
"""Representation of a schema for arbitrary tabular data.
Args:
schema (TableSchema): The dictionary containing the schema representation.
"""
def __new__(cls, schema: TableSchema):
return super(TableSchemaMetadataValue, cls).__new__(
cls, check.inst_param(schema, "schema", TableSchema)
)
@public
@property
def value(self) -> TableSchema:
"""TableSchema: The wrapped :py:class:`TableSchema`."""
return self.schema
[docs]
@whitelist_for_serdes
class TableColumnLineageMetadataValue(
NamedTuple(
"_TableColumnLineageMetadataValue", [("column_lineage", PublicAttr[TableColumnLineage])]
),
MetadataValue[TableColumnLineage],
):
"""Representation of the lineage of column inputs to column outputs of arbitrary tabular data.
Args:
column_lineage (TableColumnLineage): The lineage of column inputs to column outputs
for the table.
"""
def __new__(cls, column_lineage: TableColumnLineage):
return super(TableColumnLineageMetadataValue, cls).__new__(
cls, check.inst_param(column_lineage, "column_lineage", TableColumnLineage)
)
@public
@property
def value(self) -> TableColumnLineage:
"""TableSpec: The wrapped :py:class:`TableSpec`."""
return self.column_lineage
@whitelist_for_serdes(storage_name="NullMetadataEntryData")
class NullMetadataValue(NamedTuple("_NullMetadataValue", []), MetadataValue[None]):
"""Representation of null."""
@public
@property
def value(self) -> None:
"""None: The wrapped null value."""
return None
@whitelist_for_serdes
class CodeLocationReconstructionMetadataValue(
NamedTuple("_CodeLocationReconstructionMetadataValue", [("data", PublicAttr[str])]),
MetadataValue[str],
):
"""Representation of some state data used to define the Definitions in a code location. Users
are expected to serialize data before passing it to this class.
Args:
data (str): A string representing data used to define the Definitions in a
code location.
"""
def __new__(cls, data: str):
return super(CodeLocationReconstructionMetadataValue, cls).__new__(
cls, check.str_param(data, "data")
)
@public
@property
def value(self) -> str:
"""str: The wrapped code location state data."""
return self.data