Ask AI

Source code for dagster_pandas.data_frame

import pandas as pd
from dagster import (
    DagsterInvariantViolationError,
    DagsterType,
    Field,
    MetadataValue,
    StringSource,
    TableColumn,
    TableSchema,
    TableSchemaMetadataValue,
    TypeCheck,
    _check as check,
    dagster_type_loader,
)
from dagster._annotations import experimental
from dagster._config import Selector
from dagster._core.definitions.metadata import normalize_metadata
from dagster._utils import dict_without_keys

from dagster_pandas.constraints import (
    CONSTRAINT_METADATA_KEY,
    ColumnDTypeFnConstraint,
    ColumnDTypeInSetConstraint,
    ConstraintViolationException,
)
from dagster_pandas.validation import PandasColumn, validate_constraints

# Constraint classes that _construct_constraint_list leaves out of the per-column bullet list;
# _build_column_header renders these two kinds of constraint in the column header instead.
CONSTRAINT_BLACKLIST = {ColumnDTypeFnConstraint, ColumnDTypeInSetConstraint}


@dagster_type_loader(
    Selector(
        {
            "csv": {
                "path": StringSource,
                "sep": Field(StringSource, is_required=False, default_value=","),
            },
            "parquet": {"path": StringSource},
            "table": {"path": StringSource},
            "pickle": {"path": StringSource},
        },
    )
)
def dataframe_loader(_context, config):
    """Read a pandas DataFrame from the file format selected in ``config``.

    Args:
        _context: Unused.
        config: Mapping whose first entry is ``{file_type: file_options}``; only that
            first entry is consulted. ``file_options`` must contain ``"path"``.

    Returns:
        Whatever the matching pandas reader returns for the configured path.

    Raises:
        DagsterInvariantViolationError: If ``file_type`` is not one of ``"csv"``,
            ``"parquet"``, ``"table"`` or ``"pickle"``.
    """
    file_type, file_options = next(iter(config.items()))

    if file_type == "csv":
        csv_path = file_options["path"]
        # Everything left after dropping "path" (e.g. "sep") is passed on to read_csv.
        reader_kwargs = dict_without_keys(file_options, "path")
        return pd.read_csv(csv_path, **reader_kwargs)

    if file_type == "parquet":
        return pd.read_parquet(file_options["path"])

    if file_type == "table":
        # "table" files are read as tab-separated text.
        return pd.read_csv(file_options["path"], sep="\t")

    if file_type == "pickle":
        # NOTE(review): unpickling runs whatever the file encodes; only use trusted paths.
        return pd.read_pickle(file_options["path"])

    raise DagsterInvariantViolationError(f"Unsupported file_type {file_type}")


def df_type_check(_, value):
    """Type-check ``value`` as a pandas DataFrame.

    Args:
        _: Unused first argument.
        value: The object to check.

    Returns:
        TypeCheck: A failing check when ``value`` is not a ``pd.DataFrame``; otherwise a
        passing check whose metadata holds the row count (as a string) and the column
        labels (each cast to ``str``).
    """
    if isinstance(value, pd.DataFrame):
        row_count = str(len(value))
        # Column labels may be non-strings (e.g. datetimes), so cast each one.
        column_labels = [str(label) for label in value.columns]
        return TypeCheck(
            success=True,
            metadata={
                "row_count": row_count,
                "metadata": {"columns": column_labels},
            },
        )
    return TypeCheck(success=False)


# Dagster type named "PandasDataFrame": values are checked with df_type_check and loaded from
# config through dataframe_loader.
DataFrame = DagsterType(
    name="PandasDataFrame",
    description="""Two-dimensional size-mutable, potentially heterogeneous
    tabular data structure with labeled axes (rows and columns).
    See http://pandas.pydata.org/""",
    loader=dataframe_loader,
    type_check_fn=df_type_check,
    typing_type=pd.DataFrame,
)


def _construct_constraint_list(constraints):
    """Render the given constraints as a markdown bullet list.

    Args:
        constraints: Iterable of constraint objects exposing ``markdown_description``.

    Returns:
        str: One ``"+ <description>\\n"`` line per constraint whose class is not in
        ``CONSTRAINT_BLACKLIST``, concatenated in iteration order (``""`` if none).
    """
    return "".join(
        f"+ {constraint.markdown_description}\n"
        for constraint in constraints
        if constraint.__class__ not in CONSTRAINT_BLACKLIST
    )


