名称: 数据验证器 描述: 验证数据是否符合模式、业务规则和数据质量标准。
数据验证技能
验证数据是否符合模式、业务规则和数据质量标准。
说明
您是一名数据验证专家。当被调用时:
-
模式验证:
- 根据JSON模式验证
- 检查数据库模式合规性
- 验证API请求/响应格式
- 确保数据类型正确性
- 验证必填字段
-
业务规则验证:
- 应用领域特定规则
- 验证数据范围和约束
- 检查引用完整性
- 验证业务逻辑约束
- 验证计算字段
-
数据质量检查:
- 检查完整性
- 检测重复项
- 识别离群值和异常
- 验证格式模式(电子邮件、电话等)
- 检查数据一致性
-
生成验证报告:
- 详细错误信息
- 验证统计信息
- 数据质量评分
- 修复建议
- 合规性摘要
使用示例
@data-validator data.json --schema schema.json
@data-validator --check-duplicates
@data-validator --rules business-rules.yaml
@data-validator --quality-report
@data-validator --fix-errors
模式验证
JSON模式验证
Python (jsonschema)
from jsonschema import validate, ValidationError, Draft7Validator
import json
def validate_json_schema(data, schema):
"""
根据JSON模式验证数据
"""
try:
validate(instance=data, schema=schema)
return {
'valid': True,
'errors': []
}
except ValidationError as e:
return {
'valid': False,
'errors': [{
'path': list(e.path),
'message': e.message,
'validator': e.validator,
'validator_value': e.validator_value
}]
}
def validate_with_detailed_errors(data, schema):
"""
验证并收集所有错误
"""
validator = Draft7Validator(schema)
errors = []
for error in validator.iter_errors(data):
errors.append({
'path': '.'.join(str(p) for p in error.path),
'message': error.message,
'validator': error.validator,
'failed_value': error.instance
})
return {
'valid': len(errors) == 0,
'errors': errors,
'error_count': len(errors)
}
# 示例模式
user_schema = {
"type": "object",
"properties": {
"id": {
"type": "integer",
"minimum": 1
},
"email": {
"type": "string",
"format": "email",
"pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
},
"age": {
"type": "integer",
"minimum": 0,
"maximum": 150
},
"phone": {
"type": "string",
"pattern": "^\\+?[1-9]\\d{1,14}$"
},
"status": {
"type": "string",
"enum": ["active", "inactive", "suspended"]
},
"created_at": {
"type": "string",
"format": "date-time"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"minItems": 1,
"uniqueItems": True
},
"address": {
"type": "object",
"properties": {
"street": {"type": "string"},
"city": {"type": "string"},
"zip": {"type": "string", "pattern": "^\\d{5}(-\\d{4})?$"}
},
"required": ["street", "city"]
}
},
"required": ["id", "email", "status"],
"additionalProperties": False
}
# 验证数据
user_data = {
"id": 1,
"email": "user@example.com",
"age": 30,
"status": "active",
"tags": ["developer", "admin"]
}
result = validate_with_detailed_errors(user_data, user_schema)
if result['valid']:
print("✅ 数据有效")
else:
print(f"❌ 找到 {result['error_count']} 个错误:")
for error in result['errors']:
print(f" - {error['path']}: {error['message']}")
JavaScript (AJV)
const Ajv = require('ajv');
const addFormats = require('ajv-formats');
const ajv = new Ajv({ allErrors: true });
addFormats(ajv);
const schema = {
type: 'object',
properties: {
id: { type: 'integer', minimum: 1 },
email: { type: 'string', format: 'email' },
age: { type: 'integer', minimum: 0, maximum: 150 },
status: { type: 'string', enum: ['active', 'inactive', 'suspended'] }
},
required: ['id', 'email', 'status'],
additionalProperties: false
};
function validateData(data) {
const validate = ajv.compile(schema);
const valid = validate(data);
return {
valid,
errors: validate.errors || []
};
}
// 使用
const userData = {
id: 1,
email: 'user@example.com',
status: 'active'
};
const result = validateData(userData);
console.log(result);
数据库模式验证
import pandas as pd
from sqlalchemy import inspect
def validate_dataframe_schema(df, expected_schema):
"""
根据预期模式验证DataFrame
expected_schema = {
'column_name': {
'type': 'int64',
'nullable': False,
'unique': False,
'min': 0,
'max': 100
}
}
"""
errors = []
# 检查列是否存在
expected_columns = set(expected_schema.keys())
actual_columns = set(df.columns)
missing_columns = expected_columns - actual_columns
extra_columns = actual_columns - expected_columns
if missing_columns:
errors.append({
'type': 'missing_columns',
'columns': list(missing_columns)
})
if extra_columns:
errors.append({
'type': 'extra_columns',
'columns': list(extra_columns)
})
# 验证每列
for col_name, col_schema in expected_schema.items():
if col_name not in df.columns:
continue
col = df[col_name]
# 检查数据类型
expected_type = col_schema.get('type')
if expected_type and str(col.dtype) != expected_type:
errors.append({
'type': 'wrong_type',
'column': col_name,
'expected': expected_type,
'actual': str(col.dtype)
})
# 检查可空性
if not col_schema.get('nullable', True):
null_count = col.isnull().sum()
if null_count > 0:
errors.append({
'type': 'null_values',
'column': col_name,
'count': int(null_count)
})
# 检查唯一性
if col_schema.get('unique', False):
dup_count = col.duplicated().sum()
if dup_count > 0:
errors.append({
'type': 'duplicate_values',
'column': col_name,
'count': int(dup_count)
})
# 检查范围
if 'min' in col_schema and pd.api.types.is_numeric_dtype(col):
min_val = col.min()
if min_val < col_schema['min']:
errors.append({
'type': 'below_minimum',
'column': col_name,
'min_allowed': col_schema['min'],
'min_found': float(min_val)
})
if 'max' in col_schema and pd.api.types.is_numeric_dtype(col):
max_val = col.max()
if max_val > col_schema['max']:
errors.append({
'type': 'above_maximum',
'column': col_name,
'max_allowed': col_schema['max'],
'max_found': float(max_val)
})
# 检查模式
if 'pattern' in col_schema and col.dtype == 'object':
import re
pattern = re.compile(col_schema['pattern'])
invalid = ~col.dropna().astype(str).str.match(pattern)
invalid_count = invalid.sum()
if invalid_count > 0:
errors.append({
'type': 'pattern_mismatch',
'column': col_name,
'pattern': col_schema['pattern'],
'count': int(invalid_count)
})
return {
'valid': len(errors) == 0,
'errors': errors
}
# 示例使用
expected_schema = {
'user_id': {
'type': 'int64',
'nullable': False,
'unique': True,
'min': 1
},
'email': {
'type': 'object',
'nullable': False,
'pattern': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
},
'age': {
'type': 'int64',
'nullable': True,
'min': 0,
'max': 150
},
'score': {
'type': 'float64',
'nullable': False,
'min': 0.0,
'max': 100.0
}
}
df = pd.DataFrame({
'user_id': [1, 2, 3],
'email': ['user1@example.com', 'user2@example.com', 'invalid'],
'age': [25, 30, 200],
'score': [85.5, 92.0, 78.5]
})
result = validate_dataframe_schema(df, expected_schema)
业务规则验证
class DataValidator:
"""
灵活的数据验证器,带有自定义规则
"""
def __init__(self):
self.rules = []
self.errors = []
def add_rule(self, name, validator_func, error_message):
"""
添加验证规则
validator_func: 函数,接收数据并返回布尔值
"""
self.rules.append({
'name': name,
'validator': validator_func,
'error_message': error_message
})
def validate(self, data):
"""根据所有规则验证数据"""
self.errors = []
for rule in self.rules:
try:
is_valid = rule['validator'](data)
if not is_valid:
self.errors.append({
'rule': rule['name'],
'message': rule['error_message']
})
except Exception as e:
self.errors.append({
'rule': rule['name'],
'message': f"验证错误: {str(e)}"
})
return {
'valid': len(self.errors) == 0,
'errors': self.errors
}
# 示例:电商订单验证
validator = DataValidator()
# 规则:订单总额必须匹配行项目总和
validator.add_rule(
'order_total_matches',
lambda data: abs(data['total'] - sum(item['price'] * item['quantity']
for item in data['items'])) < 0.01,
"订单总额不匹配行项目总和"
)
# 规则:实物商品需要送货地址
validator.add_rule(
'shipping_address_required',
lambda data: not any(item['type'] == 'physical' for item in data['items'])
or 'shipping_address' in data,
"实物商品需要送货地址"
)
# 规则:折扣不能超过订单小计
validator.add_rule(
'discount_valid',
lambda data: data.get('discount', 0) <= data.get('subtotal', 0),
"折扣不能超过订单小计"
)
# 规则:数字商品需要电子邮件
validator.add_rule(
'email_for_digital',
lambda data: not any(item['type'] == 'digital' for item in data['items'])
or ('email' in data and '@' in data['email']),
"数字商品需要有效电子邮件"
)
# 验证订单
order = {
'total': 150.00,
'subtotal': 150.00,
'discount': 10.00,
'items': [
{'name': 'Product A', 'type': 'physical', 'price': 50.00, 'quantity': 2},
{'name': 'Product B', 'type': 'digital', 'price': 50.00, 'quantity': 1}
],
'email': 'user@example.com'
}
result = validator.validate(order)
if not result['valid']:
for error in result['errors']:
print(f"❌ {error['rule']}: {error['message']}")
复杂业务规则
def validate_user_registration(data):
"""
全面的用户注册验证
"""
errors = []
# 必填字段
required = ['username', 'email', 'password', 'terms_accepted']
for field in required:
if field not in data or not data[field]:
errors.append(f"字段 '{field}' 是必填的")
# 用户名验证
if 'username' in data:
username = data['username']
if len(username) < 3:
errors.append("用户名必须至少3个字符")
if len(username) > 20:
errors.append("用户名不能超过20个字符")
if not username.isalnum():
errors.append("用户名只能包含字母和数字")
# 电子邮件验证
if 'email' in data:
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, data['email']):
errors.append("无效电子邮件格式")
# 密码验证
if 'password' in data:
password = data['password']
if len(password) < 8:
errors.append("密码必须至少8个字符")
if not any(c.isupper() for c in password):
errors.append("密码必须至少包含一个大写字母")
if not any(c.islower() for c in password):
errors.append("密码必须至少包含一个小写字母")
if not any(c.isdigit() for c in password):
errors.append("密码必须至少包含一个数字")
if not any(c in '!@#$%^&*()_+-=' for c in password):
errors.append("密码必须至少包含一个特殊字符")
# 密码确认
if 'password' in data and 'password_confirm' in data:
if data['password'] != data['password_confirm']:
errors.append("密码不匹配")
# 年龄验证
if 'birthdate' in data:
from datetime import datetime
try:
birthdate = datetime.fromisoformat(data['birthdate'])
age = (datetime.now() - birthdate).days / 365.25
if age < 13:
errors.append("必须至少13岁")
if age > 150:
errors.append("无效出生日期")
except:
errors.append("无效出生日期格式")
# 条款接受
if not data.get('terms_accepted'):
errors.append("必须接受条款和条件")
return {
'valid': len(errors) == 0,
'errors': errors
}
数据质量验证
完整性检查
def check_completeness(df):
"""
检查数据完整性
"""
report = {
'total_cells': len(df) * len(df.columns),
'total_rows': len(df),
'total_columns': len(df.columns),
'columns': {}
}
for col in df.columns:
null_count = df[col].isnull().sum()
completeness = (1 - null_count / len(df)) * 100
report['columns'][col] = {
'total': len(df),
'null_count': int(null_count),
'non_null_count': int(len(df) - null_count),
'completeness_percent': round(completeness, 2)
}
# 整体完整性
total_nulls = df.isnull().sum().sum()
report['overall_completeness'] = round(
(1 - total_nulls / report['total_cells']) * 100,
2
)
return report
def check_duplicates(df, subset=None):
"""
检查重复行
"""
dup_mask = df.duplicated(subset=subset, keep=False)
duplicates = df[dup_mask]
return {
'has_duplicates': dup_mask.any(),
'duplicate_count': int(dup_mask.sum()),
'duplicate_percent': round(dup_mask.sum() / len(df) * 100, 2),
'duplicate_rows': duplicates.to_dict('records') if len(duplicates) < 100 else []
}
def check_outliers(df, column, method='iqr'):
"""
检测数字列中的离群值
"""
if method == 'iqr':
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
elif method == 'zscore':
from scipy import stats
z_scores = np.abs(stats.zscore(df[column].dropna()))
outliers = df[z_scores > 3]
return {
'method': method,
'lower_bound': float(lower_bound) if method == 'iqr' else None,
'upper_bound': float(upper_bound) if method == 'iqr' else None,
'outlier_count': len(outliers),
'outlier_percent': round(len(outliers) / len(df) * 100, 2),
'outliers': outliers[column].tolist()[:100] # 限制到100个
}
格式验证
import re
def validate_email(email):
"""验证电子邮件格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
def validate_phone(phone, country='US'):
"""验证电话号码"""
patterns = {
'US': r'^\+?1?\d{10}$',
'UK': r'^\+?44\d{10}$',
'international': r'^\+?[1-9]\d{1,14}$'
}
phone_clean = re.sub(r'[^\d+]', '', phone)
pattern = patterns.get(country, patterns['international'])
return bool(re.match(pattern, phone_clean))
def validate_url(url):
"""验证URL格式"""
pattern = r'^https?://[a-zA-Z0-9-._~:/?#\[\]@!$&\'()*+,;=]+$'
return bool(re.match(pattern, url))
def validate_date(date_string, format='%Y-%m-%d'):
"""验证日期格式"""
from datetime import datetime
try:
datetime.strptime(date_string, format)
return True
except:
return False
def validate_credit_card(card_number):
"""使用Luhn算法验证信用卡号"""
card_number = re.sub(r'[\s-]', '', card_number)
if not card_number.isdigit():
return False
if len(card_number) < 13 or len(card_number) > 19:
return False
# Luhn算法
def luhn_checksum(card_num):
def digits_of(n):
return [int(d) for d in str(n)]
digits = digits_of(card_num)
odd_digits = digits[-1::-2]
even_digits = digits[-2::-2]
checksum = sum(odd_digits)
for d in even_digits:
checksum += sum(digits_of(d * 2))
return checksum % 10
return luhn_checksum(card_number) == 0
def validate_formats_in_dataframe(df):
"""
验证DataFrame中的常见格式
"""
results = {}
for col in df.columns:
col_lower = col.lower()
# 电子邮件验证
if 'email' in col_lower:
invalid = df[~df[col].apply(validate_email)]
results[col] = {
'type': 'email',
'valid_count': len(df) - len(invalid),
'invalid_count': len(invalid),
'invalid_samples': invalid[col].head(5).tolist()
}
# 电话验证
elif 'phone' in col_lower:
invalid = df[~df[col].apply(validate_phone)]
results[col] = {
'type': 'phone',
'valid_count': len(df) - len(invalid),
'invalid_count': len(invalid),
'invalid_samples': invalid[col].head(5).tolist()
}
# URL验证
elif 'url' in col_lower or 'link' in col_lower:
invalid = df[~df[col].apply(validate_url)]
results[col] = {
'type': 'url',
'valid_count': len(df) - len(invalid),
'invalid_count': len(invalid),
'invalid_samples': invalid[col].head(5).tolist()
}
return results
验证报告生成
def generate_validation_report(df, schema=None, business_rules=None):
"""
生成全面的验证报告
"""
from datetime import datetime
report = f"""# 数据验证报告
**生成时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**数据集:** {len(df):,} 行 × {len(df.columns)} 列
---
## 摘要
"""
# 完整性检查
completeness = check_completeness(df)
report += f"- **整体完整性:** {completeness['overall_completeness']}%
"
# 重复检查
duplicates = check_duplicates(df)
report += f"- **重复行:** {duplicates['duplicate_count']:,} ({duplicates['duplicate_percent']}%)
"
# 模式验证
if schema:
schema_result = validate_dataframe_schema(df, schema)
status = "✅ 通过" if schema_result['valid'] else f"❌ 失败 ({len(schema_result['errors'])} 个错误)"
report += f"- **模式验证:** {status}
"
report += "
---
## 完整性分析
"
report += "| 列 | 非空 | 空计数 | 完整性 |
"
report += "|------|------|------|------|
"
for col, stats in completeness['columns'].items():
report += f"| {col} | {stats['non_null_count']:,} | {stats['null_count']:,} | {stats['completeness_percent']}% |
"
# 模式验证详情
if schema and not schema_result['valid']:
report += "
---
## 模式验证错误
"
for error in schema_result['errors']:
report += f"### {error['type'].replace('_', ' ').title()}
"
if error['type'] == 'wrong_type':
report += f"- **列:** {error['column']}
"
report += f"- **预期:** {error['expected']}
"
report += f"- **实际:** {error['actual']}
"
elif error['type'] in ['null_values', 'duplicate_values']:
report += f"- **列:** {error['column']}
"
report += f"- **计数:** {error['count']:,}
"
elif error['type'] == 'pattern_mismatch':
report += f"- **列:** {error['column']}
"
report += f"- **模式:** `{error['pattern']}`
"
report += f"- **无效计数:** {error['count']:,}
"
# 格式验证
format_results = validate_formats_in_dataframe(df)
if format_results:
report += "
---
## 格式验证
"
for col, result in format_results.items():
report += f"### {col} ({result['type']})
"
report += f"- **有效:** {result['valid_count']:,}
"
report += f"- **无效:** {result['invalid_count']:,}
"
if result['invalid_samples']:
report += f"
**无效样本:**
"
for sample in result['invalid_samples']:
report += f"- `{sample}`
"
report += "
"
# 数据质量评分
quality_score = calculate_quality_score(df, schema, duplicates, completeness)
report += f"
---
## 数据质量评分
"
report += f"### 总体评分: {quality_score['overall']}/100
"
for dimension, score in quality_score['dimensions'].items():
report += f"- **{dimension}:** {score}/100
"
return report
def calculate_quality_score(df, schema, duplicates, completeness):
"""计算数据质量评分"""
scores = {}
# 完整性评分
scores['完整性'] = completeness['overall_completeness']
# 唯一性评分
scores['唯一性'] = 100 - duplicates['duplicate_percent']
# 有效性评分(如果提供模式)
if schema:
schema_result = validate_dataframe_schema(df, schema)
error_rate = len(schema_result['errors']) / (len(df) * len(df.columns))
scores['有效性'] = max(0, 100 - (error_rate * 100))
else:
scores['有效性'] = 100
# 总体评分
overall = sum(scores.values()) / len(scores)
return {
'overall': round(overall, 1),
'dimensions': {k: round(v, 1) for k, v in scores.items()}
}
最佳实践
- 定义清晰的验证规则 在实施之前
- 尽早验证 在数据管道中
- 提供详细的错误信息 用于调试
- 使用模式验证 用于API契约
- 实现业务规则验证 与模式分开
- 记录验证失败 用于监控
- 生成验证报告 用于审计
- 优雅处理验证错误
- 测试验证规则 使用边界情况
- 版本控制 验证模式和规则
常见验证模式
API请求验证
def validate_api_request(request_data, endpoint):
"""验证API请求数据"""
schemas = {
'/users': user_schema,
'/orders': order_schema,
'/products': product_schema
}
schema = schemas.get(endpoint)
if not schema:
return {'valid': False, 'error': '未知端点'}
return validate_json_schema(request_data, schema)
批量数据验证
def validate_batch(records, validator):
"""验证记录批处理"""
results = []
for i, record in enumerate(records):
result = validator.validate(record)
result['record_index'] = i
if not result['valid']:
results.append(result)
return {
'total_records': len(records),
'valid_records': len(records) - len(results),
'invalid_records': len(results),
'failures': results
}
笔记
- 始终在系统边界验证
- 使用适当的验证级别(语法、语义、业务)
- 缓存验证结果以提高性能
- 为用户提供清晰的错误信息
- 记录验证指标用于监控
- 考虑大数据集的验证性能
- 使用流式验证处理大数据
- 保持验证规则可维护和可测试