数据验证器Skill data-validator

数据验证技能是一个用于确保数据质量、完整性和合规性的工具。它通过模式验证、业务规则验证和数据质量检查,验证数据是否符合预定义标准和规则。支持生成详细验证报告,适用于API、数据库、批量数据处理等场景,帮助提升数据治理和业务决策效率。关键词:数据验证、模式验证、业务规则、数据质量、验证报告、数据治理、完整性检查。

数据治理 0 次安装 0 次浏览 更新于 3/11/2026

名称: 数据验证器 描述: 验证数据是否符合模式、业务规则和数据质量标准。

数据验证技能

验证数据是否符合模式、业务规则和数据质量标准。

说明

您是一名数据验证专家。当被调用时:

  1. 模式验证

    • 根据JSON模式验证
    • 检查数据库模式合规性
    • 验证API请求/响应格式
    • 确保数据类型正确性
    • 验证必填字段
  2. 业务规则验证

    • 应用领域特定规则
    • 验证数据范围和约束
    • 检查引用完整性
    • 验证业务逻辑约束
    • 验证计算字段
  3. 数据质量检查

    • 检查完整性
    • 检测重复项
    • 识别离群值和异常
    • 验证格式模式(电子邮件、电话等)
    • 检查数据一致性
  4. 生成验证报告

    • 详细错误信息
    • 验证统计信息
    • 数据质量评分
    • 修复建议
    • 合规性摘要

使用示例

@data-validator data.json --schema schema.json
@data-validator --check-duplicates
@data-validator --rules business-rules.yaml
@data-validator --quality-report
@data-validator --fix-errors

模式验证

JSON模式验证

Python (jsonschema)

from jsonschema import validate, ValidationError, Draft7Validator
import json

def validate_json_schema(data, schema):
    """
    根据JSON模式验证数据
    """
    try:
        validate(instance=data, schema=schema)
        return {
            'valid': True,
            'errors': []
        }
    except ValidationError as e:
        return {
            'valid': False,
            'errors': [{
                'path': list(e.path),
                'message': e.message,
                'validator': e.validator,
                'validator_value': e.validator_value
            }]
        }

def validate_with_detailed_errors(data, schema):
    """
    验证并收集所有错误
    """
    validator = Draft7Validator(schema)
    errors = []

    for error in validator.iter_errors(data):
        errors.append({
            'path': '.'.join(str(p) for p in error.path),
            'message': error.message,
            'validator': error.validator,
            'failed_value': error.instance
        })

    return {
        'valid': len(errors) == 0,
        'errors': errors,
        'error_count': len(errors)
    }

# 示例模式
user_schema = {
    "type": "object",
    "properties": {
        "id": {
            "type": "integer",
            "minimum": 1
        },
        "email": {
            "type": "string",
            "format": "email",
            "pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
        },
        "age": {
            "type": "integer",
            "minimum": 0,
            "maximum": 150
        },
        "phone": {
            "type": "string",
            "pattern": "^\\+?[1-9]\\d{1,14}$"
        },
        "status": {
            "type": "string",
            "enum": ["active", "inactive", "suspended"]
        },
        "created_at": {
            "type": "string",
            "format": "date-time"
        },
        "tags": {
            "type": "array",
            "items": {"type": "string"},
            "minItems": 1,
            "uniqueItems": True
        },
        "address": {
            "type": "object",
            "properties": {
                "street": {"type": "string"},
                "city": {"type": "string"},
                "zip": {"type": "string", "pattern": "^\\d{5}(-\\d{4})?$"}
            },
            "required": ["street", "city"]
        }
    },
    "required": ["id", "email", "status"],
    "additionalProperties": False
}

# 验证数据
user_data = {
    "id": 1,
    "email": "user@example.com",
    "age": 30,
    "status": "active",
    "tags": ["developer", "admin"]
}

result = validate_with_detailed_errors(user_data, user_schema)

if result['valid']:
    print("✅ 数据有效")
else:
    print(f"❌ 找到 {result['error_count']} 个错误:")
    for error in result['errors']:
        print(f"  - {error['path']}: {error['message']}")

JavaScript (AJV)

const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajv = new Ajv({ allErrors: true });
addFormats(ajv);

const schema = {
  type: 'object',
  properties: {
    id: { type: 'integer', minimum: 1 },
    email: { type: 'string', format: 'email' },
    age: { type: 'integer', minimum: 0, maximum: 150 },
    status: { type: 'string', enum: ['active', 'inactive', 'suspended'] }
  },
  required: ['id', 'email', 'status'],
  additionalProperties: false
};

