数据验证Skill data-validation

数据验证技能用于确保输入、处理和输出数据符合预期格式、类型和约束,提高软件安全性、数据质量和系统可靠性。它涵盖模式验证(如JSON Schema、Zod、Pydantic)、输入清理、输出编码、类型强制等,适用于API验证、数据管道验证、机器学习特征验证等场景。关键词:数据验证、输入验证、输出编码、数据安全、模式验证、数据质量。

测试 0 次安装 0 次浏览 更新于 3/24/2026

---
name: data-validation
description: 数据验证模式,包括模式验证、输入清理、输出编码和类型强制。在实现验证、模式、表单验证、API验证、JSON Schema、Zod、Pydantic、Joi、Yup、清理、XSS预防、注入预防、转义、编码、白名单、约束检查、不变量验证、数据管道验证、ML特征验证或自定义验证器时使用。
---

数据验证

概述

数据验证确保在处理前,输入数据符合预期的格式、类型和约束。本技能涵盖模式验证库、输入清理、输出编码、类型强制策略、安全重点验证(XSS、注入预防)、数据管道验证和全面的错误处理。

触发关键词

在处理以下内容时使用此技能:

  • 模式验证:JSON Schema、Zod、Pydantic、Joi、Yup、Ajv、class-validator
  • 输入处理:验证、清理、输入验证、表单验证
  • 安全验证:XSS预防、注入预防、转义、编码、白名单、黑名单
  • 约束:约束检查、不变量验证、业务规则、数据质量
  • API验证:请求验证、响应验证、API合约
  • 数据管道:Great Expectations、dbt测试、数据质量检查
  • ML/AI:特征验证、分布检查、数据漂移检测

代理分配

| 代理 | 责任 |
| --- | --- |
| senior-software-engineer (Opus) | 模式架构、验证策略设计、复杂验证模式 |
| software-engineer (Sonnet) | 实现验证逻辑、集成模式库、编写验证器 |
| senior-software-engineer (Opus) | XSS预防、注入预防、清理策略、编码 |
| senior-software-engineer (Opus) | 基础设施配置验证、管道验证、数据质量检查 |

关键概念

JSON Schema 验证

import Ajv, { JSONSchemaType, ValidateFunction } from "ajv";
import addFormats from "ajv-formats";

// Create the shared Ajv instance and register the additional formats from ajv-formats
// (the schema below relies on the "email" format).
const ajv = new Ajv({
  allErrors: true, // report every error, not only the first one
  removeAdditional: true, // remove properties that are not in the schema
  useDefaults: true, // apply `default` values declared in the schema
  coerceTypes: true, // coerce value types where possible
});
addFormats(ajv);

// TypeScript shape of the request body; the JSON Schema below is typed against it.
interface CreateUserRequest {
  email: string;
  password: string;
  name: string;
  // Optional; the schema restricts it to an integer between 13 and 150.
  age?: number;
  role: "user" | "admin" | "moderator";
  // Optional block; when present both fields are required by the schema.
  preferences?: {
    newsletter: boolean;
    theme: "light" | "dark";
  };
}

// JSON Schema for CreateUserRequest. JSONSchemaType<T> ties the schema to the interface
// so that the two cannot drift apart without a type error.
const createUserSchema: JSONSchemaType<CreateUserRequest> = {
  type: "object",
  properties: {
    email: { type: "string", format: "email", maxLength: 255 },
    password: {
      type: "string",
      minLength: 12,
      maxLength: 128,
      // Lookaheads require a lowercase letter, an uppercase letter, a digit and one of
      // @$!%*?&; the final character class rejects any character outside those groups.
      pattern:
        "^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d)(?=.*[@$!%*?&])[A-Za-z\\d@$!%*?&]+$",
    },
    name: { type: "string", minLength: 1, maxLength: 100 },
    // `nullable: true` accompanies the optional `age?` property of the interface.
    age: { type: "integer", minimum: 13, maximum: 150, nullable: true },
    role: { type: "string", enum: ["user", "admin", "moderator"] },
    preferences: {
      type: "object",
      properties: {
        newsletter: { type: "boolean", default: false },
        theme: { type: "string", enum: ["light", "dark"], default: "light" },
      },
      required: ["newsletter", "theme"],
      additionalProperties: false,
      nullable: true,
    },
  },
  required: ["email", "password", "name", "role"],
  additionalProperties: false,
};

// Compile once and reuse the resulting validator function.
const validateCreateUser = ajv.compile(createUserSchema);

/**
 * Runs a compiled Ajv validator against `data` and normalizes the outcome.
 *
 * @param validator - A compiled Ajv validate function (acts as a type guard for `T`).
 * @param data - The untrusted value to validate.
 * @returns `{ success: true, data }` when the value is valid; otherwise
 *   `{ success: false, errors }` with one entry per Ajv error. Each entry carries a
 *   dot-separated `field` path, a message from `formatAjvError`, and the failing
 *   keyword as `code`.
 */
function validate<T>(
  validator: ValidateFunction<T>,
  data: unknown,
): { success: true; data: T } | { success: false; errors: ValidationError[] } {
  if (validator(data)) {
    return { success: true, data };
  }

  const errors: ValidationError[] = (validator.errors ?? []).map((err) => {
    // instancePath is a JSON Pointer such as "/preferences/theme": drop the leading
    // empty segment and undo the "~1" (slash) and "~0" (tilde) escapes.
    const segments = err.instancePath
      .split("/")
      .slice(1)
      .map((segment) => segment.replace(/~1/g, "/").replace(/~0/g, "~"));

    // A "required" error points at the parent object, so append the missing property
    // name; otherwise a nested missing field would be reported under its parent only.
    if (
      err.keyword === "required" &&
      typeof err.params.missingProperty === "string"
    ) {
      segments.push(err.params.missingProperty);
    }

    return {
      field: segments.join("."),
      message: formatAjvError(err),
      code: err.keyword,
    };
  });

  return { success: false, errors };
}

