GreatExpectations数据质量验证器Skill great-expectations-validator

Great Expectations 数据质量验证器是一个专业的数据治理工具,用于自动化数据质量检查、模式验证和期望测试。它支持创建期望套件、生成数据文档、集成机器学习管道,并提供全面的数据质量监控解决方案。关键词:数据质量验证、Great Expectations、期望套件、数据治理、机器学习管道、自动化测试、数据文档、ETL验证、数据工程、数据科学工具。

数据治理 0 次安装 11 次浏览 更新于 2/23/2026

name: great-expectations-validator description: 使用Great Expectations进行数据质量验证的技能,包括模式验证、期望套件、数据文档和机器学习管道中的自动化数据质量检查。 allowed-tools: Read, Grep, Write, Bash, Edit, Glob

Great Expectations 验证器

使用Great Expectations进行全面的数据测试、文档和质量监控,以验证数据质量。

概述

此技能提供使用Great Expectations(GX)进行数据质量验证的能力,GX是领先的开源数据质量库。它支持创建和执行期望套件、生成数据文档以及与机器学习管道集成。

能力

期望套件管理

  • 创建和配置期望套件
  • 为列和表定义期望
  • 根据期望验证数据
  • 存储和版本化期望套件

数据验证

  • 模式验证(列存在性、类型)
  • 统计验证(分布、范围)
  • 参照完整性检查
  • 基于SQL的自定义期望
  • 正则表达式模式匹配

数据文档

  • 生成数据文档(Data Docs)
  • 创建分析报告
  • 记录验证结果
  • 构建数据字典

管道集成

  • 检查点配置和执行
  • 批次请求管理
  • 基于操作的工作流(通知、存储)
  • 与Airflow、Prefect、Dagster集成

自定义期望

  • 定义领域特定期望
  • 参数化期望
  • 多列期望
  • 基于行条件的期望

先决条件

安装

pip install great_expectations>=0.18.0

可选连接器

# 数据库连接器
pip install great_expectations[sqlalchemy]

# 云存储
pip install great_expectations[s3]  # AWS
pip install great_expectations[gcs]  # GCP
pip install great_expectations[azure]  # Azure

# Spark支持
pip install great_expectations[spark]

使用模式

初始化Great Expectations项目

# 初始化GX项目
great_expectations init

# 创建:
# great_expectations/
# ├── great_expectations.yml
# ├── expectations/
# ├── checkpoints/
# ├── plugins/
# └── uncommitted/

从分析器创建期望套件

import great_expectations as gx

# 初始化上下文
context = gx.get_context()

# 添加数据源
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("customers", filepath_or_buffer="customers.csv")

# 创建批次请求
batch_request = data_asset.build_batch_request()

# 使用分析器创建期望套件
expectation_suite = context.add_or_update_expectation_suite("customer_suite")

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_suite"
)

# 分析并生成期望
validator.expect_column_to_exist("customer_id")
validator.expect_column_values_to_be_unique("customer_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")

# 保存套件
validator.save_expectation_suite(discard_failed_expectations=False)

使用检查点验证数据

import great_expectations as gx

context = gx.get_context()

# 创建检查点
checkpoint = context.add_or_update_checkpoint(
    name="customer_checkpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "my_datasource",
                "data_asset_name": "customers"
            },
            "expectation_suite_name": "customer_suite"
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"}
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"}
        }
    ]
)

# 运行检查点
result = checkpoint.run()

# 检查结果
if result.success:
    print("验证通过!")
else:
    print("验证失败!")
    for validation_result in result.run_results.values():
        for result in validation_result.results:
            if not result.success:
                print(f"失败:{result.expectation_config.expectation_type}")

常见期望

# 列存在性和类型
validator.expect_column_to_exist("column_name")
validator.expect_column_values_to_be_of_type("column_name", "int64")
validator.expect_table_column_count_to_equal(10)

# 空值处理
validator.expect_column_values_to_not_be_null("column_name")
validator.expect_column_values_to_be_null("deprecated_column")

# 唯一性
validator.expect_column_values_to_be_unique("id_column")
validator.expect_compound_columns_to_be_unique(["col1", "col2"])

