name: data-validation description: 数据验证模式,包括模式验证、输入清理、输出编码和类型强制。在实现验证、模式、表单验证、API验证、JSON Schema、Zod、Pydantic、Joi、Yup、清理、XSS预防、注入预防、转义、编码、白名单、约束检查、不变验证、数据管道验证、ML特征验证或自定义验证器时使用。
数据验证
概述
数据验证确保输入数据在被处理之前符合预期的格式、类型和约束。本技能涵盖模式验证库、输入清理、输出编码、类型强制策略、以安全为重点的验证(XSS 与注入预防)、数据管道验证以及全面的错误处理。
触发关键词
在处理以下内容时使用此技能:
- 模式验证:JSON Schema、Zod、Pydantic、Joi、Yup、Ajv、class-validator
- 输入处理:验证、清理、输入验证、表单验证
- 安全验证:XSS预防、注入预防、转义、编码、白名单、黑名单
- 约束:约束检查、不变量验证、业务规则、数据质量
- API验证:请求验证、响应验证、API合约
- 数据管道:Great Expectations、dbt测试、数据质量检查
- ML/AI:特征验证、分布检查、数据漂移检测
代理分配
| 代理 | 责任 |
|---|---|
| senior-software-engineer (Opus) | 模式架构、验证策略设计、复杂验证模式 |
| software-engineer (Sonnet) | 实现验证逻辑、集成模式库、编写验证器 |
| senior-software-engineer (Opus) | XSS预防、注入预防、清理策略、编码 |
| senior-software-engineer (Opus) | 基础设施配置验证、管道验证、数据质量检查 |
关键概念
JSON Schema 验证
import Ajv, { JSONSchemaType, ValidateFunction } from "ajv";
import addFormats from "ajv-formats";
// 初始化 Ajv 并添加格式
// Options for the shared Ajv instance used by every compiled validator below.
const ajvOptions = {
  allErrors: true, // report every violation, not only the first one
  removeAdditional: true, // remove properties that are not in the schema
  useDefaults: true, // apply `default` values declared in the schema
  coerceTypes: true, // coerce values to the declared type where possible
};
const ajv = new Ajv(ajvOptions);
// Register the string formats (the schema below relies on "email").
addFormats(ajv);
// 使用 TypeScript 类型定义模式
/** Payload accepted by the create-user endpoint; mirrored by the JSON Schema below. */
interface CreateUserRequest {
  email: string;
  password: string;
  name: string;
  /** Optional; may be omitted entirely. */
  age?: number;
  role: "user" | "admin" | "moderator";
  /** Optional preference block; when present both fields are required. */
  preferences?: {
    newsletter: boolean;
    theme: "light" | "dark";
  };
}
/**
 * JSON Schema for CreateUserRequest.
 *
 * The password rule requires a lowercase letter, an uppercase letter, a digit
 * and at least one non-alphanumeric character. Any special character is
 * accepted (not only a fixed short list) so that this schema agrees with the
 * Zod and Pydantic password rules defined elsewhere in this document.
 */
const createUserSchema: JSONSchemaType<CreateUserRequest> = {
  type: "object",
  properties: {
    email: { type: "string", format: "email", maxLength: 255 },
    password: {
      type: "string",
      minLength: 12,
      maxLength: 128,
      pattern: "^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d)(?=.*[^A-Za-z0-9]).+$",
    },
    name: { type: "string", minLength: 1, maxLength: 100 },
    // Optional properties are marked `nullable: true` in the typed schema.
    age: { type: "integer", minimum: 13, maximum: 150, nullable: true },
    role: { type: "string", enum: ["user", "admin", "moderator"] },
    preferences: {
      type: "object",
      properties: {
        newsletter: { type: "boolean", default: false },
        theme: { type: "string", enum: ["light", "dark"], default: "light" },
      },
      required: ["newsletter", "theme"],
      additionalProperties: false,
      nullable: true,
    },
  },
  required: ["email", "password", "name", "role"],
  additionalProperties: false,
};

// Compile once at module load and reuse the resulting validator.
const validateCreateUser = ajv.compile(createUserSchema);
// 使用错误格式化
/** One validation failure, normalized for API responses. */
interface ValidationError {
  /** Dot-separated path of the offending property ("" for the root value). */
  field: string;
  /** Human-readable message produced by formatAjvError. */
  message: string;
  /** The JSON Schema keyword that failed (e.g. "required", "minLength"). */
  code: string;
}

/**
 * Runs a compiled Ajv validator and converts its errors into ValidationError
 * records.
 *
 * @param validator - compiled Ajv validate function
 * @param data - untrusted input
 * @returns `{ success: true, data }` with `data` narrowed to `T`, or
 *          `{ success: false, errors }` with one entry per Ajv error
 */
function validate<T>(
  validator: ValidateFunction<T>,
  data: unknown,
): { success: true; data: T } | { success: false; errors: ValidationError[] } {
  if (validator(data)) {
    return { success: true, data };
  }

  const errors: ValidationError[] = (validator.errors ?? []).map((err) => {
    // instancePath is a JSON pointer such as "/preferences/theme".
    const segments = err.instancePath.split("/").filter((part) => part !== "");

    // For "required", instancePath points at the PARENT object, so append the
    // missing property; otherwise nested failures would name the wrong field.
    const missing: unknown = err.params.missingProperty;
    if (err.keyword === "required" && typeof missing === "string") {
      segments.push(missing);
    }

    return {
      field: segments.join("."),
      message: formatAjvError(err),
      code: err.keyword,
    };
  });

  return { success: false, errors };
}
// Element type of `validateFn.errors`. Derived from ValidateFunction so that
// no additional import is needed (the default Ajv export is a class, not a
// namespace that exposes ErrorObject).
type AjvErrorObject = NonNullable<ValidateFunction["errors"]>[number];