/**
 * Turns a single Ajv error into a short user-facing message.
 *
 * The parameter is typed structurally (keyword / params / message) so that any Ajv
 * error object is accepted without depending on a namespace-qualified Ajv type.
 *
 * @param error - The Ajv error to describe.
 * @returns A message chosen by `error.keyword`; unknown keywords fall back to
 *   `error.message`, then to a generic "invalid value" text.
 */
function formatAjvError(error: {
  keyword: string;
  params: Record<string, unknown>;
  message?: string;
}): string {
  const { keyword, params } = error;

  switch (keyword) {
    case "required":
      return `${String(params.missingProperty)} 是必需的`;
    case "minLength":
      return `必须至少 ${String(params.limit)} 个字符`;
    case "maxLength":
      return `必须最多 ${String(params.limit)} 个字符`;
    case "format":
      return `无效的 ${String(params.format)} 格式`;
    case "enum": {
      // Guard the join: params are untyped, so do not assume an array.
      const allowed = params.allowedValues;
      const listed = Array.isArray(allowed) ? allowed.join(", ") : String(allowed);
      return `必须是其中之一: ${listed}`;
    }
    case "pattern":
      return "无效格式";
    case "minimum":
      return `必须至少 ${String(params.limit)}`;
    case "maximum":
      return `必须最多 ${String(params.limit)}`;
    default:
      return error.message || "无效值";
  }
}

Zod 验证 (TypeScript)

import { z, ZodError, ZodSchema } from "zod";

// Reusable base schemas
const emailSchema = z.string().email().max(255);
// Password policy: 12 to 128 characters containing at least one lowercase letter,
// one uppercase letter, one digit and one non-alphanumeric character.
const passwordSchema = z
  .string()
  .min(12, "密码必须至少 12 个字符")
  .max(128)
  .regex(/[a-z]/, "密码必须包含小写字母")
  .regex(/[A-Z]/, "密码必须包含大写字母")
  .regex(/[0-9]/, "密码必须包含数字")
  .regex(/[^a-zA-Z0-9]/, "密码必须包含特殊字符");

// Composite schema with normalization, defaults and a cross-field refinement.
// Whitespace is trimmed BEFORE the format/length checks run: otherwise a padded but
// valid email is rejected, and a whitespace-only name passes min(1) and becomes "".
const createUserSchema = z
  .object({
    email: z
      .string()
      .trim()
      .pipe(emailSchema)
      .transform((e) => e.toLowerCase()),
    password: passwordSchema,
    confirmPassword: z.string(),
    name: z.string().trim().min(1).max(100),
    age: z.number().int().min(13).max(150).optional(),
    role: z.enum(["user", "admin", "moderator"]).default("user"),
    tags: z.array(z.string().max(50)).max(10).default([]),
    metadata: z.record(z.string(), z.unknown()).optional(),
    preferences: z
      .object({
        newsletter: z.boolean().default(false),
        theme: z.enum(["light", "dark"]).default("light"),
        notifications: z
          .object({
            email: z.boolean().default(true),
            push: z.boolean().default(false),
            sms: z.boolean().default(false),
          })
          .default({}),
      })
      .default({}),
  })
  // Cross-field rule: the confirmation must match; the issue is attached to confirmPassword.
  .refine((data) => data.password === data.confirmPassword, {
    message: "密码不匹配",
    path: ["confirmPassword"],
  })
  // Drop confirmPassword from the parsed output.
  .transform(({ confirmPassword, ...data }) => data);

// Infer TypeScript types from the schema: z.input is the shape callers provide,
// z.output is the shape produced after defaults and transforms have been applied.
type CreateUserInput = z.input<typeof createUserSchema>;
type CreateUserOutput = z.output<typeof createUserSchema>;

// Validation helper that returns flattened, display-ready errors.

/** Outcome of `validateWithZod`: `data` is set on success, `errors` on failure. */
interface ValidationResult<T> {
  success: boolean;
  data?: T;
  errors?: Array<{
    /** Dot-separated path to the offending value ("" for the root). */
    field: string;
    message: string;
  }>;
}

/**
 * Parses `data` with `schema` without throwing.
 *
 * @param schema - The Zod schema to validate against.
 * @param data - The untrusted value.
 * @returns `{ success: true, data }` with the parsed output, or
 *   `{ success: false, errors }` with one `{ field, message }` entry per issue.
 */
function validateWithZod<T>(
  schema: ZodSchema<T>,
  data: unknown,
): ValidationResult<T> {
  const result = schema.safeParse(data);

  if (result.success) {
    return { success: true, data: result.data };
  }

  // `issues` is the canonical list on ZodError; prefer it over the `errors` alias.
  const errors = result.error.issues.map((issue) => ({
    field: issue.path.join("."),
    message: issue.message,
  }));

  return { success: false, errors };
}

// Custom refinement: the email passes only when no existing user is found for it.
// NOTE(review): the predicate is async, so this schema presumably has to be run with
// parseAsync / safeParseAsync — confirm against the Zod documentation.
const uniqueEmailSchema = emailSchema.refine(
  async (email) => {
    const exists = await db.users.findByEmail(email);
    return !exists;
  },
  { message: "邮箱已注册" },
);

// Conditional validation: the literal `type` field selects which object shape applies.
const formSchema = z.discriminatedUnion("type", [
  z.object({
    type: z.literal("individual"),
    firstName: z.string().min(1),
    lastName: z.string().min(1),
    // Digits grouped 3-2-4, separated by hyphens.
    ssn: z.string().regex(/^\d{3}-\d{2}-\d{4}$/),
  }),
  z.object({
    type: z.literal("business"),
    companyName: z.string().min(1),
    // Digits grouped 2-7, separated by a hyphen.
    ein: z.string().regex(/^\d{2}-\d{7}$/),
  }),
]);

// Recursive schema: a category may contain child categories of the same shape.
interface Category {
  name: string;
  children?: Category[];
}