# 值范围
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_min_to_be_between("score", min_value=0)
validator.expect_column_max_to_be_between("score", max_value=100)

# 集合成员
validator.expect_column_values_to_be_in_set("status", ["A", "B", "C"])
validator.expect_column_distinct_values_to_be_in_set("category", ["cat1", "cat2"])

# 字符串模式
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
validator.expect_column_value_lengths_to_be_between("code", min_value=5, max_value=10)

# 统计
validator.expect_column_mean_to_be_between("value", min_value=50, max_value=100)
validator.expect_column_stdev_to_be_between("value", min_value=0, max_value=20)
validator.expect_column_proportion_of_unique_values_to_be_between("id", min_value=0.9)

与Babysitter SDK集成

任务定义示例

const dataValidationTask = defineTask({
  name: 'great-expectations-validation',
  description: '使用Great Expectations验证数据质量',

  inputs: {
    dataPath: { type: 'string', required: true },
    expectationSuiteName: { type: 'string', required: true },
    checkpointName: { type: 'string' },
    failOnError: { type: 'boolean', default: true }
  },

  outputs: {
    success: { type: 'boolean' },
    validationResults: { type: 'object' },
    failedExpectations: { type: 'array' },
    dataDocsUrl: { type: 'string' }
  },

  async run(inputs, taskCtx) {
    return {
      kind: 'skill',
      title: `验证数据:${inputs.expectationSuiteName}`,
      skill: {
        name: 'great-expectations-validator',
        context: {
          operation: 'validate',
          dataPath: inputs.dataPath,
          expectationSuiteName: inputs.expectationSuiteName,
          checkpointName: inputs.checkpointName,
          failOnError: inputs.failOnError
        }
      },
      io: {
        inputJsonPath: `tasks/${taskCtx.effectId}/input.json`,
        outputJsonPath: `tasks/${taskCtx.effectId}/result.json`
      }
    };
  }
});

MCP服务器集成

使用gx-mcp-server

{
  "mcpServers": {
    "great-expectations": {
      "command": "uvx",
      "args": ["gx-mcp-server"],
      "env": {
        "GX_CONTEXT_ROOT": "./great_expectations"
      }
    }
  }
}

可用的MCP工具

  • gx_list_datasources - 列出配置的数据源
  • gx_list_expectation_suites - 列出期望套件
  • gx_run_checkpoint - 执行检查点
  • gx_validate_data - 根据套件验证数据
  • gx_get_validation_results - 检索验证结果

ML管道集成

训练数据验证

def validate_training_data(df, suite_name="training_data_suite"):
    """在模型训练前验证训练数据。"""
    context = gx.get_context()

    # 添加数据框作为数据源
    datasource = context.sources.add_pandas("training_data")
    data_asset = datasource.add_dataframe_asset("df")
    batch_request = data_asset.build_batch_request(dataframe=df)

    # 验证
    checkpoint = context.add_or_update_checkpoint(
        name="training_validation",
        validations=[{
            "batch_request": batch_request,
            "expectation_suite_name": suite_name
        }]
    )

    result = checkpoint.run()

    if not result.success:
        failed = [r for r in result.run_results.values()
                  for r in r.results if not r.success]
        raise ValueError(f"训练数据验证失败:{len(failed)}个期望失败")

    return True

特征质量检查

# ML特征的期望
validator.expect_column_values_to_not_be_null("feature_1", mostly=0.95)
validator.expect_column_values_to_be_between("feature_1", min_value=-3, max_value=3)  # 标准化缩放
validator.expect_column_proportion_of_unique_values_to_be_between("categorical_feature", min_value=0.001)
validator.expect_column_kl_divergence_to_be_less_than("feature_1",
    partition_object=reference_distribution,
    threshold=0.1)

最佳实践

  1. 版本化期望套件:将套件存储在版本控制中
  2. 使用检查点:始终通过检查点验证以确保一致性
  3. 设置mostly参数:使用mostly=0.95允许小的数据质量问题
  4. 生成数据文档:为团队可见性记录数据
  5. 快速失败:在管道早期验证数据
  6. 自定义期望:为您的用例创建领域特定期望

参考