/**
 * Maps an Ajv error to a user-facing message based on the failing keyword.
 *
 * @param error - a single entry from `validateFn.errors`
 * @returns a message for the known keywords; otherwise Ajv's own message, or
 *          a generic fallback when Ajv supplied none
 */
function formatAjvError(error: AjvErrorObject): string {
  const { keyword, params } = error;
  switch (keyword) {
    case "required":
      return `${params.missingProperty} 是必需的`;
    case "minLength":
      return `必须至少 ${params.limit} 个字符`;
    case "maxLength":
      return `必须最多 ${params.limit} 个字符`;
    case "format":
      return `无效的 ${params.format} 格式`;
    case "enum":
      return `必须是其中之一: ${params.allowedValues.join(", ")}`;
    case "pattern":
      // The raw regular expression is deliberately not shown to end users.
      return "无效格式";
    case "minimum":
      return `必须至少 ${params.limit}`;
    case "maximum":
      return `必须最多 ${params.limit}`;
    default:
      return error.message || "无效值";
  }
}
Zod 验证 (TypeScript)
import { z, ZodError, ZodSchema } from "zod";
// 基础模式
/** E-mail address, at most 255 characters. */
const emailSchema = z.string().email().max(255);

// Length bounds first, then one regex per required character class so that
// each failure carries its own message.
const passwordLengthSchema = z
  .string()
  .min(12, "密码必须至少 12 个字符")
  .max(128);

/** Password: 12-128 characters with lower, upper, digit and special character. */
const passwordSchema = passwordLengthSchema
  .regex(/[a-z]/, "密码必须包含小写字母")
  .regex(/[A-Z]/, "密码必须包含大写字母")
  .regex(/[0-9]/, "密码必须包含数字")
  .regex(/[^a-zA-Z0-9]/, "密码必须包含特殊字符");
// 包含转换和精化的复杂模式
/**
 * Create-user form schema with normalization, defaults and a cross-field
 * check.
 *
 * Normalization runs BEFORE validation: the e-mail is trimmed and lowercased
 * and then piped into emailSchema, and the name is trimmed before its length
 * is checked. Validating first would reject " user@example.com " and would
 * accept a whitespace-only name that becomes "" after trimming.
 */
const createUserSchema = z
  .object({
    email: z
      .string()
      .transform((e) => e.trim().toLowerCase())
      .pipe(emailSchema),
    password: passwordSchema,
    confirmPassword: z.string(),
    name: z.string().trim().min(1).max(100),
    age: z.number().int().min(13).max(150).optional(),
    role: z.enum(["user", "admin", "moderator"]).default("user"),
    tags: z.array(z.string().max(50)).max(10).default([]),
    metadata: z.record(z.string(), z.unknown()).optional(),
    preferences: z
      .object({
        newsletter: z.boolean().default(false),
        theme: z.enum(["light", "dark"]).default("light"),
        notifications: z
          .object({
            email: z.boolean().default(true),
            push: z.boolean().default(false),
            sms: z.boolean().default(false),
          })
          .default({}),
      })
      .default({}),
  })
  // Cross-field rule; the error is attached to confirmPassword.
  .refine((data) => data.password === data.confirmPassword, {
    message: "密码不匹配",
    path: ["confirmPassword"],
  })
  // confirmPassword is only needed for the check above; drop it from the output.
  .transform(({ confirmPassword, ...data }) => data);
// 从模式推断 TypeScript 类型
// Type accepted by the schema (what callers pass to parse).
type CreateUserInput = z.input<typeof createUserSchema>;
// Type produced by the schema (what a successful parse returns).
type CreateUserOutput = z.output<typeof createUserSchema>;
// 带格式化错误的验证助手
/**
 * Outcome of a validation run: `data` is set on success, `errors` on failure.
 */
interface ValidationResult<T> {
  success: boolean;
  data?: T;
  /** One entry per failed rule; `field` is the dot-joined path to the value. */
  errors?: { field: string; message: string }[];
}
/**
 * Validates `data` against a Zod schema without throwing.
 *
 * @param schema - the Zod schema to apply
 * @param data - untrusted input
 * @returns the parsed value on success, or a flat list of `{ field, message }`
 *          entries where `field` is the dot-joined issue path
 */
function validateWithZod<T>(
  schema: ZodSchema<T>,
  data: unknown,
): ValidationResult<T> {
  const result = schema.safeParse(data);
  if (result.success) {
    return { success: true, data: result.data };
  }

  // `issues` is the canonical list on ZodError; `errors` is only an alias.
  const errors = result.error.issues.map((issue) => ({
    field: issue.path.join("."),
    message: issue.message,
  }));
  return { success: false, errors };
}
// 自定义精化
// E-mail schema with an asynchronous uniqueness check against the user store.
// NOTE(review): async refinements are expected to be run through
// parseAsync/safeParseAsync — confirm the call sites do so.
const uniqueEmailSchema = emailSchema.refine(
  async (email) => !(await db.users.findByEmail(email)),
  { message: "邮箱已注册" },
);
// 条件验证
// Variant submitted by a private individual.
const individualFormSchema = z.object({
  type: z.literal("individual"),
  firstName: z.string().min(1),
  lastName: z.string().min(1),
  ssn: z.string().regex(/^\d{3}-\d{2}-\d{4}$/),
});

// Variant submitted by a business.
const businessFormSchema = z.object({
  type: z.literal("business"),
  companyName: z.string().min(1),
  ein: z.string().regex(/^\d{2}-\d{7}$/),
});