function validateData(data) {
  const validate = ajv.compile(schema);
  const valid = validate(data);

  return {
    valid,
    errors: validate.errors || []
  };
}

// 使用
const userData = {
  id: 1,
  email: 'user@example.com',
  status: 'active'
};

const result = validateData(userData);
console.log(result);

数据库模式验证

import pandas as pd
from sqlalchemy import inspect

def validate_dataframe_schema(df, expected_schema):
    """
    根据预期模式验证DataFrame

    expected_schema = {
        'column_name': {
            'type': 'int64',
            'nullable': False,
            'unique': False,
            'min': 0,
            'max': 100
        }
    }
    """
    errors = []

    # 检查列是否存在
    expected_columns = set(expected_schema.keys())
    actual_columns = set(df.columns)

    missing_columns = expected_columns - actual_columns
    extra_columns = actual_columns - expected_columns

    if missing_columns:
        errors.append({
            'type': 'missing_columns',
            'columns': list(missing_columns)
        })

    if extra_columns:
        errors.append({
            'type': 'extra_columns',
            'columns': list(extra_columns)
        })

    # 验证每列
    for col_name, col_schema in expected_schema.items():
        if col_name not in df.columns:
            continue

        col = df[col_name]

        # 检查数据类型
        expected_type = col_schema.get('type')
        if expected_type and str(col.dtype) != expected_type:
            errors.append({
                'type': 'wrong_type',
                'column': col_name,
                'expected': expected_type,
                'actual': str(col.dtype)
            })

        # 检查可空性
        if not col_schema.get('nullable', True):
            null_count = col.isnull().sum()
            if null_count > 0:
                errors.append({
                    'type': 'null_values',
                    'column': col_name,
                    'count': int(null_count)
                })

        # 检查唯一性
        if col_schema.get('unique', False):
            dup_count = col.duplicated().sum()
            if dup_count > 0:
                errors.append({
                    'type': 'duplicate_values',
                    'column': col_name,
                    'count': int(dup_count)
                })

        # 检查范围
        if 'min' in col_schema and pd.api.types.is_numeric_dtype(col):
            min_val = col.min()
            if min_val < col_schema['min']:
                errors.append({
                    'type': 'below_minimum',
                    'column': col_name,
                    'min_allowed': col_schema['min'],
                    'min_found': float(min_val)
                })

        if 'max' in col_schema and pd.api.types.is_numeric_dtype(col):
            max_val = col.max()
            if max_val > col_schema['max']:
                errors.append({
                    'type': 'above_maximum',
                    'column': col_name,
                    'max_allowed': col_schema['max'],
                    'max_found': float(max_val)
                })

        # 检查模式
        if 'pattern' in col_schema and col.dtype == 'object':
            import re
            pattern = re.compile(col_schema['pattern'])
            invalid = ~col.dropna().astype(str).str.match(pattern)
            invalid_count = invalid.sum()

            if invalid_count > 0:
                errors.append({
                    'type': 'pattern_mismatch',
                    'column': col_name,
                    'pattern': col_schema['pattern'],
                    'count': int(invalid_count)
                })

    return {
        'valid': len(errors) == 0,
        'errors': errors
    }

# 示例使用
expected_schema = {
    'user_id': {
        'type': 'int64',
        'nullable': False,
        'unique': True,
        'min': 1
    },
    'email': {
        'type': 'object',
        'nullable': False,
        'pattern': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    },
    'age': {
        'type': 'int64',
        'nullable': True,
        'min': 0,
        'max': 150
    },
    'score': {
        'type': 'float64',
        'nullable': False,
        'min': 0.0,
        'max': 100.0
    }
}

df = pd.DataFrame({
    'user_id': [1, 2, 3],
    'email': ['user1@example.com', 'user2@example.com', 'invalid'],
    'age': [25, 30, 200],
    'score': [85.5, 92.0, 78.5]
})

result = validate_dataframe_schema(df, expected_schema)

业务规则验证

class DataValidator:
    """
    灵活的数据验证器,带有自定义规则
    """

    def __init__(self):
        self.rules = []
        self.errors = []

    def add_rule(self, name, validator_func, error_message):
        """
        添加验证规则

        validator_func: 函数,接收数据并返回布尔值
        """
        self.rules.append({
            'name': name,
            'validator': validator_func,
            'error_message': error_message
        })

    def validate(self, data):
        """根据所有规则验证数据"""
        self.errors = []

        for rule in self.rules:
            try:
                is_valid = rule['validator'](data)
                if not is_valid:
                    self.errors.append({
                        'rule': rule['name'],
                        'message': rule['error_message']
                    })
            except Exception as e:
                self.errors.append({
                    'rule': rule['name'],
                    'message': f"验证错误: {str(e)}"
                })

        return {
            'valid': len(self.errors) == 0,
            'errors': self.errors
        }

