Source code for dagster_ge.factory

import datetime
from typing import Any, Dict, Optional

import great_expectations as ge
from dagster import (
    ConfigurableResource,
    ExpectationResult,
    IAttachDifferentObjectToOpContext,
    In,
    MetadataValue,
    OpExecutionContext,
    Out,
    Output,
    _check as check,
    op,
    resource,
)
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster_pandas import DataFrame
from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultMarkdownPageView
from pydantic import Field

try:
    # ge < v0.13.0
    from great_expectations.core import convert_to_json_serializable
except ImportError:
    # ge >= v0.13.0
    from great_expectations.core.util import convert_to_json_serializable


class GEContextResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    ge_root_dir: Optional[str] = Field(
        default=None,
        description="The root directory for your Great Expectations project.",
    )

    def get_data_context(self):
        if self.ge_root_dir is None:
            return ge.data_context.DataContext()
        return ge.data_context.DataContext(context_root_dir=self.ge_root_dir)

    def get_object_to_set_on_execution_context(self):
        return self.get_data_context()


@dagster_maintained_resource
@resource(config_schema=GEContextResource.to_config_schema())
def ge_data_context(context):
    return GEContextResource.from_resource_context(context).get_data_context()
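
Either flavor of the resource supplies the Great Expectations data context that the generated ops read from `context.resources.ge_data_context`. A minimal construction sketch, assuming your GE project lives at the placeholder path below (not a value defined in this module):

from dagster import Definitions
from dagster_ge.factory import GEContextResource, ge_data_context

# Pythonic resource (Dagster 1.x style).
defs = Definitions(
    resources={"ge_data_context": GEContextResource(ge_root_dir="/path/to/great_expectations")},
)

# Equivalent configuration of the function-style resource.
configured_ge_data_context = ge_data_context.configured(
    {"ge_root_dir": "/path/to/great_expectations"}
)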


def ge_validation_op_factory(
    name,
    datasource_name,
    suite_name,
    validation_operator_name=None,
    input_dagster_type=DataFrame,
    batch_kwargs=None,
):
    """Generates ops for interacting with GE.

    Args:
        name (str): the name of the op
        datasource_name (str): the name of your DataSource, see your great_expectations.yml
        suite_name (str): the name of your expectation suite, see your great_expectations.yml
        validation_operator_name (Optional[str]): what validation operator to run -- defaults to
            None, which generates an ephemeral validator. If you want to save data docs, use
            'action_list_operator'.
            See https://legacy.docs.greatexpectations.io/en/0.12.1/reference/core_concepts/validation_operators_and_actions.html#
        input_dagster_type (DagsterType): the Dagster type used to type check the input to the op.
            Defaults to `dagster_pandas.DataFrame`.
        batch_kwargs (Optional[dict]): overrides the `batch_kwargs` parameter when calling the
            `ge_data_context`'s `get_batch` method. Defaults to `{"dataset": dataset}`, where
            `dataset` is the input to the generated op.

    Returns:
        An op that takes in a set of data and yields both an expectation with relevant metadata
        and an output with all the metadata (for user processing)
    """
    check.str_param(datasource_name, "datasource_name")
    check.str_param(suite_name, "suite_name")
    check.opt_str_param(validation_operator_name, "validation_operator_name")
    batch_kwargs = check.opt_dict_param(batch_kwargs, "batch_kwargs")

    @op(
        name=name,
        ins={"dataset": In(input_dagster_type)},
        out=Out(
            dict,
            description="""
            This op yields an ExpectationResult with a structured dict of metadata from the GE
            suite, as well as the full result in case a user wants to process it differently. The
            structured dict contains both summary stats from the suite as well as expectation by
            expectation results/details.
            """,
        ),
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def _ge_validation_fn(context: OpExecutionContext, dataset):
        data_context = context.resources.ge_data_context
        if validation_operator_name is not None:
            validation_operator = validation_operator_name
        else:
            data_context.add_validation_operator(
                "ephemeral_validation",
                {"class_name": "ActionListValidationOperator", "action_list": []},
            )
            validation_operator = "ephemeral_validation"

        suite = data_context.get_expectation_suite(suite_name)
        final_batch_kwargs = batch_kwargs or {"dataset": dataset}
        if "datasource" in final_batch_kwargs:
            context.log.warning(
                "`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` "
                "parameter of the op factory instead."
            )
        final_batch_kwargs["datasource"] = datasource_name
        batch = data_context.get_batch(final_batch_kwargs, suite)

        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.now(datetime.timezone.utc),
        }
        results = data_context.run_validation_operator(
            validation_operator, assets_to_validate=[batch], run_id=run_id
        )

        res = convert_to_json_serializable(results.list_validation_results())[0]
        validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
        rendered_document_content_list = (
            validation_results_page_renderer.render_validation_operator_result(results)
        )
        md_str = " ".join(DefaultMarkdownPageView().render(rendered_document_content_list))

        yield ExpectationResult(
            success=res["success"],
            metadata={"Expectation Results": MetadataValue.md(md_str)},
        )
        yield Output(res)

    return _ge_validation_fn
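
A minimal usage sketch for the factory above. The datasource name ("getest"), suite name ("basic.warning"), project path, and CSV path are placeholders that must match your own Great Expectations project, and the loader op is hypothetical rather than part of this module:

import pandas as pd

from dagster import Out, job, op
from dagster_ge.factory import ge_data_context, ge_validation_op_factory
from dagster_pandas import DataFrame


@op(out=Out(DataFrame))
def load_payroll_data():
    # Hypothetical loader; swap in your own data source.
    return pd.read_csv("payroll.csv")


payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op", datasource_name="getest", suite_name="basic.warning"
)


@job(
    resource_defs={
        "ge_data_context": ge_data_context.configured(
            {"ge_root_dir": "/path/to/great_expectations"}
        )
    }
)
def payroll_data_job():
    payroll_expectations(load_payroll_data())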


def ge_validation_op_factory_v3(
    name,
    datasource_name,
    data_connector_name,
    data_asset_name,
    suite_name,
    batch_identifiers: dict,
    input_dagster_type=DataFrame,
    runtime_method_type="batch_data",
    extra_kwargs=None,
):
    """Generates ops for interacting with GE (v3 API).

    Args:
        name (str): the name of the op
        datasource_name (str): the name of your DataSource, see your great_expectations.yml
        data_connector_name (str): the name of the data connector for this datasource. This should
            point to a RuntimeDataConnector. For information on how to set this up, see:
            https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_create_a_batch_of_data_from_an_in_memory_spark_or_pandas_dataframe
        data_asset_name (str): the name of the data asset that this op will be validating.
        suite_name (str): the name of your expectation suite, see your great_expectations.yml
        batch_identifiers (dict): a dictionary of batch identifiers to uniquely identify this
            batch of data. To learn more about batch identifiers, see:
            https://docs.greatexpectations.io/docs/reference/datasources#batches.
        input_dagster_type (DagsterType): the Dagster type used to type check the input to the op.
            Defaults to `dagster_pandas.DataFrame`.
        runtime_method_type (str): how GE should interpret the op input. One of ("batch_data",
            "path", "query"). Defaults to "batch_data", which will interpret the input as an
            in-memory object.
        extra_kwargs (Optional[dict]): adds extra kwargs to the invocation of `ge_data_context`'s
            `get_validator` method. If not set, input will be:

                {
                    "datasource_name": datasource_name,
                    "data_connector_name": data_connector_name,
                    "data_asset_name": data_asset_name,
                    "runtime_parameters": {"<runtime_method_type>": <op input>},
                    "batch_identifiers": batch_identifiers,
                    "expectation_suite_name": suite_name,
                }

    Returns:
        An op that takes in a set of data and yields both an expectation with relevant metadata
        and an output with all the metadata (for user processing)
    """
    check.str_param(datasource_name, "datasource_name")
    check.str_param(data_connector_name, "data_connector_name")
    check.str_param(suite_name, "suite_name")
    _extra_kwargs: Dict[Any, Any] = check.opt_dict_param(extra_kwargs, "extra_kwargs")

    @op(
        name=name,
        ins={"dataset": In(input_dagster_type)},
        out=Out(
            dict,
            description="""
            This op yields an ExpectationResult with a structured dict of metadata from the GE
            suite, as well as the full result in case a user wants to process it differently. The
            structured dict contains both summary stats from the suite as well as expectation by
            expectation results/details.
            """,
        ),
        required_resource_keys={"ge_data_context"},
        tags={"kind": "ge"},
    )
    def _ge_validation_fn(context: OpExecutionContext, dataset):
        data_context = context.resources.ge_data_context

        validator_kwargs = {
            "datasource_name": datasource_name,
            "data_connector_name": data_connector_name,
            "data_asset_name": data_asset_name,
            "runtime_parameters": {runtime_method_type: dataset},
            "batch_identifiers": batch_identifiers,
            "expectation_suite_name": suite_name,
            **_extra_kwargs,
        }
        validator = data_context.get_validator(**validator_kwargs)

        run_id = {
            "run_name": datasource_name + " run",
            "run_time": datetime.datetime.now(datetime.timezone.utc),
        }
        results = validator.validate(run_id=run_id)

        validation_results_page_renderer = ValidationResultsPageRenderer(run_info_at_end=True)
        rendered_document_content_list = validation_results_page_renderer.render(
            validation_results=results
        )
        md_str = "".join(DefaultMarkdownPageView().render(rendered_document_content_list))

        yield ExpectationResult(
            success=bool(results["success"]),
            metadata={"Expectation Results": MetadataValue.md(md_str)},
        )
        yield Output(results.to_json_dict())

    return _ge_validation_fn