/** Conditional validation: the `type` field selects which variant applies. */
const formSchema = z.discriminatedUnion("type", [
  individualFormSchema,
  businessFormSchema,
]);
// 递归模式
/** Tree node: a named category with optional nested categories. */
interface Category {
  name: string;
  children?: Category[];
}

// z.lazy defers evaluation so the schema can refer to itself; the explicit
// ZodType<Category> annotation supplies the recursive type.
const categorySchema: z.ZodType<Category> = z.lazy(() => {
  const children = z.array(categorySchema).optional();
  return z.object({
    name: z.string().min(1),
    children,
  });
});
Pydantic 验证 (Python)
from datetime import datetime
from typing import Optional, List, Literal
from pydantic import (
BaseModel,
Field,
EmailStr,
validator,
root_validator,
constr,
conint,
)
import re
# 带字段验证的基础模型
class CreateUserRequest(BaseModel):
    """Create-user payload with per-field constraints and custom validators."""

    email: EmailStr
    password: constr(min_length=12, max_length=128)
    name: constr(min_length=1, max_length=100)
    age: Optional[conint(ge=13, le=150)] = None
    role: Literal['user', 'admin', 'moderator'] = 'user'
    tags: List[str] = Field(default_factory=list, max_items=10)

    class Config:
        # Strip leading/trailing whitespace from string values.
        # NOTE(review): this presumably applies to `password` too — confirm
        # that silently trimming passwords is intended.
        anystr_strip_whitespace = True
        # Re-run validation when an attribute is assigned after construction.
        validate_assignment = True
        # Store enum values rather than enum members.
        use_enum_values = True

    @validator('email')
    def email_lowercase(cls, v):
        """Normalize the e-mail address to lowercase."""
        return v.lower()

    @validator('password')
    def password_strength(cls, v):
        """Require a lowercase letter, an uppercase letter, a digit and a special character."""
        if not re.search(r'[a-z]', v):
            raise ValueError('密码必须包含小写字母')
        if not re.search(r'[A-Z]', v):
            raise ValueError('密码必须包含大写字母')
        if not re.search(r'\d', v):
            raise ValueError('密码必须包含数字')
        if not re.search(r'[^a-zA-Z0-9]', v):
            raise ValueError('密码必须包含特殊字符')
        return v

    @validator('tags', each_item=True)
    def validate_tag(cls, v):
        """Reject tags longer than 50 characters; return the tag stripped and lowercased."""
        if len(v) > 50:
            raise ValueError('标签最多 50 个字符')
        return v.strip().lower()
# 嵌套模型
class Address(BaseModel):
    """Postal address; used as a nested model inside UserProfile."""

    street: str
    city: str
    # Exactly two characters.
    state: constr(min_length=2, max_length=2)
    # Five digits, optionally followed by a hyphen and four more digits.
    zip_code: constr(regex=r'^\d{5}(-\d{4})?$')
    country: str = 'US'
class UserProfile(BaseModel):
    """A user together with up to five addresses and the index of the primary one."""

    user: CreateUserRequest
    addresses: List[Address] = Field(default_factory=list, max_items=5)
    primary_address_index: int = 0

    @root_validator
    def validate_primary_address(cls, values):
        """Ensure primary_address_index refers to an existing address.

        Raises:
            ValueError: if addresses are present and the index is negative or
                not smaller than the number of addresses.
        """
        addresses = values.get('addresses', [])
        primary_index = values.get('primary_address_index', 0)
        # A negative index must be rejected as well: Python would otherwise
        # resolve it from the end of the list.
        if addresses and not (0 <= primary_index < len(addresses)):
            raise ValueError('主地址索引超出范围')
        return values
# 通用响应模型
from typing import TypeVar, Generic
from pydantic.generics import GenericModel

T = TypeVar('T')


class ApiResponse(GenericModel, Generic[T]):
    """Generic response envelope; parametrize as ApiResponse[SomeModel].

    The surrounding examples use the Pydantic v1 API, where generic models
    derive from GenericModel so that `data` is validated against the concrete
    type parameter.
    """

    success: bool
    data: Optional[T] = None
    errors: Optional[List[dict]] = None
    # Creation time from datetime.utcnow (a naive datetime).
    timestamp: datetime = Field(default_factory=datetime.utcnow)
# 带数据库查找的自定义验证器
from pydantic import validator
import asyncio
class UniqueEmailModel(BaseModel):
    """Model whose e-mail field is checked for uniqueness through a lookup."""

    email: EmailStr

    @validator('email')
    def email_must_be_unique(cls, v):
        """Reject an e-mail address that is already registered.

        Raises:
            ValueError: if user_exists_sync reports the address as existing.
        """
        # Note: this check is synchronous; for async, use root_validator.
        # Imported inside the validator rather than at module level.
        from app.db import user_exists_sync
        if user_exists_sync(v):
            raise ValueError('邮箱已注册')
        return v
# 验证错误处理
from pydantic import ValidationError
from fastapi import HTTPException
def validate_request(model_class, data: dict):
    """Instantiate `model_class` from `data`, mapping validation failures to HTTP 422.

    Args:
        model_class: the model class to construct with `**data`.
        data: raw request payload.

    Returns:
        The constructed model instance.

    Raises:
        HTTPException: status 422 with `detail={'errors': [...]}`; each entry
            holds the dotted field path, the message and the error type.
    """
    try:
        return model_class(**data)
    except ValidationError as e:
        errors = [
            {
                'field': '.'.join(str(loc) for loc in error['loc']),
                'message': error['msg'],
                'type': error['type'],
            }
            for error in e.errors()
        ]
        # Chain the original error so it stays available for debugging.
        raise HTTPException(status_code=422, detail={'errors': errors}) from e
输入清理
import * as path from "node:path";