# 示例:电商订单验证
validator = DataValidator()

# 规则:订单总额必须匹配行项目总和
validator.add_rule(
    'order_total_matches',
    lambda data: abs(data['total'] - sum(item['price'] * item['quantity']
                                         for item in data['items'])) < 0.01,
    "订单总额不匹配行项目总和"
)

# 规则:实物商品需要送货地址
validator.add_rule(
    'shipping_address_required',
    lambda data: not any(item['type'] == 'physical' for item in data['items'])
                 or 'shipping_address' in data,
    "实物商品需要送货地址"
)

# 规则:折扣不能超过订单小计
validator.add_rule(
    'discount_valid',
    lambda data: data.get('discount', 0) <= data.get('subtotal', 0),
    "折扣不能超过订单小计"
)

# 规则:数字商品需要电子邮件
validator.add_rule(
    'email_for_digital',
    lambda data: not any(item['type'] == 'digital' for item in data['items'])
                 or ('email' in data and '@' in data['email']),
    "数字商品需要有效电子邮件"
)

# 验证订单
order = {
    'total': 150.00,
    'subtotal': 150.00,
    'discount': 10.00,
    'items': [
        {'name': 'Product A', 'type': 'physical', 'price': 50.00, 'quantity': 2},
        {'name': 'Product B', 'type': 'digital', 'price': 50.00, 'quantity': 1}
    ],
    'email': 'user@example.com'
}

result = validator.validate(order)

if not result['valid']:
    for error in result['errors']:
        print(f"❌ {error['rule']}: {error['message']}")

复杂业务规则

def validate_user_registration(data):
    """
    全面的用户注册验证
    """
    errors = []

    # 必填字段
    required = ['username', 'email', 'password', 'terms_accepted']
    for field in required:
        if field not in data or not data[field]:
            errors.append(f"字段 '{field}' 是必填的")

    # 用户名验证
    if 'username' in data:
        username = data['username']

        if len(username) < 3:
            errors.append("用户名必须至少3个字符")

        if len(username) > 20:
            errors.append("用户名不能超过20个字符")

        if not username.isalnum():
            errors.append("用户名只能包含字母和数字")

    # 电子邮件验证
    if 'email' in data:
        import re
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, data['email']):
            errors.append("无效电子邮件格式")

    # 密码验证
    if 'password' in data:
        password = data['password']

        if len(password) < 8:
            errors.append("密码必须至少8个字符")

        if not any(c.isupper() for c in password):
            errors.append("密码必须至少包含一个大写字母")

        if not any(c.islower() for c in password):
            errors.append("密码必须至少包含一个小写字母")

        if not any(c.isdigit() for c in password):
            errors.append("密码必须至少包含一个数字")

        if not any(c in '!@#$%^&*()_+-=' for c in password):
            errors.append("密码必须至少包含一个特殊字符")

    # 密码确认
    if 'password' in data and 'password_confirm' in data:
        if data['password'] != data['password_confirm']:
            errors.append("密码不匹配")

    # 年龄验证
    if 'birthdate' in data:
        from datetime import datetime
        try:
            birthdate = datetime.fromisoformat(data['birthdate'])
            age = (datetime.now() - birthdate).days / 365.25

            if age < 13:
                errors.append("必须至少13岁")

            if age > 150:
                errors.append("无效出生日期")
        except:
            errors.append("无效出生日期格式")

    # 条款接受
    if not data.get('terms_accepted'):
        errors.append("必须接受条款和条件")

    return {
        'valid': len(errors) == 0,
        'errors': errors
    }

数据质量验证

完整性检查

def check_completeness(df):
    """
    检查数据完整性
    """
    report = {
        'total_cells': len(df) * len(df.columns),
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'columns': {}
    }

    for col in df.columns:
        null_count = df[col].isnull().sum()
        completeness = (1 - null_count / len(df)) * 100

        report['columns'][col] = {
            'total': len(df),
            'null_count': int(null_count),
            'non_null_count': int(len(df) - null_count),
            'completeness_percent': round(completeness, 2)
        }

    # 整体完整性
    total_nulls = df.isnull().sum().sum()
    report['overall_completeness'] = round(
        (1 - total_nulls / report['total_cells']) * 100,
        2
    )

    return report

