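name: great-expectations-validator
description: Skill for validating data quality with Great Expectations, covering schema validation, expectation suites, Data Docs, and automated data quality checks in machine learning pipelines.
allowed-tools: Read, Grep, Write, Bash, Edit, Glob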
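Great Expectations Validator
Validate data quality with Great Expectations through comprehensive data testing, documentation, and quality monitoring.
Overview
This skill provides data quality validation with Great Expectations (GX), a widely used open-source data quality library. It supports creating and executing expectation suites, generating Data Docs, and integrating with machine learning pipelines.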
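Capabilities
Expectation Suite Management
- Create and configure expectation suites
- Define expectations for columns and tables
- Validate data against expectations
- Store and version expectation suites
Data Validation
- Schema validation (column existence, types)
- Statistical validation (distributions, ranges)
- Referential integrity checks
- Custom SQL-based expectations
- Regular expression pattern matching
Data Documentation
- Generate Data Docs
- Create profiling reports
- Record validation results
- Build data dictionaries
Pipeline Integration
- Checkpoint configuration and execution
- Batch request management
- Action-based workflows (notifications, storage)
- Integration with Airflow, Prefect, and Dagster
Custom Expectations
- Define domain-specific expectations
- Parameterized expectations
- Multi-column expectations
- Row-condition-based expectations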
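Prerequisites
Installation
# Quote the requirement so the shell does not treat ">" as a redirect.
# The examples in this document use the 0.18.x fluent API, which changed in GX 1.0.
pip install "great_expectations>=0.18.0,<1.0"
Optional Connectors
# Database connectors
pip install "great_expectations[sqlalchemy]"
# Cloud storage
pip install "great_expectations[s3]" # AWS
pip install "great_expectations[gcp]" # GCP (the packaging extra is named gcp)
pip install "great_expectations[azure]" # Azure
# Spark support
pip install "great_expectations[spark]"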
Usage Patterns
Initialize a Great Expectations Project
# Initialize a GX project
great_expectations init
# Creates:
# great_expectations/
# ├── great_expectations.yml
# ├── expectations/
# ├── checkpoints/
# ├── plugins/
# └── uncommitted/
Create an Expectation Suite with a Validator
import great_expectations as gx
# Initialize the context
context = gx.get_context()
# Add a data source
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_csv_asset("customers", filepath_or_buffer="customers.csv")
# Build a batch request
batch_request = data_asset.build_batch_request()
# Create the expectation suite and a validator bound to it
expectation_suite = context.add_or_update_expectation_suite("customer_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="customer_suite",
)
# Define expectations; each call runs against the batch and is recorded in the suite
validator.expect_column_to_exist("customer_id")
validator.expect_column_values_to_be_unique("customer_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_values_to_be_in_set("status", ["active", "inactive", "pending"])
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
# Save the suite, keeping expectations that failed on this batch
validator.save_expectation_suite(discard_failed_expectations=False)
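Because a file-backed project saves the suite as JSON, the suite can be reviewed and diffed like any other versioned artifact. The following is a minimal sketch of a summary helper, assuming the 0.18-style layout in which the suite has a top-level `expectations` list whose entries carry `expectation_type` and `kwargs`. The name `summarize_suite` and the file path in the usage comment are hypothetical.

```python
import json


def summarize_suite(suite):
    """Return one readable line per expectation in a parsed suite dict."""
    lines = []
    for expectation in suite.get("expectations", []):
        kwargs = expectation.get("kwargs", {})
        # Sort the keys so the summary is stable across saves and diffs cleanly.
        args = ", ".join(f"{key}={kwargs[key]!r}" for key in sorted(kwargs))
        lines.append(f"{expectation['expectation_type']}({args})")
    return lines


# Usage (assumed location of the saved suite in a file-backed project):
# with open("great_expectations/expectations/customer_suite.json", encoding="utf-8") as fh:
#     print("\n".join(summarize_suite(json.load(fh))))
```

Printing this summary in a code review makes it easier to see which expectations changed without reading the raw JSON.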
Validate Data with a Checkpoint
import great_expectations as gx
context = gx.get_context()
# Create the checkpoint
checkpoint = context.add_or_update_checkpoint(
    name="customer_checkpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "my_datasource",
                "data_asset_name": "customers",
            },
            "expectation_suite_name": "customer_suite",
        }
    ],
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
    ],
)
# Run the checkpoint
result = checkpoint.run()
# Inspect the results
if result.success:
    print("Validation passed!")
else:
    print("Validation failed!")
    # Each run result is a dict; the suite-level result is under "validation_result"
    for run_result in result.run_results.values():
        for expectation_result in run_result["validation_result"].results:
            if not expectation_result.success:
                print(f"Failed: {expectation_result.expectation_config.expectation_type}")
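The same traversal can be run against the serialized form of the result, which is convenient when the result is logged or passed between pipeline steps. The sketch below assumes the plain dict returned by `result.to_json_dict()` in 0.18.x, with `run_results` mapping to entries that hold a `validation_result`. The helper name `collect_failures` is hypothetical.

```python
def collect_failures(checkpoint_result):
    """List (expectation_type, column) pairs for every failed expectation.

    `checkpoint_result` is assumed to be a dict shaped like the output of
    `result.to_json_dict()` on a 0.18-style checkpoint result.
    """
    failures = []
    for run_result in checkpoint_result.get("run_results", {}).values():
        validation = run_result.get("validation_result", {})
        for item in validation.get("results", []):
            if item.get("success"):
                continue
            config = item.get("expectation_config", {})
            # Table-level expectations have no "column" kwarg, so this may be None.
            column = config.get("kwargs", {}).get("column")
            failures.append((config.get("expectation_type"), column))
    return failures


# Usage: failures = collect_failures(result.to_json_dict())
```

Returning plain tuples keeps the failure report independent of GX classes, so it can be serialized or sent in a notification as-is.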
Common Expectations
# Column existence and types
validator.expect_column_to_exist("column_name")
validator.expect_column_values_to_be_of_type("column_name", "int64")
validator.expect_table_column_count_to_equal(10)
# Null handling
validator.expect_column_values_to_not_be_null("column_name")
validator.expect_column_values_to_be_null("deprecated_column")
# Uniqueness
validator.expect_column_values_to_be_unique("id_column")
validator.expect_compound_columns_to_be_unique(["col1", "col2"])
# Value ranges
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_min_to_be_between("score", min_value=0)
validator.expect_column_max_to_be_between("score", max_value=100)
# Set membership
validator.expect_column_values_to_be_in_set("status", ["A", "B", "C"])
validator.expect_column_distinct_values_to_be_in_set("category", ["cat1", "cat2"])
# String patterns
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
validator.expect_column_value_lengths_to_be_between("code", min_value=5, max_value=10)
# Statistics
validator.expect_column_mean_to_be_between("value", min_value=50, max_value=100)
validator.expect_column_stdev_to_be_between("value", min_value=0, max_value=20)
validator.expect_column_proportion_of_unique_values_to_be_between("id", min_value=0.9)
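When many columns share the same checks, the calls above can be driven from data instead of being written out one by one. This is a minimal sketch under the assumption that each expectation is reachable as a method on the validator and accepts its arguments as keywords. The names `EXPECTATIONS` and `apply_expectations` are hypothetical.

```python
# Each entry is (method name, keyword arguments) for one expectation.
EXPECTATIONS = [
    ("expect_column_to_exist", {"column": "customer_id"}),
    ("expect_column_values_to_not_be_null", {"column": "customer_id"}),
    ("expect_column_values_to_be_between", {"column": "age", "min_value": 0, "max_value": 120}),
]


def apply_expectations(validator, expectations):
    """Call each named expect_* method on the validator and collect the return values."""
    results = []
    for name, kwargs in expectations:
        if not name.startswith("expect_"):
            raise ValueError(f"not an expectation method: {name}")
        # getattr raises AttributeError if the validator has no such method.
        method = getattr(validator, name)
        results.append(method(**kwargs))
    return results


# Usage: apply_expectations(validator, EXPECTATIONS)
```

Keeping the list as plain data also makes it straightforward to load expectations from a reviewed configuration file.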
Integration with the Babysitter SDK
Task Definition Example
const dataValidationTask = defineTask({
  name: 'great-expectations-validation',
  description: 'Validate data quality with Great Expectations',
  inputs: {
    dataPath: { type: 'string', required: true },
    expectationSuiteName: { type: 'string', required: true },
    checkpointName: { type: 'string' },
    failOnError: { type: 'boolean', default: true }
  },
  outputs: {
    success: { type: 'boolean' },
    validationResults: { type: 'object' },
    failedExpectations: { type: 'array' },
    dataDocsUrl: { type: 'string' }
  },
  async run(inputs, taskCtx) {
    return {
      kind: 'skill',
      title: `Validate data: ${inputs.expectationSuiteName}`,
      skill: {
        name: 'great-expectations-validator',
        context: {
          operation: 'validate',
          dataPath: inputs.dataPath,
          expectationSuiteName: inputs.expectationSuiteName,
          checkpointName: inputs.checkpointName,
          failOnError: inputs.failOnError
        }
      },
      io: {
        inputJsonPath: `tasks/${taskCtx.effectId}/input.json`,
        outputJsonPath: `tasks/${taskCtx.effectId}/result.json`
      }
    };
  }
});
MCP Server Integration
Using gx-mcp-server
{
  "mcpServers": {
    "great-expectations": {
      "command": "uvx",
      "args": ["gx-mcp-server"],
      "env": {
        "GX_CONTEXT_ROOT": "./great_expectations"
      }
    }
  }
}
Available MCP Tools
- gx_list_datasources - List configured data sources
- gx_list_expectation_suites - List expectation suites
- gx_run_checkpoint - Run a checkpoint
- gx_validate_data - Validate data against a suite
- gx_get_validation_results - Retrieve validation results
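ML Pipeline Integration
Training Data Validation
import great_expectations as gx

def validate_training_data(df, suite_name="training_data_suite"):
    """Validate training data before model training."""
    context = gx.get_context()
    # Register the DataFrame as a data source; add_or_update keeps repeated calls from failing
    datasource = context.sources.add_or_update_pandas("training_data")
    data_asset = datasource.add_dataframe_asset("df")
    batch_request = data_asset.build_batch_request(dataframe=df)
    # Validate
    checkpoint = context.add_or_update_checkpoint(
        name="training_validation",
        validations=[{
            "batch_request": batch_request,
            "expectation_suite_name": suite_name,
        }],
    )
    result = checkpoint.run()
    if not result.success:
        # Flatten the failed expectation results across all validations
        failed = [
            r
            for run_result in result.run_results.values()
            for r in run_result["validation_result"].results
            if not r.success
        ]
        raise ValueError(f"Training data validation failed: {len(failed)} expectation(s) failed")
    return True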
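One way to wire a validation function such as `validate_training_data` into a pipeline is to gate each step behind it, so that a step never runs on data that failed validation. The decorator below is an orchestrator-agnostic sketch, not part of Great Expectations. The name `require_valid` is hypothetical, and it assumes the validation callable returns a truthy value on success or raises on failure.

```python
from functools import wraps


def require_valid(validate):
    """Decorator factory: run `validate(data)` before the wrapped step.

    `validate` is any callable that returns True on success or raises on failure.
    """
    def decorator(step):
        @wraps(step)
        def wrapper(data, *args, **kwargs):
            # Stop before the step runs if validation reports a failure.
            if not validate(data):
                raise ValueError(f"validation failed before step {step.__name__!r}")
            return step(data, *args, **kwargs)
        return wrapper
    return decorator


# Usage:
# @require_valid(validate_training_data)
# def train_model(df):
#     ...
```

Raising before the step starts keeps an expensive training run from consuming data that has already been shown to be invalid.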
Feature Quality Checks
# Expectations for ML features
validator.expect_column_values_to_not_be_null("feature_1", mostly=0.95)
validator.expect_column_values_to_be_between("feature_1", min_value=-3, max_value=3)  # standardized (z-scored) feature
validator.expect_column_proportion_of_unique_values_to_be_between("categorical_feature", min_value=0.001)
# reference_distribution is a partition object built from reference data
validator.expect_column_kl_divergence_to_be_less_than(
    "feature_1",
    partition_object=reference_distribution,
    threshold=0.1,
)
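The KL divergence check compares the column against a `partition_object`, which the snippet above leaves undefined. For a continuous feature this is, as far as the 0.18 documentation describes it, a dict with `bins` (the bin edges) and `weights` (the fraction of values in each bin, one fewer entry than `bins`). The sketch below builds such a dict from reference values with equal-width bins. The helper name `build_partition_object` and the choice of equal-width binning are assumptions for illustration.

```python
def build_partition_object(values, n_bins=10):
    """Build a {"bins": [...], "weights": [...]} dict from reference values.

    Uses equal-width bins between the minimum and maximum of `values`.
    """
    lo, hi = min(values), max(values)
    if lo == hi:
        raise ValueError("reference values must span a non-empty range")
    width = (hi - lo) / n_bins
    # n_bins + 1 strictly increasing edges, ending exactly at the maximum.
    bins = [lo + i * width for i in range(n_bins)] + [hi]
    counts = [0] * n_bins
    for value in values:
        # The maximum falls on the last edge, so clamp it into the last bin.
        index = min(int((value - lo) / width), n_bins - 1)
        counts[index] += 1
    total = len(values)
    return {"bins": bins, "weights": [count / total for count in counts]}


# Usage: reference_distribution = build_partition_object(reference_values, n_bins=20)
```

Bins that are empty in the reference data can make the divergence very large or infinite, so a production pipeline may want fewer bins or some smoothing of the weights.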
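Best Practices
- Version expectation suites: store suites in version control
- Use checkpoints: always validate through checkpoints for consistency
- Set the mostly parameter: use mostly=0.95 to tolerate a small fraction of failing values
- Generate Data Docs: document validation results for team visibility
- Fail fast: validate data early in the pipeline
- Custom expectations: create domain-specific expectations for your use case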
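To make the effect of the `mostly` parameter concrete, the sketch below reproduces its pass/fail rule in plain Python: a column-level check passes when at least the given fraction of evaluated values satisfies the condition. It assumes, in line with how GX treats most column value expectations, that null values are excluded before the fraction is computed. The function `passes_with_mostly` is a hypothetical illustration, not a GX API.

```python
def passes_with_mostly(values, predicate, mostly=1.0):
    """Return True when at least `mostly` of the non-null values satisfy `predicate`."""
    non_null = [value for value in values if value is not None]
    if not non_null:
        return True  # nothing to evaluate, so nothing can fail
    passing = sum(1 for value in non_null if predicate(value))
    return passing / len(non_null) >= mostly


# Usage: 96 valid ages, 4 out-of-range ages, and a few missing values.
ages = [30] * 96 + [150] * 4 + [None] * 5
in_range = lambda age: 0 <= age <= 120
strict_ok = passes_with_mostly(ages, in_range)                # all values must pass
tolerant_ok = passes_with_mostly(ages, in_range, mostly=0.95)  # 96% >= 95%
```

With these inputs the strict check fails and the tolerant check passes, which is the trade-off the `mostly` setting controls.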