import DOMPurify from "dompurify";
import { JSDOM } from "jsdom";
import validator from "validator";
// 服务器端 DOMPurify 设置
// Server-side setup: DOMPurify is bound to the window of an empty jsdom document.
const { window } = new JSDOM("");
const purify = DOMPurify(window);
// HTML 清理
/**
 * Sanitizes untrusted HTML down to a small set of inline/list tags and link
 * attributes.
 *
 * @param dirty - untrusted HTML
 * @param options - DOMPurify options; keys given here replace the defaults
 * @returns the sanitized HTML string
 */
function sanitizeHtml(dirty: string, options?: DOMPurify.Config): string {
  const defaultOptions: DOMPurify.Config = {
    ALLOWED_TAGS: ["b", "i", "em", "strong", "a", "p", "br", "ul", "ol", "li"],
    ALLOWED_ATTR: ["href", "target", "rel"],
    ALLOW_DATA_ATTR: false,
    // Extends the attribute allow-list with `target`; it does not by itself
    // inject target="_blank" into links.
    ADD_ATTR: ["target"],
    FORBID_TAGS: ["script", "style", "iframe", "form", "input"],
    FORBID_ATTR: ["onerror", "onclick", "onload"],
  };
  // Caller options are spread last so they take precedence over the defaults.
  const effectiveOptions = { ...defaultOptions, ...options };
  return purify.sanitize(dirty, effectiveOptions);
}
// 富文本清理(更宽松)
/**
 * Sanitizes rich-text HTML with a more permissive allow-list than
 * sanitizeHtml: headings, text formatting, lists, links, images, quotes,
 * code and tables.
 *
 * @param dirty - untrusted HTML
 * @returns the sanitized HTML string
 */
function sanitizeRichText(dirty: string): string {
  const allowedTags = [
    // headings
    "h1", "h2", "h3", "h4", "h5", "h6",
    // paragraphs and breaks
    "p", "br", "hr",
    // inline formatting
    "b", "i", "em", "strong", "u", "s", "strike",
    // lists
    "ul", "ol", "li",
    // links and media
    "a", "img",
    // quotes and code
    "blockquote", "pre", "code",
    // tables
    "table", "thead", "tbody", "tr", "th", "td",
  ];
  const allowedAttributes = ["href", "src", "alt", "title", "class", "id"];

  return purify.sanitize(dirty, {
    ALLOWED_TAGS: allowedTags,
    ALLOWED_ATTR: allowedAttributes,
    ALLOW_DATA_ATTR: false,
  });
}
// SQL 安全字符串(可能时使用参数化查询)
/**
 * Escapes characters that are significant inside a quoted SQL string literal.
 *
 * Prefer parameterized queries whenever they are available; this helper is a
 * fallback only.
 *
 * @param input - raw text
 * @returns the text with `'` doubled and backslash, NUL, LF, CR and
 *          Ctrl-Z (0x1A) written as backslash escape sequences
 */