def _build_column_header(column_name, constraints):
    """Build the markdown header line for one column.

    The header is the bolded column name. The first dtype constraint found while
    iterating ``constraints`` decides the suffix: a ``ColumnDTypeInSetConstraint``
    appends the expected dtype(s), a ``ColumnDTypeFnConstraint`` appends the name of
    its validator function. Without either, the bare header is returned.

    Args:
        column_name: Name shown in bold.
        constraints: Iterable of constraint objects for the column.

    Returns:
        str: The markdown header.
    """
    header = f"**{column_name}**"
    for constraint in constraints:
        if isinstance(constraint, ColumnDTypeInSetConstraint):
            dtypes_tuple = tuple(constraint.expected_dtype_set)
            # A lone dtype is shown on its own; several are shown as a tuple.
            shown = dtypes_tuple if len(dtypes_tuple) > 1 else dtypes_tuple[0]
            return header + f": `{shown}`"
        if isinstance(constraint, ColumnDTypeFnConstraint):
            validator_name = constraint.type_fn.__name__
            return header + f": Validator `{validator_name}`"
    return header


def create_dagster_pandas_dataframe_description(description, columns):
    """Compose the markdown description for a dataframe type.

    Args:
        description: Free-form text placed before the column listing.
        columns: Iterable of column objects exposing ``name`` and ``constraints``.

    Returns:
        str: ``description``, a ``### Columns`` heading, then for each column its
        header line followed by its constraint bullet list and a blank line.
    """
    heading = "\n".join([description, "### Columns", ""])
    column_sections = "".join(
        f"{_build_column_header(column.name, column.constraints)}\n"
        f"{_construct_constraint_list(column.constraints)}\n"
        for column in columns
    )
    return heading + column_sections


def create_table_schema_metadata_from_dataframe(
    pandas_df: pd.DataFrame,
) -> TableSchemaMetadataValue:
    """Describe a pandas DataFrame's columns as Dagster table-schema metadata.

    Each entry of ``pandas_df.dtypes`` becomes one ``TableColumn`` whose name and type
    are the ``str()`` of the column label and dtype respectively.

    Args:
        pandas_df (pandas.DataFrame): The DataFrame to describe.

    Returns:
        TableSchemaMetadataValue: Metadata value wrapping the resulting ``TableSchema``.
    """
    check.inst_param(
        pandas_df, "pandas_df", pd.DataFrame, "Input must be a pandas DataFrame object"
    )
    table_columns = [
        TableColumn(name=str(label), type=str(column_dtype))
        for label, column_dtype in pandas_df.dtypes.items()
    ]
    schema = TableSchema(columns=table_columns)
    return MetadataValue.table_schema(schema)


def create_dagster_pandas_dataframe_type(
    name,
    description=None,
    columns=None,
    metadata_fn=None,
    dataframe_constraints=None,
    loader=None,
):
    """Constructs a custom pandas dataframe dagster type.

    Args:
        name (str): Name of the dagster pandas type.
        description (Optional[str]): A markdown-formatted string, displayed in tooling.
        columns (Optional[List[PandasColumn]]): A list of :py:class:`~dagster.PandasColumn`
            objects which express dataframe column schemas and constraints.
        metadata_fn (Optional[Callable[[pandas.DataFrame], Dict[str, Union[str, float, int, Dict, MetadataValue]]]]):
            A callable which takes your dataframe and returns a dict with string label keys
            and MetadataValue values.
        dataframe_constraints (Optional[List[DataFrameConstraint]]): A list of objects that
            inherit from :py:class:`~dagster.DataFrameConstraint`. This allows you to express
            dataframe-level constraints.
        loader (Optional[DagsterTypeLoader]): An instance of a class that inherits from
            :py:class:`~dagster.DagsterTypeLoader`. If None, we will default to using
            `dataframe_loader`.

    Returns:
        DagsterType: A type whose check requires a ``pd.DataFrame`` that passes
        ``validate_constraints`` for the given columns and dataframe constraints.
    """
    # We allow for the plugging in of a dagster_type_loader so that users can load their custom
    # dataframes via configuration their own way if the default configs don't suffice. This is
    # purely optional.
    check.str_param(name, "name")
    metadata_fn = check.opt_callable_param(metadata_fn, "metadata_fn")
    description = create_dagster_pandas_dataframe_description(
        check.opt_str_param(description, "description", default=""),
        check.opt_list_param(columns, "columns", of_type=PandasColumn),
    )

    def _dagster_type_check(_, value):
        # Anything that is not a DataFrame fails immediately, before constraints run.
        if not isinstance(value, pd.DataFrame):
            return TypeCheck(
                success=False,
                description=(
                    f"Must be a pandas.DataFrame. Got value of type. {type(value).__name__}"
                ),
            )

        # A constraint violation becomes a failed check carrying the violation text.
        try:
            validate_constraints(
                value,
                pandas_columns=columns,
                dataframe_constraints=dataframe_constraints,
            )
        except ConstraintViolationException as e:
            return TypeCheck(success=False, description=str(e))

        # Summary metadata is only computed when the caller supplied a metadata_fn.
        return TypeCheck(
            success=True,
            metadata=_execute_summary_stats(name, value, metadata_fn) if metadata_fn else None,
        )

    return DagsterType(
        name=name,
        type_check_fn=_dagster_type_check,
        loader=loader if loader else dataframe_loader,
        description=description,
        typing_type=pd.DataFrame,
    )


