name: 代码迁移 description: 用于安全代码迁移和升级的策略和模式。使用于升级框架、迁移技术、处理废弃、或计划增量迁移。触发器:迁移、升级、版本升级、破坏性变更、废弃、代码转换、代码转换工具、AST转换、jscodeshift、ts-morph、框架迁移、数据库迁移、模式迁移、遗留代码、现代化、重构、功能标志、回滚、绞杀者模式、蓝绿部署、金丝雀发布、影子模式、并行运行。
代码迁移
概述
代码迁移涉及在版本、框架或技术之间移动代码库,同时保持稳定性。这个技能涵盖了版本升级策略、框架迁移、废弃处理、自动代码转换、增量迁移模式、迁移期间的测试、功能标志和回滚策略。
指令
1. 版本升级策略
依赖审计与规划
# dependency_audit.py
import subprocess
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DependencyChange:
name: str
current_version: str
target_version: str
risk_level: RiskLevel
breaking_changes: List[str]
migration_notes: str
class DependencyAuditor:
def __init__(self, package_manager: str = "npm"):
self.package_manager = package_manager
def get_outdated(self) -> Dict[str, dict]:
"""获取过时依赖列表。"""
if self.package_manager == "npm":
result = subprocess.run(
["npm", "outdated", "--json"],
capture_output=True,
text=True
)
return json.loads(result.stdout) if result.stdout else {}
elif self.package_manager == "pip":
result = subprocess.run(
["pip", "list", "--outdated", "--format=json"],
capture_output=True,
text=True
)
return {
pkg["name"]: {
"current": pkg["version"],
"wanted": pkg["latest_version"]
}
for pkg in json.loads(result.stdout)
}
def assess_risk(self, change: DependencyChange) -> RiskLevel:
"""评估依赖升级的风险级别。"""
current_parts = change.current_version.split(".")
target_parts = change.target_version.split(".")
if current_parts[0] != target_parts[0]:
return RiskLevel.HIGH # 主版本变更
elif current_parts[1] != target_parts[1]:
return RiskLevel.MEDIUM # 次版本变更
else:
return RiskLevel.LOW # 补丁版本变更
def create_migration_plan(dependencies: List[DependencyChange]) -> str:
"""生成迁移计划文档。"""
plan = ["# 依赖迁移计划
"]
# 按风险级别分组
for risk in [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]:
deps = [d for d in dependencies if d.risk_level == risk]
if deps:
plan.append(f"
## {risk.value.title()} 风险更新
")
for dep in deps:
plan.append(f"### {dep.name}: {dep.current_version} -> {dep.target_version}
")
if dep.breaking_changes:
plan.append("**破坏性变更:**
")
for change in dep.breaking_changes:
plan.append(f"- {change}
")
plan.append(f"**备注:** {dep.migration_notes}
")
return "".join(plan)
分阶段升级过程
# staged_upgrade.py
from dataclasses import dataclass
from typing import List, Callable
import subprocess
@dataclass
class UpgradeStage:
name: str
description: str
dependencies: List[str]
pre_checks: List[Callable[[], bool]]
post_checks: List[Callable[[], bool]]
rollback_steps: List[str]
class StagedUpgrader:
def __init__(self):
self.stages: List[UpgradeStage] = []
self.completed_stages: List[str] = []
def add_stage(self, stage: UpgradeStage):
self.stages.append(stage)
def execute(self, dry_run: bool = False) -> bool:
for stage in self.stages:
print(f"执行阶段:{stage.name}")
# 运行预检查
for check in stage.pre_checks:
if not check():
print(f"阶段预检查失败:{stage.name}")
return False
if not dry_run:
# 执行升级
for dep in stage.dependencies:
self._upgrade_dependency(dep)
# 运行后检查
for check in stage.post_checks:
if not check():
print(f"阶段后检查失败:{stage.name}")
self._rollback(stage)
return False
self.completed_stages.append(stage.name)
print(f"完成阶段:{stage.name}")
return True
def _upgrade_dependency(self, dep: str):
subprocess.run(["npm", "install", dep], check=True)
def _rollback(self, failed_stage: UpgradeStage):
print(f"回滚阶段:{failed_stage.name}")
for step in failed_stage.rollback_steps:
subprocess.run(step, shell=True)
# 使用示例
def check_tests_pass() -> bool:
result = subprocess.run(["npm", "test"], capture_output=True)
return result.returncode == 0
def check_build_succeeds() -> bool:
result = subprocess.run(["npm", "run", "build"], capture_output=True)
return result.returncode == 0
upgrader = StagedUpgrader()
upgrader.add_stage(UpgradeStage(
name="核心依赖",
description="升级React及相关包",
dependencies=["react@18", "react-dom@18"],
pre_checks=[check_tests_pass],
post_checks=[check_tests_pass, check_build_succeeds],
rollback_steps=["git checkout package.json", "npm install"]
))
2. 框架迁移
适配器模式用于渐进迁移
// 从Express迁移到Fastify示例
// adapter.ts - 通用接口
interface RequestAdapter {
params: Record<string, string>;
query: Record<string, string>;
body: unknown;
headers: Record<string, string>;
}
interface ResponseAdapter {
status(code: number): ResponseAdapter;
json(data: unknown): void;
send(data: string): void;
}
type RouteHandler = (
req: RequestAdapter,
res: ResponseAdapter,
) => Promise<void>;
// express-adapter.ts
import { Request, Response } from "express";
function adaptExpressRequest(req: Request): RequestAdapter {
return {
params: req.params,
query: req.query as Record<string, string>,
body: req.body,
headers: req.headers as Record<string, string>,
};
}
function adaptExpressResponse(res: Response): ResponseAdapter {
return {
status: (code: number) => {
res.status(code);
return adaptExpressResponse(res);
},
json: (data: unknown) => res.json(data),
send: (data: string) => res.send(data),
};
}
export function wrapForExpress(handler: RouteHandler) {
return async (req: Request, res: Response) => {
await handler(adaptExpressRequest(req), adaptExpressResponse(res));
};
}
// fastify-adapter.ts
import { FastifyRequest, FastifyReply } from "fastify";
function adaptFastifyRequest(req: FastifyRequest): RequestAdapter {
return {
params: req.params as Record<string, string>,
query: req.query as Record<string, string>,
body: req.body,
headers: req.headers as Record<string, string>,
};
}
function adaptFastifyResponse(reply: FastifyReply): ResponseAdapter {
return {
status: (code: number) => {
reply.status(code);
return adaptFastifyResponse(reply);
},
json: (data: unknown) => reply.send(data),
send: (data: string) => reply.send(data),
};
}
export function wrapForFastify(handler: RouteHandler) {
return async (req: FastifyRequest, reply: FastifyReply) => {
await handler(adaptFastifyRequest(req), adaptFastifyResponse(reply));
};
}
// handlers/users.ts - 框架无关的处理程序
export const getUser: RouteHandler = async (req, res) => {
const { id } = req.params;
const user = await userService.findById(id);
if (!user) {
return res.status(404).json({ error: "用户未找到" });
}
res.json(user);
};
// routes-express.ts (现有)
import express from "express";
import { wrapForExpress } from "./express-adapter";
import { getUser } from "./handlers/users";
const router = express.Router();
router.get("/users/:id", wrapForExpress(getUser));
// routes-fastify.ts (新)
import fastify from "fastify";
import { wrapForFastify } from "./fastify-adapter";
import { getUser } from "./handlers/users";
const app = fastify();
app.get("/users/:id", wrapForFastify(getUser));
绞杀者模式
# strangler_fig.py
from fastapi import FastAPI, Request, Response
from httpx import AsyncClient
from typing import Set
class StranglerProxy:
"""在遗留系统和新系统之间路由请求。"""
def __init__(
self,
legacy_url: str,
migrated_routes: Set[str] = None
):
self.legacy_url = legacy_url
self.migrated_routes = migrated_routes or set()
self.client = AsyncClient()
def is_migrated(self, path: str) -> bool:
"""检查路由是否已迁移到新系统。"""
for route in self.migrated_routes:
if path.startswith(route):
return True
return False
async def proxy_to_legacy(self, request: Request) -> Response:
"""将请求转发到遗留系统。"""
url = f"{self.legacy_url}{request.url.path}"
response = await self.client.request(
method=request.method,
url=url,
headers=dict(request.headers),
content=await request.body(),
params=request.query_params,
)
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers),
)
# 使用
app = FastAPI()
strangler = StranglerProxy(
legacy_url="http://legacy-service:8080",
migrated_routes={"/api/v2/users", "/api/v2/products"}
)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def route_request(request: Request, path: str):
if strangler.is_migrated(f"/{path}"):
# 在新系统中处理(FastAPI路由优先)
raise HTTPException(status_code=404)
else:
# 代理到遗留系统
return await strangler.proxy_to_legacy(request)
# 新系统中的迁移端点
@app.get("/api/v2/users/{user_id}")
async def get_user(user_id: str):
# 新实现
return await user_service.get(user_id)
3. 废弃处理
# deprecation.py
import warnings
import functools
from typing import Callable, Optional
from datetime import date
def deprecated(
reason: str,
replacement: Optional[str] = None,
removal_version: Optional[str] = None,
removal_date: Optional[date] = None
):
"""标记函数为废弃,并提供迁移指导。"""
def decorator(func: Callable) -> Callable:
message = f"{func.__name__} 已废弃:{reason}"
if replacement:
message += f" 请使用 {replacement} 替代。"
if removal_version:
message += f" 将在版本 {removal_version} 中移除。"
elif removal_date:
message += f" 将在 {removal_date} 之后移除。"
@functools.wraps(func)
def wrapper(*args, **kwargs):
warnings.warn(message, DeprecationWarning, stacklevel=2)
return func(*args, **kwargs)
# 在文档字符串中添加废弃信息
wrapper.__doc__ = f"已废弃:{message}
{func.__doc__ or ''}"
wrapper._deprecated = True
wrapper._deprecation_info = {
"reason": reason,
"replacement": replacement,
"removal_version": removal_version,
"removal_date": removal_date,
}
return wrapper
return decorator
# 使用
@deprecated(
reason="遗留认证方法",
replacement="authenticate_v2()",
removal_version="3.0.0"
)
def authenticate(username: str, password: str) -> bool:
"""旧认证方法。"""
return legacy_auth(username, password)
def authenticate_v2(credentials: Credentials) -> AuthResult:
"""新认证方法,改进安全性。"""
return modern_auth(credentials)
# 废弃使用跟踪
class DeprecationTracker:
"""在生产中跟踪废弃代码使用。"""
def __init__(self):
self.usage_counts: Dict[str, int] = {}
def track(self, func_name: str, caller_info: str):
key = f"{func_name}:{caller_info}"
self.usage_counts[key] = self.usage_counts.get(key, 0) + 1
def get_report(self) -> Dict[str, any]:
return {
"total_deprecated_calls": sum(self.usage_counts.values()),
"by_function": self.usage_counts,
}
tracker = DeprecationTracker()
def tracked_deprecated(tracker: DeprecationTracker):
"""跟踪废弃函数使用的装饰器。"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
import traceback
caller = traceback.extract_stack()[-2]
tracker.track(func.__name__, f"{caller.filename}:{caller.lineno}")
return func(*args, **kwargs)
return wrapper
return decorator
4. 增量迁移模式
# incremental_migration.py
from typing import TypeVar, Generic, Callable
from dataclasses import dataclass
from enum import Enum
import random
T = TypeVar('T')
class MigrationPhase(Enum):
LEGACY_ONLY = "legacy_only"
SHADOW_MODE = "shadow_mode" # 同时运行,比较结果
CANARY = "canary" # 小比例到新系统
GRADUAL_ROLLOUT = "gradual" # 增加比例到新系统
NEW_ONLY = "new_only"
@dataclass
class MigrationConfig:
phase: MigrationPhase
new_traffic_percentage: float = 0.0
shadow_mode_enabled: bool = False
comparison_enabled: bool = False
class IncrementalMigrator(Generic[T]):
"""管理实现之间的增量迁移。"""
def __init__(
self,
legacy_impl: Callable[..., T],
new_impl: Callable[..., T],
config: MigrationConfig,
comparator: Callable[[T, T], bool] = None
):
self.legacy = legacy_impl
self.new = new_impl
self.config = config
self.comparator = comparator or (lambda a, b: a == b)
self.metrics = MigrationMetrics()
async def execute(self, *args, **kwargs) -> T:
phase = self.config.phase
if phase == MigrationPhase.LEGACY_ONLY:
return await self.legacy(*args, **kwargs)
elif phase == MigrationPhase.SHADOW_MODE:
return await self._execute_shadow(*args, **kwargs)
elif phase in [MigrationPhase.CANARY, MigrationPhase.GRADUAL_ROLLOUT]:
return await self._execute_percentage(*args, **kwargs)
elif phase == MigrationPhase.NEW_ONLY:
return await self.new(*args, **kwargs)
async def _execute_shadow(self, *args, **kwargs) -> T:
"""同时运行两个实现,返回遗留结果。"""
legacy_result = await self.legacy(*args, **kwargs)
try:
new_result = await self.new(*args, **kwargs)
# 比较结果
if self.config.comparison_enabled:
matches = self.comparator(legacy_result, new_result)
self.metrics.record_comparison(matches)
if not matches:
self.metrics.record_mismatch(
args, kwargs, legacy_result, new_result
)
except Exception as e:
self.metrics.record_new_impl_error(e)
return legacy_result
async def _execute_percentage(self, *args, **kwargs) -> T:
"""基于百分比路由流量。"""
use_new = random.random() < self.config.new_traffic_percentage
if use_new:
try:
result = await self.new(*args, **kwargs)
self.metrics.record_new_success()
return result
except Exception as e:
self.metrics.record_new_impl_error(e)
# 回退到遗留系统
return await self.legacy(*args, **kwargs)
else:
return await self.legacy(*args, **kwargs)
@dataclass
class MigrationMetrics:
comparisons: int = 0
matches: int = 0
mismatches: int = 0
new_impl_errors: int = 0
new_impl_successes: int = 0
def record_comparison(self, matched: bool):
self.comparisons += 1
if matched:
self.matches += 1
else:
self.mismatches += 1
def record_mismatch(self, args, kwargs, legacy, new):
# 记录用于调试
print(f"不匹配:遗留={legacy}, 新={new}")
def record_new_impl_error(self, error: Exception):
self.new_impl_errors += 1
def record_new_success(self):
self.new_impl_successes += 1
@property
def match_rate(self) -> float:
if self.comparisons == 0:
return 0.0
return self.matches / self.comparisons
# 使用
migrator = IncrementalMigrator(
legacy_impl=old_search_service.search,
new_impl=new_search_service.search,
config=MigrationConfig(
phase=MigrationPhase.SHADOW_MODE,
comparison_enabled=True
),
comparator=lambda a, b: set(r.id for r in a) == set(r.id for r in b)
)
results = await migrator.execute(query="test")
5. 功能标志用于迁移
# feature_flags.py
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import hashlib
class FlagState(Enum):
OFF = "off"
ON = "on"
PERCENTAGE = "percentage"
USER_SEGMENT = "user_segment"
@dataclass
class FeatureFlag:
name: str
state: FlagState
percentage: float = 0.0
segments: list = None
default: bool = False
class FeatureFlagService:
def __init__(self):
self.flags: Dict[str, FeatureFlag] = {}
self._overrides: Dict[str, Dict[str, bool]] = {}
def register(self, flag: FeatureFlag):
self.flags[flag.name] = flag
def is_enabled(
self,
flag_name: str,
user_id: Optional[str] = None,
context: Dict[str, Any] = None
) -> bool:
flag = self.flags.get(flag_name)
if not flag:
return False
# 检查用户覆盖
if user_id and user_id in self._overrides.get(flag_name, {}):
return self._overrides[flag_name][user_id]
if flag.state == FlagState.OFF:
return False
elif flag.state == FlagState.ON:
return True
elif flag.state == FlagState.PERCENTAGE:
return self._check_percentage(flag_name, user_id, flag.percentage)
elif flag.state == FlagState.USER_SEGMENT:
return self._check_segment(user_id, flag.segments, context)
return flag.default
def _check_percentage(
self,
flag_name: str,
user_id: Optional[str],
percentage: float
) -> bool:
"""基于用户ID的确定性百分比检查。"""
if not user_id:
return False
hash_input = f"{flag_name}:{user_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
return (hash_value % 100) < (percentage * 100)
def _check_segment(
self,
user_id: Optional[str],
segments: list,
context: Dict[str, Any]
) -> bool:
"""检查用户是否属于启用段。"""
if not context:
return False
user_segment = context.get("segment")
return user_segment in (segments or [])
def set_override(self, flag_name: str, user_id: str, enabled: bool):
"""为测试设置每个用户的覆盖。"""
if flag_name not in self._overrides:
self._overrides[flag_name] = {}
self._overrides[flag_name][user_id] = enabled
def update_percentage(self, flag_name: str, percentage: float):
"""更新推出百分比。"""
if flag_name in self.flags:
self.flags[flag_name].percentage = percentage
# 迁移特定标志
flags = FeatureFlagService()
flags.register(FeatureFlag(
name="use_new_search_api",
state=FlagState.PERCENTAGE,
percentage=0.1 # 10% 推出
))
flags.register(FeatureFlag(
name="use_new_payment_processor",
state=FlagState.USER_SEGMENT,
segments=["beta_testers", "employees"]
))
# 在代码中使用
async def search(query: str, user_id: str):
if flags.is_enabled("use_new_search_api", user_id):
return await new_search_service.search(query)
else:
return await legacy_search_service.search(query)
6. 自动代码转换
// codemod-example.ts - 使用 jscodeshift 进行自动重构
import jscodeshift, { API, FileInfo, Options } from "jscodeshift";
/**
* 代码转换:从类组件迁移到函数组件
*
* 之前:
* class MyComponent extends React.Component {
* render() { return <div>Hello</div>; }
* }
*
* 之后:
* function MyComponent() {
* return <div>Hello</div>;
* }
*/
export default function transformer(
file: FileInfo,
api: API,
options: Options,
) {
const j = api.jscodeshift;
const root = j(file.source);
// 查找所有只有 render 方法的类组件
root
.find(j.ClassDeclaration)
.filter((path) => {
const superClass = path.value.superClass;
if (!superClass) return false;
// 检查是否继承 React.Component
const extendsReact =
(j.MemberExpression.check(superClass) &&
j.Identifier.check(superClass.object) &&
superClass.object.name === "React" &&
j.Identifier.check(superClass.property) &&
superClass.property.name === "Component") ||
(j.Identifier.check(superClass) && superClass.name === "Component");
if (!extendsReact) return false;
// 检查是否只有 render 方法
const methods = path.value.body.body.filter((node) =>
j.MethodDefinition.check(node),
);
return (
methods.length === 1 &&
j.Identifier.check(methods[0].key) &&
methods[0].key.name === "render"
);
})
.replaceWith((path) => {
const className = path.value.id?.name || "Component";
const renderMethod = path.value.body.body.find(
(node) =>
j.MethodDefinition.check(node) &&
j.Identifier.check(node.key) &&
node.key.name === "render",
) as any;
const returnStatement = renderMethod.value.body.body.find((stmt: any) =>
j.ReturnStatement.check(stmt),
);
// 创建函数组件
return j.functionDeclaration(
j.identifier(className),
[],
j.blockStatement([returnStatement]),
);
});
return root.toSource();
}
// 运行:jscodeshift -t codemod-example.ts src/**/*.tsx
# ast_codemod.py - 使用 Python AST 进行重构
import ast
import astor
from typing import List
class FunctionRenamer(ast.NodeTransformer):
"""代码转换:重命名废弃函数调用。"""
def __init__(self, old_name: str, new_name: str):
self.old_name = old_name
self.new_name = new_name
self.changes: List[str] = []
def visit_Call(self, node: ast.Call) -> ast.Call:
if isinstance(node.func, ast.Name) and node.func.id == self.old_name:
old_lineno = node.lineno
node.func.id = self.new_name
self.changes.append(f"行 {old_lineno}: {self.old_name} -> {self.new_name}")
self.generic_visit(node)
return node
def apply_codemod(source_code: str, old_name: str, new_name: str) -> tuple[str, List[str]]:
"""将函数重命名代码转换应用于源代码。"""
tree = ast.parse(source_code)
renamer = FunctionRenamer(old_name, new_name)
new_tree = renamer.visit(tree)
new_source = astor.to_source(new_tree)
return new_source, renamer.changes
# 示例:迁移 authenticate() 到 authenticate_v2()
with open("auth.py") as f:
source = f.read()
new_source, changes = apply_codemod(source, "authenticate", "authenticate_v2")
print("更改:")
for change in changes:
print(f" {change}")
with open("auth.py", "w") as f:
f.write(new_source)
// rust_codemod_example.rs - 使用 syn 进行 Rust AST 操作
use syn::{visit_mut::VisitMut, File, ItemFn, Expr, ExprMethodCall};
use quote::ToTokens;
/// 代码转换:替换 .unwrap() 为适当的错误处理
struct UnwrapReplacer {
changes: Vec<String>,
}
impl VisitMut for UnwrapReplacer {
fn visit_expr_method_call_mut(&mut self, node: &mut ExprMethodCall) {
if node.method == "unwrap" {
// 转换 .unwrap() 为 ?
self.changes.push(format!(
"已替换 .unwrap() 为 ? 运算符"
));
// 注意:这是简化版 - 真实实现需要正确处理表达式转换
}
syn::visit_mut::visit_expr_method_call_mut(self, node);
}
}
fn apply_codemod(source: &str) -> Result<(String, Vec<String>), syn::Error> {
let mut file = syn::parse_file(source)?;
let mut replacer = UnwrapReplacer { changes: Vec::new() };
replacer.visit_file_mut(&mut file);
let new_source = file.to_token_stream().to_string();
Ok((new_source, replacer.changes))
}
7. 增量迁移策略
黑暗启动
# dark_launch.py
from typing import Callable, TypeVar, Generic
import logging
T = TypeVar('T')
class DarkLaunch(Generic[T]):
"""执行新代码路径但忽略结果(仅验证)。"""
def __init__(
self,
production_impl: Callable[..., T],
dark_impl: Callable[..., T],
enabled: bool = False,
sample_rate: float = 1.0
):
self.production = production_impl
self.dark = dark_impl
self.enabled = enabled
self.sample_rate = sample_rate
self.logger = logging.getLogger(__name__)
async def execute(self, *args, **kwargs) -> T:
"""执行生产代码,可选在后台运行黑暗代码。"""
# 始终运行生产代码
result = await self.production(*args, **kwargs)
# 有条件地运行黑暗代码(不阻塞)
if self.enabled and random.random() < self.sample_rate:
import asyncio
asyncio.create_task(self._run_dark(*args, **kwargs))
return result
async def _run_dark(self, *args, **kwargs):
"""运行黑暗实现并记录结果/错误。"""
try:
dark_result = await self.dark(*args, **kwargs)
self.logger.info(f"黑暗启动成功:{dark_result}")
except Exception as e:
self.logger.error(f"黑暗启动失败:{e}", exc_info=True)
# 使用
dark_search = DarkLaunch(
production_impl=old_search_service.search,
dark_impl=new_search_service.search,
enabled=True,
sample_rate=0.1 # 10% 采样
)
results = await dark_search.execute(query="test")
抽象分支
// branch-by-abstraction.ts
// 步骤1:创建抽象层
interface PaymentProcessor {
processPayment(amount: number, token: string): Promise<PaymentResult>;
refund(transactionId: string): Promise<void>;
}
// 步骤2:为遗留系统实现抽象
class StripeAdapter implements PaymentProcessor {
constructor(private stripe: StripeAPI) {}
async processPayment(amount: number, token: string): Promise<PaymentResult> {
const charge = await this.stripe.charges.create({
amount,
source: token,
});
return { transactionId: charge.id, status: "success" };
}
async refund(transactionId: string): Promise<void> {
await this.stripe.refunds.create({ charge: transactionId });
}
}
// 步骤3:为新系统实现抽象
class BraintreeAdapter implements PaymentProcessor {
constructor(private braintree: BraintreeAPI) {}
async processPayment(amount: number, token: string): Promise<PaymentResult> {
const result = await this.braintree.transaction.sale({
amount: amount.toString(),
paymentMethodNonce: token,
});
return { transactionId: result.transaction.id, status: "success" };
}
async refund(transactionId: string): Promise<void> {
await this.braintree.transaction.refund(transactionId);
}
}
// 步骤4:基于功能标志路由到实现
class PaymentService {
constructor(
private stripeAdapter: StripeAdapter,
private braintreeAdapter: BraintreeAdapter,
private featureFlags: FeatureFlagService,
) {}
private getProcessor(userId: string): PaymentProcessor {
if (this.featureFlags.isEnabled("use_braintree", userId)) {
return this.braintreeAdapter;
}
return this.stripeAdapter;
}
async processPayment(
userId: string,
amount: number,
token: string,
): Promise<PaymentResult> {
const processor = this.getProcessor(userId);
return processor.processPayment(amount, token);
}
}
// 步骤5:迁移完成后,可移除抽象
8. 迁移期间测试
# migration_testing.py
import pytest
from typing import Callable, Any, List
from dataclasses import dataclass
@dataclass
class MigrationTestCase:
name: str
input_data: Any
expected_output: Any
legacy_impl: Callable
new_impl: Callable
class MigrationTester:
"""验证迁移正确性的测试框架。"""
def __init__(self):
self.test_cases: List[MigrationTestCase] = []
self.failures: List[dict] = []
def add_test_case(self, test_case: MigrationTestCase):
self.test_cases.append(test_case)
async def run_all(self) -> bool:
"""运行所有测试用例,比较遗留与新实现。"""
all_passed = True
for test_case in self.test_cases:
try:
legacy_result = await test_case.legacy_impl(test_case.input_data)
new_result = await test_case.new_impl(test_case.input_data)
if legacy_result != new_result:
all_passed = False
self.failures.append({
'test': test_case.name,
'input': test_case.input_data,
'legacy': legacy_result,
'new': new_result,
'expected': test_case.expected_output
})
print(f"失败:{test_case.name}")
print(f" 遗留:{legacy_result}")
print(f" 新: {new_result}")
else:
print(f"通过:{test_case.name}")
except Exception as e:
all_passed = False
self.failures.append({
'test': test_case.name,
'error': str(e)
})
print(f"错误:{test_case.name} - {e}")
return all_passed
def generate_report(self) -> str:
"""生成迁移测试报告。"""
total = len(self.test_cases)
passed = total - len(self.failures)
report = [
"# 迁移测试报告",
f"总计:{total} | 通过:{passed} | 失败:{len(self.failures)}",
""
]
if self.failures:
report.append("## 失败")
for failure in self.failures:
report.append(f"- {failure.get('test', '未知')}")
if 'error' in failure:
report.append(f" 错误:{failure['error']}")
else:
report.append(f" 遗留:{failure.get('legacy')}")
report.append(f" 新:{failure.get('new')}")
return "
".join(report)
# 使用
tester = MigrationTester()
tester.add_test_case(MigrationTestCase(
name="search_basic_query",
input_data="test query",
expected_output=SearchResults(...),
legacy_impl=old_search.search,
new_impl=new_search.search
))
await tester.run_all()
print(tester.generate_report())
# snapshot_testing.py
import json
import hashlib
from pathlib import Path
class SnapshotTester:
"""用于迁移验证的快照测试。"""
def __init__(self, snapshot_dir: str = ".snapshots"):
self.snapshot_dir = Path(snapshot_dir)
self.snapshot_dir.mkdir(exist_ok=True)
def capture_snapshot(self, name: str, data: Any):
"""捕获当前输出的快照。"""
snapshot_path = self.snapshot_dir / f"{name}.json"
serialized = json.dumps(data, sort_keys=True, indent=2)
snapshot_path.write_text(serialized)
# 同时保存哈希以便快速比较
hash_path = self.snapshot_dir / f"{name}.hash"
data_hash = hashlib.sha256(serialized.encode()).hexdigest()
hash_path.write_text(data_hash)
def verify_snapshot(self, name: str, data: Any) -> bool:
"""验证数据匹配快照。"""
snapshot_path = self.snapshot_dir / f"{name}.json"
if not snapshot_path.exists():
raise FileNotFoundError(f"快照 {name} 未找到")
expected = snapshot_path.read_text()
actual = json.dumps(data, sort_keys=True, indent=2)
if expected != actual:
# 保存差异用于调试
diff_path = self.snapshot_dir / f"{name}.diff"
diff_path.write_text(f"预期:
{expected}
实际:
{actual}")
return False
return True
# 迁移工作流与快照
snapshot = SnapshotTester()
# 1. 从遗留系统捕获基线
legacy_data = await legacy_search.search("test")
snapshot.capture_snapshot("search_test_legacy", legacy_data)
# 2. 运行迁移
# 3. 验证新系统匹配遗留快照
new_data = await new_search.search("test")
matches = snapshot.verify_snapshot("search_test_legacy", new_data)
assert matches, "新实现不匹配遗留行为"
9. 回滚策略
# rollback.py
from dataclasses import dataclass
from typing import List, Callable, Optional
from datetime import datetime
import subprocess
import json
@dataclass
class RollbackPoint:
id: str
timestamp: datetime
description: str
git_commit: str
db_migration_version: str
config_snapshot: dict
rollback_commands: List[str]
class RollbackManager:
def __init__(self):
self.rollback_points: List[RollbackPoint] = []
self.current_point: Optional[RollbackPoint] = None
def create_rollback_point(
self,
description: str,
rollback_commands: List[str] = None
) -> RollbackPoint:
"""在迁移前创建回滚点。"""
point = RollbackPoint(
id=f"rbp_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
timestamp=datetime.now(),
description=description,
git_commit=self._get_current_commit(),
db_migration_version=self._get_db_version(),
config_snapshot=self._capture_config(),
rollback_commands=rollback_commands or []
)
self.rollback_points.append(point)
self._save_point(point)
return point
def rollback_to(self, point_id: str) -> bool:
"""执行回滚到指定点。"""
point = self._find_point(point_id)
if not point:
raise ValueError(f"回滚点 {point_id} 未找到")
print(f"回滚到:{point.description}")
try:
# 1. 回滚代码
self._rollback_code(point.git_commit)
# 2. 回滚数据库
self._rollback_database(point.db_migration_version)
# 3. 恢复配置
self._restore_config(point.config_snapshot)
# 4. 执行自定义回滚命令
for cmd in point.rollback_commands:
subprocess.run(cmd, shell=True, check=True)
self.current_point = point
return True
except Exception as e:
print(f"回滚失败:{e}")
return False
def _get_current_commit(self) -> str:
result = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True,
text=True
)
return result.stdout.strip()
def _get_db_version(self) -> str:
# 实现取决于迁移工具
return "migration_version"
def _capture_config(self) -> dict:
# 捕获当前配置
return {}
def _rollback_code(self, commit: str):
subprocess.run(["git", "checkout", commit], check=True)
def _rollback_database(self, version: str):
# Alembic 示例
subprocess.run(["alembic", "downgrade", version], check=True)
def _restore_config(self, config: dict):
# 恢复配置
pass
def _save_point(self, point: RollbackPoint):
# 持久化回滚点
with open(f"rollback_points/{point.id}.json", "w") as f:
json.dump({
"id": point.id,
"timestamp": point.timestamp.isoformat(),
"description": point.description,
"git_commit": point.git_commit,
"db_migration_version": point.db_migration_version,
"rollback_commands": point.rollback_commands
}, f)
def _find_point(self, point_id: str) -> Optional[RollbackPoint]:
return next(
(p for p in self.rollback_points if p.id == point_id),
None
)
# 自动回滚触发器
class AutoRollback:
def __init__(
self,
rollback_manager: RollbackManager,
health_check: Callable[[], bool],
error_threshold: int = 10,
check_interval: int = 60
):
self.manager = rollback_manager
self.health_check = health_check
self.error_threshold = error_threshold
self.check_interval = check_interval
self.error_count = 0
async def monitor(self, rollback_point_id: str):
"""监控健康状态并在需要时自动回滚。"""
import asyncio
while True:
try:
if not self.health_check():
self.error_count += 1
print(f"健康检查失败({self.error_count}/{self.error_threshold})")
if self.error_count >= self.error_threshold:
print("错误阈值达到,启动回滚")
self.manager.rollback_to(rollback_point_id)
break
else:
self.error_count = 0
await asyncio.sleep(self.check_interval)
except Exception as e:
print(f"监控错误:{e}")
self.error_count += 1
最佳实践
-
彻底规划:在开始前记录迁移步骤、依赖项和回滚程序。
-
增量迁移:使用绞杀者模式、功能标志、黑暗启动和百分比推出以降低风险。
-
自动化可能部分:使用代码转换工具(jscodeshift、ts-morph、AST工具)自动化重复代码转换。
-
影子测试:在切换前并行运行新旧系统以验证行为。比较结果并记录差异。
-
持续测试:使用迁移特定测试(快照测试、比较测试)在每一步验证正确性。
-
密切监控:在迁移期间跟踪错误率、延迟和业务指标。设置自动回滚触发器。
-
抽象分支:创建抽象层以将迁移与功能开发解耦。
-
保持向后兼容性:在过渡期间保持API兼容性。支持新旧接口。
-
文档化废弃:清晰的废弃通知和迁移指南帮助消费者。跟踪废弃使用。
-
始终有回滚计划:切勿在没有测试回滚计划的情况下部署迁移。在每个阶段前创建回滚点。
示例
完整迁移工作流
class MigrationWorkflow:
def __init__(self):
self.rollback_manager = RollbackManager()
self.feature_flags = FeatureFlagService()
self.metrics = MigrationMetrics()
async def execute_migration(
self,
name: str,
legacy_impl: Callable,
new_impl: Callable
):
# 1. 创建回滚点
rollback_point = self.rollback_manager.create_rollback_point(
description=f"在 {name} 迁移前"
)
# 2. 注册功能标志
self.feature_flags.register(FeatureFlag(
name=f"migration_{name}",
state=FlagState.PERCENTAGE,
percentage=0.0
))
# 3. 影子模式测试
print("阶段1:影子模式测试")
migrator = IncrementalMigrator(
legacy_impl=legacy_impl,
new_impl=new_impl,
config=MigrationConfig(
phase=MigrationPhase.SHADOW_MODE,
comparison_enabled=True
)
)
# 运行影子测试...
if migrator.metrics.match_rate < 0.99:
print("影子模式匹配率太低,中止")
return False
# 4. 金丝雀部署
print("阶段2:金丝雀部署(1%)")
self.feature_flags.update_percentage(f"migration_{name}", 0.01)
# 监控问题...
# 5. 渐进推出
for percentage in [0.05, 0.1, 0.25, 0.5, 0.75, 1.0]:
print(f"阶段3:推出到 {percentage*100}%")
self.feature_flags.update_percentage(f"migration_{name}", percentage)
# 监控和验证...
# 6. 完成迁移
print("迁移完成")
return True