def check_duplicates(df, subset=None):
    """
    检查重复行
    """
    dup_mask = df.duplicated(subset=subset, keep=False)
    duplicates = df[dup_mask]

    return {
        'has_duplicates': dup_mask.any(),
        'duplicate_count': int(dup_mask.sum()),
        'duplicate_percent': round(dup_mask.sum() / len(df) * 100, 2),
        'duplicate_rows': duplicates.to_dict('records') if len(duplicates) < 100 else []
    }

def check_outliers(df, column, method='iqr'):
    """
    检测数字列中的离群值
    """
    if method == 'iqr':
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1

        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]

    elif method == 'zscore':
        from scipy import stats
        z_scores = np.abs(stats.zscore(df[column].dropna()))
        outliers = df[z_scores > 3]

    return {
        'method': method,
        'lower_bound': float(lower_bound) if method == 'iqr' else None,
        'upper_bound': float(upper_bound) if method == 'iqr' else None,
        'outlier_count': len(outliers),
        'outlier_percent': round(len(outliers) / len(df) * 100, 2),
        'outliers': outliers[column].tolist()[:100]  # 限制到100个
    }

格式验证

import re

def validate_email(email):
    """验证电子邮件格式"""
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def validate_phone(phone, country='US'):
    """验证电话号码"""
    patterns = {
        'US': r'^\+?1?\d{10}$',
        'UK': r'^\+?44\d{10}$',
        'international': r'^\+?[1-9]\d{1,14}$'
    }

    phone_clean = re.sub(r'[^\d+]', '', phone)
    pattern = patterns.get(country, patterns['international'])

    return bool(re.match(pattern, phone_clean))

def validate_url(url):
    """验证URL格式"""
    pattern = r'^https?://[a-zA-Z0-9-._~:/?#\[\]@!$&\'()*+,;=]+$'
    return bool(re.match(pattern, url))

def validate_date(date_string, format='%Y-%m-%d'):
    """验证日期格式"""
    from datetime import datetime

    try:
        datetime.strptime(date_string, format)
        return True
    except:
        return False

def validate_credit_card(card_number):
    """使用Luhn算法验证信用卡号"""
    card_number = re.sub(r'[\s-]', '', card_number)

    if not card_number.isdigit():
        return False

    if len(card_number) < 13 or len(card_number) > 19:
        return False

    # Luhn算法
    def luhn_checksum(card_num):
        def digits_of(n):
            return [int(d) for d in str(n)]

        digits = digits_of(card_num)
        odd_digits = digits[-1::-2]
        even_digits = digits[-2::-2]

        checksum = sum(odd_digits)
        for d in even_digits:
            checksum += sum(digits_of(d * 2))

        return checksum % 10

    return luhn_checksum(card_number) == 0

def validate_formats_in_dataframe(df):
    """
    验证DataFrame中的常见格式
    """
    results = {}

    for col in df.columns:
        col_lower = col.lower()

        # 电子邮件验证
        if 'email' in col_lower:
            invalid = df[~df[col].apply(validate_email)]
            results[col] = {
                'type': 'email',
                'valid_count': len(df) - len(invalid),
                'invalid_count': len(invalid),
                'invalid_samples': invalid[col].head(5).tolist()
            }

        # 电话验证
        elif 'phone' in col_lower:
            invalid = df[~df[col].apply(validate_phone)]
            results[col] = {
                'type': 'phone',
                'valid_count': len(df) - len(invalid),
                'invalid_count': len(invalid),
                'invalid_samples': invalid[col].head(5).tolist()
            }

        # URL验证
        elif 'url' in col_lower or 'link' in col_lower:
            invalid = df[~df[col].apply(validate_url)]
            results[col] = {
                'type': 'url',
                'valid_count': len(df) - len(invalid),
                'invalid_count': len(invalid),
                'invalid_samples': invalid[col].head(5).tolist()
            }

    return results

验证报告生成

def generate_validation_report(df, schema=None, business_rules=None):
    """
    生成全面的验证报告
    """
    from datetime import datetime

    report = f"""# 数据验证报告
**生成时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**数据集:** {len(df):,} 行 × {len(df.columns)} 列

---

## 摘要

"""

    # 完整性检查
    completeness = check_completeness(df)
    report += f"- **整体完整性:** {completeness['overall_completeness']}%
"

    # 重复检查
    duplicates = check_duplicates(df)
    report += f"- **重复行:** {duplicates['duplicate_count']:,} ({duplicates['duplicate_percent']}%)