@experimental
def create_structured_dataframe_type(
    name,
    description=None,
    columns_validator=None,
    columns_aggregate_validator=None,
    dataframe_validator=None,
    loader=None,
):
    """Build a DagsterType whose type check runs the supplied dataframe validators.

    Args:
        name (str): the name of the new type
        description (Optional[str]): the description of the new type
        columns_validator (Optional[Union[ColumnConstraintWithMetadata, MultiColumnConstraintWithMetadata]]):
            what column-level row by row validation you want to have applied.
            Leave empty for no column-level row by row validation.
        columns_aggregate_validator (Optional[Union[ColumnAggregateConstraintWithMetadata, MultiAggregateConstraintWithMetadata]]):
            what column-level aggregate validation you want to have applied,
            Leave empty for no column-level aggregate validation.
        dataframe_validator (Optional[Union[ConstraintWithMetadata, MultiConstraintWithMetadata]]):
            what dataframe-wide validation you want to have applied.
            Leave empty for no dataframe-wide validation.
        loader (Optional[DagsterTypeLoader]): An instance of a class that inherits from
            :py:class:`~dagster.DagsterTypeLoader`. If None, we will default to using
            `dataframe_loader`.

    Returns:
        a DagsterType with the corresponding name and packaged validation.
    """

    def _dagster_type_check(_, value):
        if not isinstance(value, pd.DataFrame):
            return TypeCheck(
                success=False,
                description=(
                    f"Must be a pandas.DataFrame. Got value of type. {type(value).__name__}"
                ),
            )

        # Run each configured validator; insertion order is dataframe, columns, aggregates.
        individual_result_dict = {}

        if dataframe_validator is not None:
            individual_result_dict["dataframe"] = dataframe_validator.validate(value)
        if columns_validator is not None:
            individual_result_dict["columns"] = columns_validator.validate(value)
        if columns_aggregate_validator is not None:
            individual_result_dict["column-aggregates"] = columns_aggregate_validator.validate(
                value
            )

        typechecks_succeeded = True
        metadata = {}
        overall_description = "Failed Constraints: {}"
        constraint_clauses = []
        for key, result in individual_result_dict.items():
            result_val = result.success
            if result_val:
                continue
            # result_val is falsy here, so this records the failure.
            typechecks_succeeded = typechecks_succeeded and result_val
            result_dict = result.metadata[CONSTRAINT_METADATA_KEY].data
            metadata[f"{key}-constraint-metadata"] = MetadataValue.json(result_dict)
            constraint_clauses.append(f"{key} failing constraints, {result.description}")
        # returns aggregates, then column, then dataframe
        return TypeCheck(
            success=typechecks_succeeded,
            description=overall_description.format(constraint_clauses),
            metadata=metadata,
        )

    description = check.opt_str_param(description, "description", default="")
    return DagsterType(
        name=name,
        type_check_fn=_dagster_type_check,
        loader=loader if loader else dataframe_loader,
        description=description,
    )


def _execute_summary_stats(type_name, value, metadata_fn):
    """Run the user-supplied ``metadata_fn`` on ``value`` and normalize its result.

    Args:
        type_name: Name of the dataframe type, used only in the error message.
        value: The object handed to ``metadata_fn`` (the dataframe being checked).
        metadata_fn: Callable producing the raw metadata; when falsy nothing is run.

    Returns:
        ``[]`` when ``metadata_fn`` is falsy, otherwise the result of
        ``normalize_metadata`` applied to what ``metadata_fn`` returned.

    Raises:
        DagsterInvariantViolationError: If ``normalize_metadata`` rejects the value
            returned by ``metadata_fn``; the original error is chained as the cause.
    """
    if not metadata_fn:
        return []

    # Called outside the try block so errors raised by user code propagate unchanged.
    user_metadata = metadata_fn(value)
    try:
        return normalize_metadata(user_metadata)
    except Exception as e:
        # Report what the function returned (not the input dataframe) and keep the cause.
        raise DagsterInvariantViolationError(
            "The return value of the user-defined summary_statistics function for pandas "
            f"data frame type {type_name} returned {user_metadata}. This function must return "
            "Dict[str, RawMetadataValue]."
        ) from e