// z.lazy defers evaluation so the schema can refer to itself; the explicit
// z.ZodType<Category> annotation supplies the type of the self-reference.
const categorySchema: z.ZodType<Category> = z.lazy(() =>
  z.object({
    name: z.string().min(1),
    children: z.array(categorySchema).optional(),
  }),
);

Pydantic 验证 (Python)

from datetime import datetime
from typing import Optional, List, Literal
from pydantic import (
    BaseModel,
    Field,
    EmailStr,
    validator,
    root_validator,
    constr,
    conint,
)
import re

# Base model with field-level validation
class CreateUserRequest(BaseModel):
    """Request payload for creating a user, with per-field constraints and validators."""

    email: EmailStr
    password: constr(min_length=12, max_length=128)
    name: constr(min_length=1, max_length=100)
    age: Optional[conint(ge=13, le=150)] = None
    role: Literal['user', 'admin', 'moderator'] = 'user'
    tags: List[str] = Field(default_factory=list, max_items=10)

    class Config:
        # Strip leading/trailing whitespace from string values
        anystr_strip_whitespace = True
        # Re-run validation when an attribute is assigned
        validate_assignment = True
        # Use enum values
        use_enum_values = True

    @validator('email')
    def email_lowercase(cls, v):
        """Normalize the email address to lowercase."""
        return v.lower()

    @validator('password')
    def password_strength(cls, v):
        """Require a lowercase letter, an uppercase letter, a digit and a non-alphanumeric character."""
        if not re.search(r'[a-z]', v):
            raise ValueError('密码必须包含小写字母')
        if not re.search(r'[A-Z]', v):
            raise ValueError('密码必须包含大写字母')
        if not re.search(r'\d', v):
            raise ValueError('密码必须包含数字')
        if not re.search(r'[^a-zA-Z0-9]', v):
            raise ValueError('密码必须包含特殊字符')
        return v

    @validator('tags', each_item=True)
    def validate_tag(cls, v):
        """Reject tags longer than 50 characters; return the tag stripped and lowercased."""
        # The length check runs on the value as received, before strip()/lower().
        if len(v) > 50:
            raise ValueError('标签最多 50 个字符')
        return v.strip().lower()

# Nested models
class Address(BaseModel):
    """Postal address used as a nested model."""

    street: str
    city: str
    # Exactly two characters
    state: constr(min_length=2, max_length=2)
    # Five digits, optionally followed by a hyphen and four more digits
    zip_code: constr(regex=r'^\d{5}(-\d{4})?$')
    country: str = 'US'

class UserProfile(BaseModel):
    """A user together with up to five addresses and the index of the primary one."""

    user: CreateUserRequest
    addresses: List[Address] = Field(default_factory=list, max_items=5)
    primary_address_index: int = 0

    # skip_on_failure: do not run the cross-field check when a field already failed,
    # which would otherwise add a misleading second error.
    @root_validator(skip_on_failure=True)
    def validate_primary_address(cls, values):
        """Ensure primary_address_index refers to an existing address.

        Raises:
            ValueError: if the index is negative or not below the number of
                addresses. With no addresses only the default index 0 is accepted.
        """
        addresses = values.get('addresses') or []
        primary_index = values.get('primary_address_index', 0)

        # Negative indices would silently count from the end of the list, and an
        # empty list previously accepted any index; reject both.
        if primary_index < 0 or primary_index >= max(len(addresses), 1):
            raise ValueError('主地址索引超出范围')

        return values

# Generic response envelope
from typing import TypeVar, Generic

# Pydantic v1 (the API used throughout this file) needs GenericModel for generic
# models; a plain BaseModel does not validate `data` against the type parameter.
from pydantic.generics import GenericModel

T = TypeVar('T')

class ApiResponse(GenericModel, Generic[T]):
    """Uniform API response wrapper; parametrize as ``ApiResponse[SomeModel]``."""

    success: bool
    # Payload on success; validated against T when the model is parametrized.
    data: Optional[T] = None
    # Error entries on failure.
    errors: Optional[List[dict]] = None
    # Creation time from datetime.utcnow (a naive datetime).
    timestamp: datetime = Field(default_factory=datetime.utcnow)

# Custom validator that performs a database lookup
from pydantic import validator
import asyncio

class UniqueEmailModel(BaseModel):
    """Model whose email field must not already be registered."""

    email: EmailStr

    @validator('email')
    def email_must_be_unique(cls, v):
        """Reject the email when user_exists_sync reports it as already registered."""
        # Note: this lookup is synchronous.
        # NOTE(review): the original comment suggested root_validator for async lookups;
        # confirm that pydantic validators can await before relying on that.
        # Imported inside the validator rather than at module level.
        from app.db import user_exists_sync
        if user_exists_sync(v):
            raise ValueError('邮箱已注册')
        return v

# 验证错误处理
from pydantic import ValidationError
from fastapi import HTTPException

def validate_request(model_class, data: dict):
    """Build ``model_class`` from ``data``, translating validation failures to HTTP 422.

    Args:
        model_class: The model class to instantiate with ``data`` as keyword arguments.
        data: Raw request payload.

    Returns:
        The validated model instance.

    Raises:
        HTTPException: with status 422 and ``detail={'errors': [...]}``, where each
            entry holds the dotted field path, the message and the error type.
    """
    try:
        instance = model_class(**data)
    except ValidationError as exc:
        details = [
            {
                # loc is a sequence of path parts (names / indices); join with dots
                'field': '.'.join(map(str, issue['loc'])),
                'message': issue['msg'],
                'type': issue['type'],
            }
            for issue in exc.errors()
        ]
        raise HTTPException(status_code=422, detail={'errors': details})
    return instance

输入清理

import * as path from "node:path";

import DOMPurify from "dompurify";
import { JSDOM } from "jsdom";
import validator from "validator";