"

    # 模式验证
    if schema:
        schema_result = validate_dataframe_schema(df, schema)
        status = "✅ 通过" if schema_result['valid'] else f"❌ 失败 ({len(schema_result['errors'])} 个错误)"
        report += f"- **模式验证:** {status}
"

    report += "
---

## 完整性分析

"

    report += "| 列 | 非空 | 空计数 | 完整性 |
"
    report += "|------|------|------|------|
"

    for col, stats in completeness['columns'].items():
        report += f"| {col} | {stats['non_null_count']:,} | {stats['null_count']:,} | {stats['completeness_percent']}% |
"

    # 模式验证详情
    if schema and not schema_result['valid']:
        report += "
---

## 模式验证错误

"

        for error in schema_result['errors']:
            report += f"### {error['type'].replace('_', ' ').title()}

"

            if error['type'] == 'wrong_type':
                report += f"- **列:** {error['column']}
"
                report += f"- **预期:** {error['expected']}
"
                report += f"- **实际:** {error['actual']}

"

            elif error['type'] in ['null_values', 'duplicate_values']:
                report += f"- **列:** {error['column']}
"
                report += f"- **计数:** {error['count']:,}

"

            elif error['type'] == 'pattern_mismatch':
                report += f"- **列:** {error['column']}
"
                report += f"- **模式:** `{error['pattern']}`
"
                report += f"- **无效计数:** {error['count']:,}

"

    # 格式验证
    format_results = validate_formats_in_dataframe(df)

    if format_results:
        report += "
---

## 格式验证

"

        for col, result in format_results.items():
            report += f"### {col} ({result['type']})

"
            report += f"- **有效:** {result['valid_count']:,}
"
            report += f"- **无效:** {result['invalid_count']:,}
"

            if result['invalid_samples']:
                report += f"
**无效样本:**
"
                for sample in result['invalid_samples']:
                    report += f"- `{sample}`
"

            report += "
"

    # 数据质量评分
    quality_score = calculate_quality_score(df, schema, duplicates, completeness)

    report += f"
---

## 数据质量评分

"
    report += f"### 总体评分: {quality_score['overall']}/100

"

    for dimension, score in quality_score['dimensions'].items():
        report += f"- **{dimension}:** {score}/100
"

    return report

def calculate_quality_score(df, schema, duplicates, completeness):
    """计算数据质量评分"""

    scores = {}

    # 完整性评分
    scores['完整性'] = completeness['overall_completeness']

    # 唯一性评分
    scores['唯一性'] = 100 - duplicates['duplicate_percent']

    # 有效性评分(如果提供模式)
    if schema:
        schema_result = validate_dataframe_schema(df, schema)
        error_rate = len(schema_result['errors']) / (len(df) * len(df.columns))
        scores['有效性'] = max(0, 100 - (error_rate * 100))
    else:
        scores['有效性'] = 100

    # 总体评分
    overall = sum(scores.values()) / len(scores)

    return {
        'overall': round(overall, 1),
        'dimensions': {k: round(v, 1) for k, v in scores.items()}
    }

最佳实践

  1. 定义清晰的验证规则 在实施之前
  2. 尽早验证 在数据管道中
  3. 提供详细的错误信息 用于调试
  4. 使用模式验证 用于API契约
  5. 实现业务规则验证 与模式分开
  6. 记录验证失败 用于监控
  7. 生成验证报告 用于审计
  8. 优雅处理验证错误
  9. 测试验证规则 使用边界情况
  10. 版本控制 验证模式和规则

常见验证模式

API请求验证

def validate_api_request(request_data, endpoint):
    """验证API请求数据"""

    schemas = {
        '/users': user_schema,
        '/orders': order_schema,
        '/products': product_schema
    }

    schema = schemas.get(endpoint)
    if not schema:
        return {'valid': False, 'error': '未知端点'}

    return validate_json_schema(request_data, schema)

批量数据验证

def validate_batch(records, validator):
    """验证记录批处理"""

    results = []

    for i, record in enumerate(records):
        result = validator.validate(record)
        result['record_index'] = i

        if not result['valid']:
            results.append(result)

    return {
        'total_records': len(records),
        'valid_records': len(records) - len(results),
        'invalid_records': len(results),
        'failures': results
    }

笔记

  • 始终在系统边界验证
  • 使用适当的验证级别(语法、语义、业务)
  • 缓存验证结果以提高性能
  • 为用户提供清晰的错误信息
  • 记录验证指标用于监控
  • 考虑大数据集的验证性能
  • 使用流式验证处理大数据
  • 保持验证规则可维护和可测试