from typing import Dict, Mapping, Sequence, Union, cast
from pydantic import BaseModel
from dagster._annotations import experimental
from .asset_check_result import AssetCheckResult
from .asset_check_spec import AssetCheckSeverity, AssetCheckSpec
from .asset_checks import AssetChecksDefinition
from .asset_key import AssetKey, CoercibleToAssetKey
from .assets import AssetsDefinition, SourceAsset
from .decorators.asset_check_decorator import multi_asset_check
from .events import AssetMaterialization
from .metadata import TableColumn, TableMetadataSet, TableSchema
@experimental
def build_column_schema_change_checks(
    *,
    assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]],
    severity: AssetCheckSeverity = AssetCheckSeverity.WARN,
) -> Sequence[AssetChecksDefinition]:
    """Returns asset checks that pass if the column schema of the asset's latest materialization
    is the same as the column schema of the asset's previous materialization.

    The column schema of a materialization is read from its metadata via
    ``TableMetadataSet.extract(metadata).column_schema``. A check passes when its asset has
    been materialized fewer than two times. It fails when either of the two most recent
    materializations has no column schema metadata.

    Args:
        assets (Sequence[Union[AssetKey, str, AssetsDefinition, SourceAsset]]): The assets to create
            asset checks for. An ``AssetsDefinition`` contributes every key it defines.
        severity (AssetCheckSeverity): The severity if the check fails. Defaults to WARN.

    Returns:
        Sequence[AssetsChecksDefinition]: A single-element list holding one multi-asset check
        definition that contains a ``column_schema_change`` check for each requested asset.
    """
    asset_keys = set()
    for el in assets:
        if isinstance(el, AssetsDefinition):
            asset_keys |= el.keys
        elif isinstance(el, SourceAsset):
            asset_keys.add(el.key)
        else:
            asset_keys.add(AssetKey.from_coercible(el))

    @multi_asset_check(
        specs=[
            AssetCheckSpec(
                "column_schema_change",
                asset=asset_key,
                description="Checks whether there are changes to column schema between the asset's "
                "two most recent materializations",
            )
            for asset_key in asset_keys
        ],
        can_subset=True,
    )
    def _checks(context):
        instance = context.instance
        for asset_check_key in context.selected_asset_check_keys:
            # This single op evaluates several (asset, check) pairs, so every result must name
            # the check it belongs to; otherwise it cannot be routed when >1 asset is given.
            target = {
                "asset_key": asset_check_key.asset_key,
                "check_name": asset_check_key.name,
            }

            materialization_records = instance.fetch_materializations(
                limit=2, records_filter=asset_check_key.asset_key
            ).records
            if len(materialization_records) < 2:
                yield AssetCheckResult(
                    **target,
                    passed=True,
                    description="The asset has been materialized fewer than 2 times",
                )
                continue

            # NOTE(review): relies on records being returned newest-first (the unpacking
            # names the first one "record" and the second "prev_record").
            record, prev_record = materialization_records
            metadata = cast(AssetMaterialization, record.asset_materialization).metadata
            prev_metadata = cast(AssetMaterialization, prev_record.asset_materialization).metadata
            column_schema = TableMetadataSet.extract(metadata).column_schema
            prev_column_schema = TableMetadataSet.extract(prev_metadata).column_schema

            # Without both schemas there is nothing to diff: report one failure and move on
            # instead of falling through to the diff with a None schema.
            if column_schema is None:
                yield AssetCheckResult(
                    **target,
                    passed=False,
                    severity=severity,
                    description="Latest materialization has no column schema metadata",
                )
                continue
            if prev_column_schema is None:
                yield AssetCheckResult(
                    **target,
                    passed=False,
                    severity=severity,
                    description="Previous materialization has no column schema metadata",
                )
                continue

            diff = TableSchemaDiff.from_table_schemas(prev_column_schema, column_schema)
            if not (diff.added_columns or diff.removed_columns or diff.column_type_changes):
                yield AssetCheckResult(
                    **target,
                    passed=True,
                    description="No changes to column schema between previous and latest materialization",
                )
                continue

            # Sections are separated by a blank line; type changes are listed one per line.
            sections = ["Column schema changed between previous and latest materialization."]
            if diff.added_columns:
                sections.append(
                    f"Added columns: {', '.join(col.name for col in diff.added_columns)}"
                )
            if diff.removed_columns:
                sections.append(
                    f"Removed columns: {', '.join(col.name for col in diff.removed_columns)}"
                )
            if diff.column_type_changes:
                sections.append(
                    "Column type changes:"
                    + "".join(
                        f"\n- {col_name}: {type_change.old_type} -> {type_change.new_type}"
                        for col_name, type_change in diff.column_type_changes.items()
                    )
                )
            yield AssetCheckResult(
                **target,
                passed=False,
                severity=severity,
                description="\n\n".join(sections),
            )

    return [_checks]
class TypeChange(BaseModel):
    """A single column's type before and after a schema change."""

    # Type of the column in the older (previous) schema.
    old_type: str
    # Type of the column in the newer (latest) schema.
    new_type: str
class TableSchemaDiff(BaseModel):
    """Column-level differences going from an older ``TableSchema`` to a newer one.

    Columns are matched by ``name`` only. A column present in both schemas whose ``type``
    differs is reported in ``column_type_changes``, not as a removal plus an addition.
    """

    # Columns whose name appears only in the new schema, in new-schema order.
    added_columns: Sequence[TableColumn]
    # Columns whose name appears only in the old schema, in old-schema order.
    removed_columns: Sequence[TableColumn]
    # Column name -> old/new type, for columns present in both schemas with differing types.
    column_type_changes: Mapping[str, TypeChange]

    @classmethod
    def from_table_schemas(
        cls, old_table_schema: TableSchema, new_table_schema: TableSchema
    ) -> "TableSchemaDiff":
        """Build the diff between two table schemas.

        Args:
            old_table_schema (TableSchema): The earlier schema.
            new_table_schema (TableSchema): The later schema.

        Returns:
            TableSchemaDiff: Added, removed, and type-changed columns, matched by column name.
        """
        old_columns_by_name = {column.name: column for column in old_table_schema.columns}
        new_columns_by_name = {column.name: column for column in new_table_schema.columns}

        added_columns = [
            column for column in new_table_schema.columns if column.name not in old_columns_by_name
        ]
        removed_columns = [
            column for column in old_table_schema.columns if column.name not in new_columns_by_name
        ]

        column_type_changes: Dict[str, TypeChange] = {}
        for name, new_column in new_columns_by_name.items():
            old_column = old_columns_by_name.get(name)
            # Only columns present in both schemas can have a type change.
            if old_column is not None and old_column.type != new_column.type:
                column_type_changes[name] = TypeChange(
                    old_type=old_column.type, new_type=new_column.type
                )

        return cls(
            added_columns=added_columns,
            removed_columns=removed_columns,
            column_type_changes=column_type_changes,
        )