// Server-side DOMPurify setup: DOMPurify is bound to a window created by jsdom.
const window = new JSDOM("").window;
const purify = DOMPurify(window);

// HTML sanitization
/**
 * Sanitizes untrusted HTML with a conservative allow-list of formatting tags.
 *
 * @param dirty - The untrusted HTML string.
 * @param options - Optional DOMPurify settings; spread after the defaults, so any key
 *   given here replaces the corresponding default entirely.
 * @returns The result of `purify.sanitize` with the merged configuration.
 */
function sanitizeHtml(dirty: string, options?: DOMPurify.Config): string {
  const defaultOptions: DOMPurify.Config = {
    ALLOWED_TAGS: ["b", "i", "em", "strong", "a", "p", "br", "ul", "ol", "li"],
    ALLOWED_ATTR: ["href", "target", "rel"],
    ALLOW_DATA_ATTR: false,
    // NOTE(review): the original comment said this adds target="_blank" to links;
    // ADD_ATTR presumably only permits the attribute — confirm against DOMPurify docs.
    ADD_ATTR: ["target"],
    FORBID_TAGS: ["script", "style", "iframe", "form", "input"],
    FORBID_ATTR: ["onerror", "onclick", "onload"],
  };

  return purify.sanitize(dirty, { ...defaultOptions, ...options });
}

// Rich-text sanitization (more permissive)
/**
 * Sanitizes untrusted rich text, allowing headings, inline formatting, lists, links,
 * images, quotes, code and tables.
 *
 * @param dirty - The untrusted HTML string.
 * @returns The result of `purify.sanitize` with the rich-text allow-list.
 */
function sanitizeRichText(dirty: string): string {
  const headingTags = ["h1", "h2", "h3", "h4", "h5", "h6"];
  const flowTags = ["p", "br", "hr"];
  const inlineTags = ["b", "i", "em", "strong", "u", "s", "strike"];
  const listTags = ["ul", "ol", "li"];
  const mediaTags = ["a", "img"];
  const blockTags = ["blockquote", "pre", "code"];
  const tableTags = ["table", "thead", "tbody", "tr", "th", "td"];

  const allowedTags = [
    ...headingTags,
    ...flowTags,
    ...inlineTags,
    ...listTags,
    ...mediaTags,
    ...blockTags,
    ...tableTags,
  ];
  const allowedAttributes = ["href", "src", "alt", "title", "class", "id"];

  return purify.sanitize(dirty, {
    ALLOWED_TAGS: allowedTags,
    ALLOWED_ATTR: allowedAttributes,
    ALLOW_DATA_ATTR: false,
  });
}

// SQL-safe string (use parameterized queries whenever possible)
/**
 * Escapes a string for embedding inside a quoted SQL string literal.
 *
 * Prefer parameterized queries; this is a last resort for code paths where binding
 * parameters is not possible.
 *
 * @param input - The raw string.
 * @returns The string with single quotes doubled and backslash, NUL, newline,
 *   carriage return and Ctrl-Z (0x1A) written as backslash escapes.
 */