function sanitizeForSql(input: string): string {
  const escapes: Record<string, string> = {
    "'": "''",
    "\\": "\\\\",
    "\x00": "\\0",
    "\n": "\\n",
    "\r": "\\r",
    "\x1a": "\\Z",
  };
  // Single pass: every special character is replaced exactly once, so the
  // backslashes introduced by one escape are never escaped a second time.
  return input.replace(/['\\\x00\n\r\x1a]/g, (ch) => escapes[ch] ?? ch);
}
// 文件名清理
/**
 * Reduces a filename to a conservative character set.
 *
 * @param filename - untrusted filename
 * @returns the name with every character outside `[A-Za-z0-9._-]` replaced
 *          by "_", dot runs collapsed to one dot, leading/trailing dots
 *          removed, and the result cut to 255 characters
 */
function sanitizeFilename(filename: string): string {
  const MAX_LENGTH = 255;
  const safeChars = filename.replace(/[^a-zA-Z0-9._-]/g, "_");
  const singleDots = safeChars.replace(/\.{2,}/g, ".");
  const withoutEdgeDots = singleDots.replace(/^\.+|\.+$/g, "");
  return withoutEdgeDots.substring(0, MAX_LENGTH);
}
// 路径遍历预防
/**
 * Resolves a user-supplied path against a base directory and rejects any
 * result that escapes that directory.
 *
 * Containment is decided from the relative path between base and target, not
 * from a string-prefix test: a prefix test would accept "/srv/data-evil" for
 * the base "/srv/data".
 *
 * @param userPath - untrusted path, relative or absolute
 * @param basePath - directory the result must stay inside
 * @returns the absolute resolved path (the base directory itself is allowed)
 * @throws Error when the resolved path lies outside `basePath`
 */
function sanitizePath(userPath: string, basePath: string): string {
  const base = path.resolve(basePath);
  const resolvedPath = path.resolve(base, userPath);
  const relative = path.relative(base, resolvedPath);

  const escapesBase =
    relative === ".." ||
    relative.startsWith(".." + path.sep) ||
    // e.g. a different drive on Windows, where no relative path exists
    path.isAbsolute(relative);

  if (escapesBase) {
    throw new Error("检测到路径遍历");
  }
  return resolvedPath;
}
// 综合输入清理器
/** Switches accepted by sanitizeString; every field is optional. */
interface SanitizationOptions {
  /** Trim surrounding whitespace; applied unless explicitly set to false. */
  trim?: boolean;
  /** Convert the result to lowercase. */
  lowercase?: boolean;
  /** Escape HTML and strip low control characters. */
  stripHtml?: boolean;
  /** Cut the result to at most this many characters. */
  maxLength?: number;
  /** Characters that may remain in the result. */
  allowedChars?: RegExp;
}
/**
 * General-purpose string sanitizer.
 *
 * Steps, in order: trim (unless `trim === false`), HTML-escape and strip low
 * control characters (`stripHtml`), lowercase, drop characters not matched by
 * `allowedChars`, truncate to `maxLength`, remove NUL bytes.
 *
 * `allowedChars` may be a bracketed class such as `/[a-z0-9_-]/` or the bare
 * class contents such as `/a-z0-9/`. Splicing a bracketed source into
 * `[^...]` would yield `[^[a-z0-9_-]]`, which does not strip anything useful,
 * so each character is tested individually instead.
 *
 * @param input - raw text
 * @param options - which steps to apply
 * @returns the sanitized string
 */
function sanitizeString(
  input: string,
  options: SanitizationOptions = {},
): string {
  let result = input;

  if (options.trim !== false) {
    result = result.trim();
  }
  if (options.stripHtml) {
    result = validator.stripLow(validator.escape(result));
  }
  if (options.lowercase) {
    result = result.toLowerCase();
  }
  if (options.allowedChars) {
    const { source, flags } = options.allowedChars;
    const isBracketed = source.startsWith("[") && source.endsWith("]");
    const charClass = isBracketed ? source : `[${source}]`;
    // Drop g/y so that test() keeps no lastIndex state between characters.
    const allowed = new RegExp(`^(?:${charClass})$`, flags.replace(/[gy]/g, ""));
    result = Array.from(result)
      .filter((ch) => allowed.test(ch))
      .join("");
  }
  if (options.maxLength) {
    result = result.substring(0, options.maxLength);
  }

  // Remove NUL bytes
  return result.replace(/\x00/g, "");
}
// 常见清理预设
/** Ready-made sanitizers for common input fields. */
const sanitizers = {
  /** Lowercase, at most 30 characters, limited to `a-z 0-9 _ -`. */
  username: (input: string) =>
    sanitizeString(input, {
      lowercase: true,
      maxLength: 30,
      allowedChars: /[a-z0-9_-]/,
    }),

  /** Normalized e-mail address, or "" when normalizeEmail yields a falsy value. */
  email: (input: string) => validator.normalizeEmail(input) || "",

  /**
   * Keeps digits, "+", parentheses, "-" and whitespace; at most 20 characters.
   * The hyphen sits last in the class so that it is unambiguously a literal
   * (a `)-\s` sequence reads like a range and is rejected under the `u` flag).
   */
  phone: (input: string) => input.replace(/[^0-9+()\s-]/g, "").substring(0, 20),

  /** Lowercase slug: whitespace runs become "-", everything else outside `a-z 0-9 -` is dropped. */
  slug: (input: string) =>
    sanitizeString(input, {
      lowercase: true,
      maxLength: 100,
    })
      .replace(/\s+/g, "-")
      .replace(/[^a-z0-9-]/g, ""),

  /** Trimmed, HTML-escaped search text of at most 200 characters. */
  searchQuery: (input: string) =>
    sanitizeString(input, {
      trim: true,
      maxLength: 200,
      stripHtml: true,
    }),
};
输出编码
// HTML 编码
/**
 * Encodes text for insertion into HTML element content or a quoted attribute
 * value.
 *
 * @param str - raw text
 * @returns the text with `& < > " ' / \` =` replaced by HTML entities
 */
function encodeHtml(str: string): string {
  const entities: Record<string, string> = {
    "&": "&amp;",
    "<": "&lt;",
    ">": "&gt;",
    '"': "&quot;",
    "'": "&#39;",
    "/": "&#x2F;",
    "`": "&#x60;",
    "=": "&#x3D;",
  };
  // Single pass, so the "&" introduced by an entity is never encoded again.
  return str.replace(/[&<>"'`=/]/g, (char) => entities[char] ?? char);
}
// JavaScript 字符串编码(用于嵌入 <script> 标签)
/**
 * Encodes text for use inside a quoted JavaScript string literal that is
 * embedded in a <script> element.
 *
 * @param str - raw text
 * @returns the text with backslash, quotes and line breaks escaped, and with
 *          `<`, `>` and `&` written as hex escapes so the value cannot close
 *          the surrounding script element
 */
function encodeJsString(str: string): string {
  const escapes: Record<string, string> = {
    "\\": "\\\\",
    "'": "\\'",
    '"': '\\"',
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
    "<": "\\x3c",
    ">": "\\x3e",
    "&": "\\x26",
    // Line/paragraph separators terminate string literals in older engines.
    "\u2028": "\\u2028",
    "\u2029": "\\u2029",
  };
  // Single pass: backslashes produced by an escape are never re-escaped.
  return str.replace(/[\\'"\n\r\t<>&\u2028\u2029]/g, (ch) => escapes[ch] ?? ch);
}
// URL 编码
/**
 * Percent-encodes a value for use as a single URL query or path component.
 *
 * @param str - raw value
 * @returns the result of encodeURIComponent for that value
 */
function encodeUrlParam(str: string): string {
  const encoded = encodeURIComponent(str);
  return encoded;
}
// CSS 编码
/**
 * Encodes text for use inside a CSS value.
 *
 * Every character outside `[a-zA-Z0-9]` becomes a backslash, the hexadecimal
 * code point and a terminating space. Matching runs per code point (`u` flag)
 * so characters outside the BMP produce one escape for the whole character
 * instead of two surrogate escapes.
 *
 * @param str - raw text
 * @returns the CSS-escaped text
 */
function encodeCss(str: string): string {
  return str.replace(/[^a-zA-Z0-9]/gu, (char) => {
    const hex = (char.codePointAt(0) ?? 0).toString(16);
    // The trailing space ends the escape so following hex digits are not absorbed.
    return `\\${hex} `;
  });
}
// JSON 编码(安全用于嵌入 HTML)
/**
 * Serializes a value as JSON that is safe to embed in an HTML document.
 *
 * `<`, `>`, `&` and `'` are written as \uXXXX escapes, so the output cannot
 * close a script element or break out of a quoted attribute, and it still
 * parses to the same value.
 *
 * @param obj - value to serialize
 * @returns the escaped JSON text; "null" when the value has no JSON
 *          representation (JSON.stringify yields undefined for `undefined`
 *          and for functions)
 */
function encodeJsonForHtml(obj: unknown): string {
  const serialized: string | undefined = JSON.stringify(obj);
  const json = serialized === undefined ? "null" : serialized;
  return json
    .replace(/</g, "\\u003c")
    .replace(/>/g, "\\u003e")
    .replace(/&/g, "\\u0026")
    .replace(/'/g, "\\u0027");
}
// 上下文感知输出编码
/** Output contexts supported by `encode`. */
type OutputContext = "html" | "htmlAttribute" | "javascript" | "url" | "css";

/**
 * Context-aware output encoding: dispatches to the encoder that matches the
 * place where the value will be rendered.
 *
 * @param str - raw value
 * @param context - where the value will be inserted
 * @returns the encoded value
 */
function encode(str: string, context: OutputContext): string {
  switch (context) {
    case "html":
    case "htmlAttribute":
      // encodeHtml is called for both contexts; the extra quote replacement
      // that used to follow it for attributes was garbled and is dropped.
      return encodeHtml(str);
    case "javascript":
      return encodeJsString(str);
    case "url":
      return encodeUrlParam(str);
    case "css":
      return encodeCss(str);
    default:
      // Unknown context at runtime: fall back to HTML encoding.
      return encodeHtml(str);
  }
}
// React 风格转义(用于 JSX)
/**
 * HTML-encodes a string destined for `dangerouslySetInnerHTML`.
 *
 * React already escapes ordinary JSX text, so this is only needed for that
 * raw-HTML escape hatch.
 *
 * @param str - raw text
 * @returns the HTML-encoded text
 */
function escapeForReact(str: string): string {
  const escaped = encodeHtml(str);
  return escaped;
}
// 模板字面量标签用于安全 HTML
/**
 * Template-literal tag that HTML-encodes every interpolated value.
 *
 * All values are converted to a string and then encoded — not only values
 * that are already strings — so an object or array whose string form contains
 * markup cannot bypass the encoding. `null` and `undefined` render as "".
 *
 * @param strings - the literal parts of the template
 * @param values - the interpolated values
 * @returns the template with all interpolations encoded
 */
function safeHtml(strings: TemplateStringsArray, ...values: unknown[]): string {
  let html = strings[0] ?? "";
  for (let i = 1; i < strings.length; i++) {
    const value = values[i - 1];
    const text = value == null ? "" : String(value);
    html += encodeHtml(text) + (strings[i] ?? "");
  }
  return html;
}
// Usage
const userInput = '<script>alert("xss")</script>';
const safe = safeHtml`<div class="user-content">${userInput}</div>`;
// Intended result — the markup inside userInput is entity-encoded, e.g.:
// <div class="user-content">&lt;script&gt;alert(&quot;xss&quot;)&lt;&#x2F;script&gt;</div>
API 请求/响应验证
// Express 中间件用于请求验证
import { Request, Response, NextFunction } from "express";
import { z, ZodSchema } from "zod";
/**
 * Builds an Express middleware that validates one part of the request with a
 * Zod schema.
 *
 * On failure it answers 422 with a list of `{ field, message }` entries and
 * does not call `next`. On success it replaces `req[source]` with the parsed
 * value and calls `next`.
 *
 * @param schema - the Zod schema to apply
 * @param source - which request part to validate (default "body")
 * @returns the middleware function
 */
function validate<T>(
  schema: ZodSchema<T>,
  source: "body" | "query" | "params" = "body",
) {
  return (req: Request, res: Response, next: NextFunction): void => {
    const result = schema.safeParse(req[source]);

    if (!result.success) {
      // `issues` is the canonical list on ZodError; `errors` is only an alias.
      res.status(422).json({
        error: "验证错误",
        details: result.error.issues.map((issue) => ({
          field: issue.path.join("."),
          message: issue.message,
        })),
      });
      return;
    }

    // NOTE(review): some Express versions expose req.query as a read-only
    // getter — confirm that this assignment works for source "query".
    req[source] = result.data;
    next();
  };
}
// 使用
// Body schema for POST /users.
const createUserSchema = z.object({
  email: z.string().email(),
  password: z.string().min(12),
  name: z.string().min(1).max(100),
});

app.post("/users", validate(createUserSchema), async (req, res, next) => {
  try {
    // req.body has been validated and replaced by the parsed value.
    const user = await createUser(req.body);
    res.status(201).json(user);
  } catch (err) {
    // Hand failures to the Express error handler instead of leaving the
    // rejected promise unhandled.
    next(err);
  }
});
// 响应验证
// Fields of the user resource returned to clients.
const userResponseShape = {
  id: z.string().uuid(),
  email: z.string().email(),
  name: z.string(),
  createdAt: z.string().datetime(),
};

/** Schema used to check outgoing user responses. */
const userResponseSchema = z.object(userResponseShape);
/**
 * Checks outgoing data against a schema before it is returned.
 *
 * @param schema - the Zod schema the response must satisfy
 * @param data - the value about to be returned
 * @returns the parsed value
 * @throws Error when the data does not match the schema
 */
function validateResponse<T>(schema: ZodSchema<T>, data: unknown): T {
  const parsed = schema.safeParse(data);
  if (parsed.success) {
    return parsed.data;
  }
  throw new Error("无效的响应格式");
}
数据管道验证 (Great Expectations)
# Great Expectations 用于数据质量验证
import great_expectations as ge
from great_expectations.dataset import PandasDataset
# Load the dataset and declare expectations on it
df = ge.read_csv('data.csv')
# Basic expectations: column presence, nulls, uniqueness, format, allowed values
df.expect_column_to_exist('user_id')
df.expect_column_values_to_not_be_null('email')
df.expect_column_values_to_be_unique('email')
df.expect_column_values_to_match_regex('email', r'^[^@]+@[^@]+\.[^@]+$')
df.expect_column_values_to_be_in_set('status', ['active', 'inactive', 'pending'])
# Numeric expectations: per-value range and column mean range
df.expect_column_values_to_be_between('age', 0, 150)
df.expect_column_mean_to_be_between('price', 10, 1000)
# Date expectation
df.expect_column_values_to_be_dateutil_parseable('created_at')
# Custom expectation
def custom_validation(df):
    """Return True when every e-mail domain equals the row's company domain.

    Reads the 'email' and 'company_domain' columns of `df`.
    """
    # The part after '@' is the e-mail domain.
    emails = df['email'].str.split('@', expand=True)[1]
    return (emails == df['company_domain']).all()


# Materialize the derived column first so that the column-pair expectation can
# compare two existing columns; no custom callback is passed to it.
# NOTE(review): confirm against the installed Great Expectations version.
df['email_domain'] = df['email'].str.split('@', expand=True)[1]
df.expect_column_pair_values_to_be_equal('email_domain', 'company_domain')
# Run all declared expectations and report the ones that failed
results = df.validate()
if not results['success']:
    for result in results['results']:
        if not result['success']:
            print(f"验证失败: {result['expectation_config']}")
# dbt 测试用于 SQL 数据验证
# models/schema.yml
version: 2

models:
  - name: users
    columns:
      - name: user_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          - email_format # custom test
      - name: age
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 150
      - name: status
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'pending']
      - name: created_at
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: day
              field: created_at
              interval: 7
ML 特征验证
# ML 管道的特征验证
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
class FeatureValidator:
    """Validates ML feature frames against an expected schema and a baseline.

    `expected_schema` maps column name to a dtype prefix (e.g. 'int', 'float',
    'object'). Baseline statistics are recorded with `set_baseline` and used
    by `validate_distributions`.
    """

    def __init__(self, expected_schema: Dict[str, str]):
        self.expected_schema = expected_schema
        # column -> {'mean', 'std', 'min', 'max'}; filled by set_baseline
        self.baseline_stats = {}

    def validate_schema(self, df: pd.DataFrame) -> List[str]:
        """Report missing columns, unexpected columns and dtype mismatches."""
        errors = []

        # Column presence
        expected_cols = set(self.expected_schema.keys())
        actual_cols = set(df.columns)
        missing = expected_cols - actual_cols
        if missing:
            errors.append(f"缺失列: {missing}")
        extra = actual_cols - expected_cols
        if extra:
            errors.append(f"意外列: {extra}")

        # Dtypes: prefix match, so 'float' accepts 'float32' and 'float64'
        for col, expected_type in self.expected_schema.items():
            if col in df.columns:
                actual_type = str(df[col].dtype)
                if not actual_type.startswith(expected_type):
                    errors.append(f"列 {col}: 期望 {expected_type}, 得到 {actual_type}")
        return errors

    def validate_distributions(self, df: pd.DataFrame,
                               threshold: float = 3.0) -> List[str]:
        """Report mean drift and variance change relative to the baseline.

        A column is flagged when its mean lies more than `threshold` baseline
        standard deviations from the baseline mean, or when the ratio of the
        current to the baseline standard deviation is outside [0.5, 2.0].
        Numeric columns without a baseline are skipped.
        """
        errors = []
        for col in df.select_dtypes(include=[np.number]).columns:
            if col not in self.baseline_stats:
                continue
            baseline_mean = self.baseline_stats[col]['mean']
            baseline_std = self.baseline_stats[col]['std']
            current_mean = df[col].mean()
            current_std = df[col].std()

            if pd.isna(baseline_std):
                # No usable spread in the baseline (e.g. a single row):
                # nothing can be compared.
                continue

            if baseline_std == 0:
                # Constant baseline: avoid dividing by zero. Any shift of the
                # mean or any spread at all counts as an infinite deviation.
                mean_shifted = pd.notna(current_mean) and current_mean != baseline_mean
                mean_zscore = float('inf') if mean_shifted else 0.0
                variance_ratio = float('inf') if current_std > 0 else 1.0
            else:
                mean_zscore = abs((current_mean - baseline_mean) / baseline_std)
                variance_ratio = current_std / baseline_std

            # Mean drift, measured as a z-score
            if mean_zscore > threshold:
                errors.append(f"列 {col}: 检测到均值漂移 (z 分数: {mean_zscore:.2f})")
            # Variance change
            if variance_ratio < 0.5 or variance_ratio > 2.0:
                errors.append(f"列 {col}: 检测到方差变化 (比率: {variance_ratio:.2f})")
        return errors

    def validate_null_rates(self, df: pd.DataFrame,
                            max_null_rate: float = 0.05) -> List[str]:
        """Report columns whose fraction of null values exceeds `max_null_rate`."""
        errors = []
        if len(df) == 0:
            # An empty frame has no meaningful null rate.
            return errors
        null_rates = df.isnull().sum() / len(df)
        for col, rate in null_rates.items():
            if rate > max_null_rate:
                errors.append(f"列 {col}: 空值率 {rate:.2%} 超过阈值 {max_null_rate:.2%}")
        return errors

    def validate_categorical_values(self, df: pd.DataFrame,
                                    expected_categories: Dict[str, List]) -> List[str]:
        """Report values that are not in the expected category list.

        Columns absent from `df` are skipped; null values are ignored.
        """
        errors = []
        for col, expected in expected_categories.items():
            if col not in df.columns:
                continue
            actual = set(df[col].dropna().unique())
            expected_set = set(expected)
            unexpected = actual - expected_set
            if unexpected:
                errors.append(f"列 {col}: 意外类别 {unexpected}")
        return errors

    def set_baseline(self, df: pd.DataFrame):
        """Record mean, std, min and max of every numeric column as the baseline."""
        for col in df.select_dtypes(include=[np.number]).columns:
            self.baseline_stats[col] = {
                'mean': df[col].mean(),
                'std': df[col].std(),
                'min': df[col].min(),
                'max': df[col].max(),
            }
# 使用
validator = FeatureValidator({
    'user_id': 'int',
    'age': 'float',
    'income': 'float',
    'category': 'object',
})

# Record the baseline from the training data
validator.set_baseline(training_df)

# Validate new data, collecting the findings of every check
errors = []
errors.extend(validator.validate_schema(new_df))
errors.extend(validator.validate_distributions(new_df))
errors.extend(validator.validate_null_rates(new_df))
errors.extend(validator.validate_categorical_values(new_df, {
    'category': ['A', 'B', 'C']
}))

if errors:
    # One finding per line below the header.
    raise ValueError("特征验证失败:\n" + "\n".join(errors))
基础设施配置验证
# Kubernetes 配置验证的 JSON Schema
apiVersion: v1
kind: ConfigMap
metadata:
  name: validation-schema
data:
  # JSON Schema (draft-07) describing the Deployment manifests that are accepted
  deployment-schema.json: |
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "required": ["apiVersion", "kind", "metadata", "spec"],
      "properties": {
        "apiVersion": {
          "type": "string",
          "pattern": "^apps/v1$"
        },
        "kind": {
          "type": "string",
          "enum": ["Deployment"]
        },
        "spec": {
          "type": "object",
          "required": ["replicas", "selector", "template"],
          "properties": {
            "replicas": {
              "type": "integer",
              "minimum": 1,
              "maximum": 100
            },
            "selector": {
              "type": "object",
              "required": ["matchLabels"]
            },
            "template": {
              "type": "object",
              "required": ["metadata", "spec"],
              "properties": {
                "spec": {
                  "type": "object",
                  "required": ["containers"],
                  "properties": {
                    "containers": {
                      "type": "array",
                      "minItems": 1,
                      "items": {
                        "type": "object",
                        "required": ["name", "image"],
                        "properties": {
                          "resources": {
                            "type": "object",
                            "required": ["requests", "limits"]
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
# Terraform 配置验证
import hcl2
import json
from jsonschema import validate, ValidationError
def validate_terraform_config(config_path: str, schema_path: str):
    """Validate a Terraform HCL file against a JSON Schema file.

    Args:
        config_path: path of the HCL file, parsed with hcl2.
        schema_path: path of the JSON Schema file.

    Raises:
        ValidationError: re-raised after the message and the path of the
            failing element have been printed.
    """
    # Parse the HCL
    with open(config_path, 'r') as f:
        config = hcl2.load(f)

    # Load the schema
    with open(schema_path, 'r') as f:
        schema = json.load(f)

    # Validate
    try:
        validate(instance=config, schema=schema)
        print("Terraform 配置有效")
    except ValidationError as e:
        print(f"验证错误: {e.message}")
        print(f"路径: {' -> '.join(str(p) for p in e.path)}")
        raise
# 自定义业务规则验证
def validate_aws_resource_tags(config: dict) -> List[str]:
    """Check that every resource carries the required tags.

    `config['resource']` may be a mapping `{type: {name: settings}}` or a list
    of such mappings; both shapes are accepted. A resource whose `tags` entry
    is missing or None counts as having no tags.

    Args:
        config: parsed Terraform configuration.

    Returns:
        One message per resource that lacks at least one required tag.
    """
    errors = []
    required_tags = {'Environment', 'Owner', 'CostCenter'}

    resources = config.get('resource', {})
    # Normalize to a list of {type: {name: settings}} blocks.
    blocks = resources if isinstance(resources, list) else [resources]

    for block in blocks:
        for resources_of_type in block.values():
            for resource_name, resource_config in resources_of_type.items():
                tags = set((resource_config.get('tags') or {}).keys())
                missing = required_tags - tags
                if missing:
                    errors.append(f"资源 {resource_name} 缺失标签: {missing}")
    return errors
最佳实践
-
尽早验证
- 在边界验证(API 端点、表单提交、管道摄取)
- 快速失败并提供清晰错误消息
- 不要信任任何外部输入
-
使用模式验证库
- 优先使用 Zod/Pydantic 以获得类型安全
- JSON Schema 用于语言无关验证
- 从模式生成 TypeScript 类型
-
清理和编码
- 根据上下文清理输入(HTML、SQL、路径)
- 根据渲染位置编码输出
- 使用参数化查询代替 SQL 转义
-
安全第一验证
- 白名单允许值而非黑名单
- 使用输出编码预防 XSS
- 使用参数化查询和清理预防注入
- 验证文件上传(类型、大小、内容)
-
数据管道验证
- 处理前验证模式
- 检查数据分布漂移
- 监控空值率和基数
- 使用 Great Expectations 进行全面的数据质量检查
-
ML 特征验证
- 验证模式匹配训练数据
- 检测分布漂移
- 检查意外类别
- 监控特征相关性
-
错误消息
- 提供具体、可操作的错误消息
- 在错误中包含字段名
- 在生产中不暴露内部细节
-
深度防御
- 在客户端和服务器端验证
- 应用最小权限原则
- 在多个层次验证(API、服务、数据库)