function sanitizeForSql(input: string): string {
  return (
    input
      .replace(/'/g, "''")
      // Double existing backslashes before the steps below introduce new ones.
      .replace(/\\/g, "\\\\")
      .replace(/\x00/g, "\\0")
      .replace(/\n/g, "\\n")
      .replace(/\r/g, "\\r")
      .replace(/\x1a/g, "\\Z")
  );
}

// Filename sanitization
/**
 * Reduces an untrusted filename to a conservative character set.
 *
 * @param filename - The raw filename.
 * @returns The name with characters outside `[a-zA-Z0-9._-]` replaced by "_", runs
 *   of dots collapsed, leading/trailing dots removed, and at most 255 characters.
 *   May be an empty string (for example when the input consists only of dots).
 */
function sanitizeFilename(filename: string): string {
  return filename
    .replace(/[^a-zA-Z0-9._-]/g, "_") // replace special characters
    .replace(/\.{2,}/g, ".") // collapse consecutive dots
    .replace(/^\.+|\.+$/g, "") // remove leading/trailing dots
    .substring(0, 255) // limit length
    .replace(/\.+$/, ""); // truncation can expose a new trailing dot; strip it
}

// Path traversal prevention
/**
 * Resolves `userPath` against `basePath` and rejects results outside the base.
 *
 * Containment is decided with `path.relative` rather than a string-prefix test: a
 * prefix test would accept a sibling such as "/var/www-evil" for the base "/var/www".
 *
 * @param userPath - Untrusted relative (or absolute) path.
 * @param basePath - Directory the result must stay within.
 * @returns The absolute resolved path; equal to the resolved base when `userPath`
 *   resolves to the base itself.
 * @throws Error when the resolved path is outside `basePath`.
 */
function sanitizePath(userPath: string, basePath: string): string {
  const root = path.resolve(basePath);
  const resolvedPath = path.resolve(root, userPath);
  const relative = path.relative(root, resolvedPath);

  const escapesRoot =
    relative === ".." ||
    relative.startsWith(`..${path.sep}`) ||
    // On Windows a path on another drive comes back absolute.
    path.isAbsolute(relative);

  if (escapesRoot) {
    throw new Error("检测到路径遍历");
  }

  return resolvedPath;
}

// General-purpose input sanitizer

/** Options for `sanitizeString`. */
interface SanitizationOptions {
  /** Trim surrounding whitespace; enabled unless explicitly set to `false`. */
  trim?: boolean;
  /** Convert the result to lowercase. */
  lowercase?: boolean;
  /** Pass the value through `validator.escape` and then `validator.stripLow`. */
  stripHtml?: boolean;
  /** Truncate to this many characters; `0` or `undefined` means no limit. */
  maxLength?: number;
  /**
   * Characters to keep, given as a character class such as `/[a-z0-9_-]/` or as a
   * bare class body such as `/a-z0-9/`. Everything else is removed.
   */
  allowedChars?: RegExp;
}

/**
 * Applies the selected sanitization steps in a fixed order: trim, escape,
 * lowercase, character filter, truncate, and finally NUL-byte removal.
 *
 * @param input - The raw string.
 * @param options - Which steps to apply; see `SanitizationOptions`.
 * @returns The sanitized string.
 */
function sanitizeString(
  input: string,
  options: SanitizationOptions = {},
): string {
  let result = input;

  if (options.trim !== false) {
    result = result.trim();
  }

  if (options.stripHtml) {
    result = validator.stripLow(validator.escape(result));
  }

  if (options.lowercase) {
    result = result.toLowerCase();
  }

  if (options.allowedChars) {
    // `source` of /[a-z]/ already contains the brackets. Wrapping it again would give
    // "[^[a-z]]", which matches "a disallowed character followed by ]" and therefore
    // filters almost nothing. Unwrap the class body first.
    const source = options.allowedChars.source;
    const isBracketed = source.startsWith("[") && source.endsWith("]");
    const classBody = isBracketed ? source.slice(1, -1) : source;

    // A negated class ([^...]) allows everything except its members, so the
    // characters to remove are exactly those members.
    const removal =
      isBracketed && classBody.startsWith("^")
        ? `[${classBody.slice(1)}]`
        : `[^${classBody}]`;

    result = result.replace(new RegExp(removal, "g"), "");
  }

  if (options.maxLength) {
    result = result.substring(0, options.maxLength);
  }

  // Remove NUL bytes
  result = result.replace(/\x00/g, "");

  return result;
}

// Common sanitization presets
const sanitizers = {
  /** Lowercased, at most 30 characters, restricted to the username character class. */
  username(input: string) {
    return sanitizeString(input, {
      lowercase: true,
      maxLength: 30,
      allowedChars: /[a-z0-9_-]/,
    });
  },

  /** Normalized email address, or "" when normalization yields a falsy value. */
  email(input: string) {
    return validator.normalizeEmail(input) || "";
  },

  /** Drops characters outside the phone character class, then caps at 20 characters. */
  phone(input: string) {
    const filtered = input.replace(/[^0-9+()-\s]/g, "");
    return filtered.substring(0, 20);
  },

  /** Lowercased, at most 100 characters, whitespace runs become "-", the rest is dropped. */
  slug(input: string) {
    const base = sanitizeString(input, {
      lowercase: true,
      maxLength: 100,
    });
    const hyphenated = base.replace(/\s+/g, "-");
    return hyphenated.replace(/[^a-z0-9-]/g, "");
  },

  /** Trimmed, escaped search text of at most 200 characters. */
  searchQuery(input: string) {
    return sanitizeString(input, {
      trim: true,
      maxLength: 200,
      stripHtml: true,
    });
  },
};

输出编码

// HTML encoding
/**
 * Replaces the characters & < > " ' / ` = with HTML character references.
 *
 * @param str - Text to place into an HTML context.
 * @returns The encoded text; all other characters are returned unchanged.
 */
function encodeHtml(str: string): string {
  return str.replace(/[&<>"'`=/]/g, (char) => {
    switch (char) {
      case "&":
        return "&amp;";
      case "<":
        return "&lt;";
      case ">":
        return "&gt;";
      case '"':
        return "&quot;";
      case "'":
        return "&#x27;";
      case "/":
        return "&#x2F;";
      case "`":
        return "&#x60;";
      case "=":
        return "&#x3D;";
      default:
        // Unreachable: the pattern matches only the characters handled above.
        return char;
    }
  });
}

// JavaScript string encoding (for embedding inside a <script> tag)
/**
 * Escapes text for placement inside a quoted JavaScript string literal.
 *
 * @param str - The raw text.
 * @returns The text with backslashes, quotes, newline, carriage return and tab
 *   escaped, with `<`, `>` and `&` written as `\xNN` escapes, and with the line
 *   separators U+2028 / U+2029 escaped.
 */
function encodeJsString(str: string): string {
  return (
    str
      // Backslashes first, so the escapes added below are not doubled.
      .replace(/\\/g, "\\\\")
      .replace(/'/g, "\\'")
      .replace(/"/g, '\\"')
      .replace(/\n/g, "\\n")
      .replace(/\r/g, "\\r")
      .replace(/\t/g, "\\t")
      .replace(/</g, "\\x3c")
      .replace(/>/g, "\\x3e")
      .replace(/&/g, "\\x26")
      // U+2028 / U+2029 terminate string literals in pre-ES2019 JavaScript.
      .replace(/\u2028/g, "\\u2028")
      .replace(/\u2029/g, "\\u2029")
  );
}

// URL encoding
/**
 * Percent-encodes a value for use as a single URL component.
 *
 * @param str - The raw value.
 * @returns The value as encoded by `encodeURIComponent`.
 */
function encodeUrlParam(str: string): string {
  const encoded = encodeURIComponent(str);
  return encoded;
}

// CSS encoding
/**
 * Escapes every non-alphanumeric character as a CSS hex escape.
 *
 * @param str - The raw text.
 * @returns The text with each character outside `[a-zA-Z0-9]` replaced by a
 *   backslash, its code point in hexadecimal, and a terminating space.
 */
function encodeCss(str: string): string {
  // The `u` flag makes the pattern match whole code points, so a character outside
  // the BMP is emitted as one escape instead of two surrogate halves.
  return str.replace(/[^a-zA-Z0-9]/gu, (char) => {
    const hex = (char.codePointAt(0) ?? 0).toString(16);
    return `\\${hex} `;
  });
}

// JSON encoding (safe for embedding in HTML)
/**
 * Serializes a value to JSON that can be embedded in an HTML document.
 *
 * @param obj - The value to serialize.
 * @returns JSON text with `<`, `>`, `&`, `'`, U+2028 and U+2029 written as `\uXXXX`
 *   escapes. Values that JSON cannot represent at the top level (for example
 *   `undefined`) produce `"null"` instead of throwing.
 */
function encodeJsonForHtml(obj: unknown): string {
  // JSON.stringify returns undefined for undefined, functions and symbols.
  const serialized: string | undefined = JSON.stringify(obj);

  return (serialized ?? "null")
    .replace(/</g, "\\u003c")
    .replace(/>/g, "\\u003e")
    .replace(/&/g, "\\u0026")
    .replace(/'/g, "\\u0027")
    .replace(/\u2028/g, "\\u2028")
    .replace(/\u2029/g, "\\u2029");
}

// Context-aware output encoding
type OutputContext = "html" | "htmlAttribute" | "javascript" | "url" | "css";

/**
 * Encodes `str` with the encoder that matches the output context.
 *
 * @param str - The raw text.
 * @param context - Where the text will be rendered.
 * @returns The encoded text. "html" and any unrecognized context use `encodeHtml`.
 */
function encode(str: string, context: OutputContext): string {
  if (context === "htmlAttribute") {
    return encodeHtml(str).replace(/"/g, "&quot;");
  }
  if (context === "javascript") {
    return encodeJsString(str);
  }
  if (context === "url") {
    return encodeUrlParam(str);
  }
  if (context === "css") {
    return encodeCss(str);
  }
  // "html", plus the fallback for values outside the declared union.
  return encodeHtml(str);
}

// React-style escaping (for JSX)
/**
 * HTML-encodes a string by delegating to `encodeHtml`.
 *
 * @param str - The raw text.
 * @returns The HTML-encoded text.
 */
function escapeForReact(str: string): string {
  // React already escapes interpolated values; this is for dangerouslySetInnerHTML.
  return encodeHtml(str);
}

// Template literal tag for safe HTML
/**
 * Tagged-template function that HTML-encodes every interpolated value.
 *
 * @param strings - The literal parts of the template; emitted unchanged.
 * @param values - The interpolated values. `null` and `undefined` render as "";
 *   every other value is converted with `String` and then encoded.
 * @returns The assembled HTML string.
 */
function safeHtml(strings: TemplateStringsArray, ...values: unknown[]): string {
  // reduce without an initial value starts from strings[0], so at index i the value
  // that precedes strings[i] is values[i - 1].
  return strings.reduce((result, str, i) => {
    const value = values[i - 1];
    // Encode after stringifying: arrays and objects can stringify to markup, so
    // encoding only `typeof value === "string"` would leave an injection path.
    const encoded = value == null ? "" : encodeHtml(String(value));
    return result + encoded + str;
  });
}

// Usage
const userInput = '<script>alert("xss")</script>';
const safe = safeHtml`<div class="user-content">${userInput}</div>`;
// Result: <div class="user-content">&lt;script&gt;alert(&quot;xss&quot;)&lt;&#x2F;script&gt;</div>
// (encodeHtml also encodes "/", so the closing tag's slash appears as &#x2F;)

API 请求/响应验证

// Express 中间件用于请求验证
import { Request, Response, NextFunction } from "express";
import { z, ZodSchema } from "zod";

/**
 * Builds an Express middleware that validates one part of the request with a Zod schema.
 *
 * @param schema - The schema to validate against.
 * @param source - Which request property to validate; defaults to "body".
 * @returns A middleware that responds with HTTP 422 and a `details` list when
 *   validation fails; otherwise it replaces `req[source]` with the parsed output
 *   and calls `next()`.
 */
function validate<T>(
  schema: ZodSchema<T>,
  source: "body" | "query" | "params" = "body",
) {
  return (req: Request, res: Response, next: NextFunction): void => {
    const result = schema.safeParse(req[source]);

    if (!result.success) {
      // `issues` is the canonical list on ZodError; prefer it over the `errors` alias.
      res.status(422).json({
        error: "验证错误",
        details: result.error.issues.map((issue) => ({
          field: issue.path.join("."),
          message: issue.message,
        })),
      });
      // Return nothing: middleware is expected to return void, not the Response.
      return;
    }

    // Hand the parsed output (defaults and transforms applied) to later handlers.
    req[source] = result.data;
    next();
  };
}

// Usage
const createUserSchema = z.object({
  email: z.string().email(),
  password: z.string().min(12),
  name: z.string().min(1).max(100),
});

app.post("/users", validate(createUserSchema), async (req, res) => {
  // By this point the validate middleware has replaced req.body with the parsed schema output.
  // NOTE(review): a rejection from createUser is not caught here — confirm that the
  // Express version in use forwards rejected async handlers to the error middleware.
  const user = await createUser(req.body);
  res.status(201).json(user);
});

// Response validation: the shape a user must have before it is sent to a client.
const userResponseSchema = z.object({
  id: z.string().uuid(),
  email: z.string().email(),
  name: z.string(),
  // A date-time carried as a string, checked by z.string().datetime().
  createdAt: z.string().datetime(),
});

/**
 * Checks outgoing data against a schema and returns the parsed value.
 *
 * @param schema - The schema the response must satisfy.
 * @param data - The candidate response payload.
 * @returns The parsed payload.
 * @throws Error with a fixed message when the payload does not match the schema.
 */
function validateResponse<T>(schema: ZodSchema<T>, data: unknown): T {
  const parsed = schema.safeParse(data);
  if (parsed.success) {
    return parsed.data;
  }
  throw new Error("无效的响应格式");
}

数据管道验证 (Great Expectations)

# Data quality validation with Great Expectations
import great_expectations as ge
from great_expectations.dataset import PandasDataset

# Load the dataset; expectations are then declared on the returned object
df = ge.read_csv('data.csv')

# Basic expectations
df.expect_column_to_exist('user_id')
df.expect_column_values_to_not_be_null('email')
df.expect_column_values_to_be_unique('email')
df.expect_column_values_to_match_regex('email', r'^[^@]+@[^@]+\.[^@]+$')
df.expect_column_values_to_be_in_set('status', ['active', 'inactive', 'pending'])

# Numeric expectations
df.expect_column_values_to_be_between('age', 0, 150)
df.expect_column_mean_to_be_between('price', 10, 1000)

# Date expectations
df.expect_column_values_to_be_dateutil_parseable('created_at')

# Custom expectation
def custom_validation(df):
    """Return True when every email's domain equals the row's company_domain."""
    # The email domain must match the company domain
    emails = df['email'].str.split('@', expand=True)[1]
    return (emails == df['company_domain']).all()

# NOTE(review): `custom_fn` is passed through as written here — confirm that this
# expectation accepts such a keyword in the Great Expectations version in use.
df.expect_column_pair_values_to_be_equal('email_domain', 'company_domain',
                                          custom_fn=custom_validation)

# Run the validation suite and print each failed expectation
results = df.validate()
if not results['success']:
    for result in results['results']:
        if not result['success']:
            print(f"验证失败: {result['expectation_config']}")

# dbt tests for validating SQL data
# models/schema.yml
version: 2

models:
  - name: users
    columns:
      - name: user_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          - email_format  # custom test
      - name: age
        tests:
          # Accept only values between 0 and 150
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 150
      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'pending']
      - name: created_at
        tests:
          - not_null
          # Recency check on created_at with a 7-day interval
          - dbt_utils.recency:
              datepart: day
              field: created_at
              interval: 7

ML 特征验证

# ML 管道的特征验证
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple

class FeatureValidator:
    """Validates feature DataFrames against an expected schema and baseline statistics.

    Every ``validate_*`` method returns a list of human-readable error strings; an
    empty list means that check passed. Call ``set_baseline`` with reference data
    before using ``validate_distributions``.
    """

    def __init__(self, expected_schema: Dict[str, str]):
        """Store the expected schema.

        Args:
            expected_schema: Maps column name to a dtype-name prefix, e.g. ``'int'``
                matches ``int64`` and ``'float'`` matches ``float64``.
        """
        self.expected_schema = expected_schema
        # Filled by set_baseline: column name -> {'mean', 'std', 'min', 'max'}
        self.baseline_stats = {}

    def validate_schema(self, df: pd.DataFrame) -> List[str]:
        """Report missing columns, unexpected columns and dtype mismatches."""
        errors = []

        # Check column presence
        expected_cols = set(self.expected_schema.keys())
        actual_cols = set(df.columns)

        missing = expected_cols - actual_cols
        if missing:
            errors.append(f"缺失列: {missing}")

        extra = actual_cols - expected_cols
        if extra:
            errors.append(f"意外列: {extra}")

        # Check dtypes by prefix, so 'int' accepts int32/int64 and so on
        for col, expected_type in self.expected_schema.items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                if not actual_type.startswith(expected_type):
                    errors.append(f"列 {col}: 期望 {expected_type}, 得到 {actual_type}")

        return errors

    def validate_distributions(self, df: pd.DataFrame,
                               threshold: float = 3.0) -> List[str]:
        """Compare numeric columns with the baseline and report drift.

        Args:
            df: New data to check.
            threshold: Maximum allowed absolute z-score of the column mean relative
                to the baseline mean and standard deviation.

        Returns:
            Errors for columns whose mean moved more than ``threshold`` baseline
            standard deviations, or whose standard deviation is less than half or
            more than twice the baseline. Columns without a baseline are skipped.
        """
        errors = []

        for col in df.select_dtypes(include=[np.number]).columns:
            if col not in self.baseline_stats:
                continue

            baseline_mean = self.baseline_stats[col]['mean']
            baseline_std = self.baseline_stats[col]['std']

            current_mean = df[col].mean()
            current_std = df[col].std()

            # The baseline statistics are undefined (e.g. fewer than two usable
            # rows), so there is nothing meaningful to compare against.
            if pd.isna(baseline_mean) or pd.isna(baseline_std):
                continue

            # A constant baseline column has zero spread; dividing by it would emit
            # RuntimeWarnings and yield inf/nan. Any change at all counts as drift.
            if baseline_std == 0:
                if pd.notna(current_mean) and current_mean != baseline_mean:
                    errors.append(f"列 {col}: 检测到均值漂移 (基线标准差为 0)")
                if pd.notna(current_std) and current_std > 0:
                    errors.append(f"列 {col}: 检测到方差变化 (基线标准差为 0)")
                continue

            # Check mean drift using a z-score
            mean_zscore = abs((current_mean - baseline_mean) / baseline_std)
            if mean_zscore > threshold:
                errors.append(f"列 {col}: 检测到均值漂移 (z 分数: {mean_zscore:.2f})")

            # Check for a change in spread
            variance_ratio = current_std / baseline_std
            if variance_ratio < 0.5 or variance_ratio > 2.0:
                errors.append(f"列 {col}: 检测到方差变化 (比率: {variance_ratio:.2f})")

        return errors

    def validate_null_rates(self, df: pd.DataFrame,
                            max_null_rate: float = 0.05) -> List[str]:
        """Report columns whose fraction of null values exceeds ``max_null_rate``."""
        errors = []
        null_rates = df.isnull().sum() / len(df)

        for col, rate in null_rates.items():
            if rate > max_null_rate:
                errors.append(f"列 {col}: 空值率 {rate:.2%} 超过阈值 {max_null_rate:.2%}")

        return errors

    def validate_categorical_values(self, df: pd.DataFrame,
                                     expected_categories: Dict[str, List]) -> List[str]:
        """Report non-null values that are not in the expected category lists.

        Args:
            df: New data to check.
            expected_categories: Maps column name to its allowed values. Columns
                absent from ``df`` are skipped.
        """
        errors = []

        for col, expected in expected_categories.items():
            if col not in df.columns:
                continue

            actual = set(df[col].dropna().unique())
            expected_set = set(expected)

            unexpected = actual - expected_set
            if unexpected:
                errors.append(f"列 {col}: 意外类别 {unexpected}")

        return errors

    def set_baseline(self, df: pd.DataFrame):
        """Record mean, std, min and max of every numeric column of ``df``."""
        for col in df.select_dtypes(include=[np.number]).columns:
            self.baseline_stats[col] = {
                'mean': df[col].mean(),
                'std': df[col].std(),
                'min': df[col].min(),
                'max': df[col].max(),
            }

# Usage
validator = FeatureValidator({
    'user_id': 'int',
    'age': 'float',
    'income': 'float',
    'category': 'object',
})

# Set the baseline from the training data
validator.set_baseline(training_df)

# Validate new data: collect the errors from every check
errors = []
errors.extend(validator.validate_schema(new_df))
errors.extend(validator.validate_distributions(new_df))
errors.extend(validator.validate_null_rates(new_df))
errors.extend(validator.validate_categorical_values(new_df, {
    'category': ['A', 'B', 'C']
}))

# Fail with one message listing every problem, one per line. The "\n" escapes were
# previously broken into literal line breaks, which is a syntax error.
if errors:
    raise ValueError("特征验证失败:\n" + "\n".join(errors))

基础设施配置验证

# JSON Schema for validating Kubernetes configuration
# The schema is stored as a JSON document inside a ConfigMap. It requires an apps/v1
# Deployment with 1 to 100 replicas and at least one container, and each container
# that declares `resources` must set both requests and limits.
apiVersion: v1
kind: ConfigMap
metadata:
  name: validation-schema
data:
  deployment-schema.json: |
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "required": ["apiVersion", "kind", "metadata", "spec"],
      "properties": {
        "apiVersion": {
          "type": "string",
          "pattern": "^apps/v1$"
        },
        "kind": {
          "type": "string",
          "enum": ["Deployment"]
        },
        "spec": {
          "type": "object",
          "required": ["replicas", "selector", "template"],
          "properties": {
            "replicas": {
              "type": "integer",
              "minimum": 1,
              "maximum": 100
            },
            "selector": {
              "type": "object",
              "required": ["matchLabels"]
            },
            "template": {
              "type": "object",
              "required": ["metadata", "spec"],
              "properties": {
                "spec": {
                  "type": "object",
                  "required": ["containers"],
                  "properties": {
                    "containers": {
                      "type": "array",
                      "minItems": 1,
                      "items": {
                        "type": "object",
                        "required": ["name", "image"],
                        "properties": {
                          "resources": {
                            "type": "object",
                            "required": ["requests", "limits"]
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
# Terraform 配置验证
import hcl2
import json
from jsonschema import validate, ValidationError

def validate_terraform_config(config_path: str, schema_path: str):
    """Validate a Terraform HCL file against a JSON Schema file.

    Args:
        config_path: Path to the HCL configuration to check.
        schema_path: Path to the JSON Schema document.

    Raises:
        ValidationError: re-raised after the message and the path of the offending
            element have been printed.
    """
    # Parse the HCL configuration
    with open(config_path, 'r') as config_file:
        parsed_config = hcl2.load(config_file)

    # Load the schema
    with open(schema_path, 'r') as schema_file:
        expected_schema = json.load(schema_file)

    # Validate; report details before propagating a failure
    try:
        validate(instance=parsed_config, schema=expected_schema)
    except ValidationError as exc:
        print(f"验证错误: {exc.message}")
        location = ' -> '.join(map(str, exc.path))
        print(f"路径: {location}")
        raise
    else:
        print("Terraform 配置有效")

# Custom business-rule validation
def validate_aws_resource_tags(config: dict) -> List[str]:
    """Check that every resource carries the required tags.

    Args:
        config: Parsed Terraform configuration. ``resource`` may be a mapping of
            resource type to ``{name: settings}``, or a list of such mappings
            (the form a parser produces when it emits one entry per block).

    Returns:
        One error string per resource missing any of the tags ``Environment``,
        ``Owner`` or ``CostCenter``; an empty list when all resources comply.
    """
    errors = []
    required_tags = {'Environment', 'Owner', 'CostCenter'}

    resources = config.get('resource') or {}
    # Normalize to a list of {resource_type: {name: settings}} mappings
    blocks = resources if isinstance(resources, list) else [resources]

    for block in blocks:
        for resource in block.values():
            for resource_name, resource_config in resource.items():
                # `tags` may be absent or explicitly null; treat both as "no tags"
                tags = set((resource_config.get('tags') or {}).keys())
                missing = required_tags - tags

                if missing:
                    errors.append(f"资源 {resource_name} 缺失标签: {missing}")

    return errors

最佳实践

  1. 尽早验证

    • 在边界验证(API 端点、表单提交、管道摄取)
    • 快速失败并提供清晰错误消息
    • 不要信任任何外部输入
  2. 使用模式验证库

    • 优先使用 Zod/Pydantic 以获得类型安全
    • JSON Schema 用于语言无关验证
    • 从模式生成 TypeScript 类型
  3. 清理和编码

    • 根据上下文清理输入(HTML、SQL、路径)
    • 根据渲染位置编码输出
    • 使用参数化查询代替 SQL 转义
  4. 安全第一验证

    • 白名单允许值而非黑名单
    • 使用输出编码预防 XSS
    • 使用参数化查询和清理预防注入
    • 验证文件上传(类型、大小、内容)
  5. 数据管道验证

    • 处理前验证模式
    • 检查数据分布漂移
    • 监控空值率和基数
    • 使用 Great Expectations 进行全面的数据质量检查
  6. ML 特征验证

    • 验证模式匹配训练数据
    • 检测分布漂移
    • 检查意外类别
    • 监控特征相关性
  7. 错误消息

    • 提供具体、可操作的错误消息
    • 在错误中包含字段名
    • 在生产中不暴露内部细节
  8. 深度防御

    • 在客户端和服务器端验证
    • 应用最小权限原则
    • 在多个层次验证(API、服务、数据库)