代码迁移Skill code-migration

代码迁移技能涉及在软件版本、框架或技术之间安全地迁移代码库的策略和模式,包括依赖审计、框架迁移、废弃处理、自动代码转换、增量迁移、测试、功能标志和回滚策略。关键词:代码迁移、框架迁移、版本升级、自动重构、增量部署、影子模式、绞杀者模式、功能标志、回滚策略。

架构设计 0 次安装 0 次浏览 更新于 3/24/2026

name: 代码迁移 description: 用于安全代码迁移和升级的策略和模式。使用于升级框架、迁移技术、处理废弃、或计划增量迁移。触发器:迁移、升级、版本升级、破坏性变更、废弃、代码转换、代码转换工具、AST转换、jscodeshift、ts-morph、框架迁移、数据库迁移、模式迁移、遗留代码、现代化、重构、功能标志、回滚、绞杀者模式、蓝绿部署、金丝雀发布、影子模式、并行运行。

代码迁移

概述

代码迁移涉及在版本、框架或技术之间移动代码库,同时保持稳定性。这个技能涵盖了版本升级策略、框架迁移、废弃处理、自动代码转换、增量迁移模式、迁移期间的测试、功能标志和回滚策略。

指令

1. 版本升级策略

依赖审计与规划

# dependency_audit.py
import subprocess
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class DependencyChange:
    name: str
    current_version: str
    target_version: str
    risk_level: RiskLevel
    breaking_changes: List[str]
    migration_notes: str

class DependencyAuditor:
    def __init__(self, package_manager: str = "npm"):
        self.package_manager = package_manager

    def get_outdated(self) -> Dict[str, dict]:
        """获取过时依赖列表。"""
        if self.package_manager == "npm":
            result = subprocess.run(
                ["npm", "outdated", "--json"],
                capture_output=True,
                text=True
            )
            return json.loads(result.stdout) if result.stdout else {}
        elif self.package_manager == "pip":
            result = subprocess.run(
                ["pip", "list", "--outdated", "--format=json"],
                capture_output=True,
                text=True
            )
            return {
                pkg["name"]: {
                    "current": pkg["version"],
                    "wanted": pkg["latest_version"]
                }
                for pkg in json.loads(result.stdout)
            }

    def assess_risk(self, change: DependencyChange) -> RiskLevel:
        """评估依赖升级的风险级别。"""
        current_parts = change.current_version.split(".")
        target_parts = change.target_version.split(".")

        if current_parts[0] != target_parts[0]:
            return RiskLevel.HIGH  # 主版本变更
        elif current_parts[1] != target_parts[1]:
            return RiskLevel.MEDIUM  # 次版本变更
        else:
            return RiskLevel.LOW  # 补丁版本变更

def create_migration_plan(dependencies: List[DependencyChange]) -> str:
    """生成迁移计划文档。"""
    plan = ["# 依赖迁移计划
"]

    # 按风险级别分组
    for risk in [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]:
        deps = [d for d in dependencies if d.risk_level == risk]
        if deps:
            plan.append(f"
## {risk.value.title()} 风险更新
")
            for dep in deps:
                plan.append(f"### {dep.name}: {dep.current_version} -> {dep.target_version}
")
                if dep.breaking_changes:
                    plan.append("**破坏性变更:**
")
                    for change in dep.breaking_changes:
                        plan.append(f"- {change}
")
                plan.append(f"**备注:** {dep.migration_notes}
")

    return "".join(plan)

分阶段升级过程

# staged_upgrade.py
from dataclasses import dataclass
from typing import List, Callable
import subprocess

@dataclass
class UpgradeStage:
    name: str
    description: str
    dependencies: List[str]
    pre_checks: List[Callable[[], bool]]
    post_checks: List[Callable[[], bool]]
    rollback_steps: List[str]

class StagedUpgrader:
    def __init__(self):
        self.stages: List[UpgradeStage] = []
        self.completed_stages: List[str] = []

    def add_stage(self, stage: UpgradeStage):
        self.stages.append(stage)

    def execute(self, dry_run: bool = False) -> bool:
        for stage in self.stages:
            print(f"执行阶段:{stage.name}")

            # 运行预检查
            for check in stage.pre_checks:
                if not check():
                    print(f"阶段预检查失败:{stage.name}")
                    return False

            if not dry_run:
                # 执行升级
                for dep in stage.dependencies:
                    self._upgrade_dependency(dep)

                # 运行后检查
                for check in stage.post_checks:
                    if not check():
                        print(f"阶段后检查失败:{stage.name}")
                        self._rollback(stage)
                        return False

            self.completed_stages.append(stage.name)
            print(f"完成阶段:{stage.name}")

        return True

    def _upgrade_dependency(self, dep: str):
        subprocess.run(["npm", "install", dep], check=True)

    def _rollback(self, failed_stage: UpgradeStage):
        print(f"回滚阶段:{failed_stage.name}")
        for step in failed_stage.rollback_steps:
            subprocess.run(step, shell=True)

# 使用示例
def check_tests_pass() -> bool:
    result = subprocess.run(["npm", "test"], capture_output=True)
    return result.returncode == 0

def check_build_succeeds() -> bool:
    result = subprocess.run(["npm", "run", "build"], capture_output=True)
    return result.returncode == 0

upgrader = StagedUpgrader()
upgrader.add_stage(UpgradeStage(
    name="核心依赖",
    description="升级React及相关包",
    dependencies=["react@18", "react-dom@18"],
    pre_checks=[check_tests_pass],
    post_checks=[check_tests_pass, check_build_succeeds],
    rollback_steps=["git checkout package.json", "npm install"]
))

2. 框架迁移

适配器模式用于渐进迁移

// 从Express迁移到Fastify示例

// adapter.ts - 通用接口
interface RequestAdapter {
  params: Record<string, string>;
  query: Record<string, string>;
  body: unknown;
  headers: Record<string, string>;
}

interface ResponseAdapter {
  status(code: number): ResponseAdapter;
  json(data: unknown): void;
  send(data: string): void;
}

type RouteHandler = (
  req: RequestAdapter,
  res: ResponseAdapter,
) => Promise<void>;

// express-adapter.ts
import { Request, Response } from "express";

function adaptExpressRequest(req: Request): RequestAdapter {
  return {
    params: req.params,
    query: req.query as Record<string, string>,
    body: req.body,
    headers: req.headers as Record<string, string>,
  };
}

function adaptExpressResponse(res: Response): ResponseAdapter {
  return {
    status: (code: number) => {
      res.status(code);
      return adaptExpressResponse(res);
    },
    json: (data: unknown) => res.json(data),
    send: (data: string) => res.send(data),
  };
}

export function wrapForExpress(handler: RouteHandler) {
  return async (req: Request, res: Response) => {
    await handler(adaptExpressRequest(req), adaptExpressResponse(res));
  };
}

// fastify-adapter.ts
import { FastifyRequest, FastifyReply } from "fastify";

function adaptFastifyRequest(req: FastifyRequest): RequestAdapter {
  return {
    params: req.params as Record<string, string>,
    query: req.query as Record<string, string>,
    body: req.body,
    headers: req.headers as Record<string, string>,
  };
}

function adaptFastifyResponse(reply: FastifyReply): ResponseAdapter {
  return {
    status: (code: number) => {
      reply.status(code);
      return adaptFastifyResponse(reply);
    },
    json: (data: unknown) => reply.send(data),
    send: (data: string) => reply.send(data),
  };
}

export function wrapForFastify(handler: RouteHandler) {
  return async (req: FastifyRequest, reply: FastifyReply) => {
    await handler(adaptFastifyRequest(req), adaptFastifyResponse(reply));
  };
}

// handlers/users.ts - 框架无关的处理程序
export const getUser: RouteHandler = async (req, res) => {
  const { id } = req.params;
  const user = await userService.findById(id);

  if (!user) {
    return res.status(404).json({ error: "用户未找到" });
  }

  res.json(user);
};

// routes-express.ts (现有)
import express from "express";
import { wrapForExpress } from "./express-adapter";
import { getUser } from "./handlers/users";

const router = express.Router();
router.get("/users/:id", wrapForExpress(getUser));

// routes-fastify.ts (新)
import fastify from "fastify";
import { wrapForFastify } from "./fastify-adapter";
import { getUser } from "./handlers/users";

const app = fastify();
app.get("/users/:id", wrapForFastify(getUser));

绞杀者模式

# strangler_fig.py
from fastapi import FastAPI, Request, Response
from httpx import AsyncClient
from typing import Set

class StranglerProxy:
    """在遗留系统和新系统之间路由请求。"""

    def __init__(
        self,
        legacy_url: str,
        migrated_routes: Set[str] = None
    ):
        self.legacy_url = legacy_url
        self.migrated_routes = migrated_routes or set()
        self.client = AsyncClient()

    def is_migrated(self, path: str) -> bool:
        """检查路由是否已迁移到新系统。"""
        for route in self.migrated_routes:
            if path.startswith(route):
                return True
        return False

    async def proxy_to_legacy(self, request: Request) -> Response:
        """将请求转发到遗留系统。"""
        url = f"{self.legacy_url}{request.url.path}"

        response = await self.client.request(
            method=request.method,
            url=url,
            headers=dict(request.headers),
            content=await request.body(),
            params=request.query_params,
        )

        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=dict(response.headers),
        )

# 使用
app = FastAPI()
strangler = StranglerProxy(
    legacy_url="http://legacy-service:8080",
    migrated_routes={"/api/v2/users", "/api/v2/products"}
)

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def route_request(request: Request, path: str):
    if strangler.is_migrated(f"/{path}"):
        # 在新系统中处理(FastAPI路由优先)
        raise HTTPException(status_code=404)
    else:
        # 代理到遗留系统
        return await strangler.proxy_to_legacy(request)

# 新系统中的迁移端点
@app.get("/api/v2/users/{user_id}")
async def get_user(user_id: str):
    # 新实现
    return await user_service.get(user_id)

3. 废弃处理

# deprecation.py
import warnings
import functools
from typing import Callable, Optional
from datetime import date

def deprecated(
    reason: str,
    replacement: Optional[str] = None,
    removal_version: Optional[str] = None,
    removal_date: Optional[date] = None
):
    """标记函数为废弃,并提供迁移指导。"""

    def decorator(func: Callable) -> Callable:
        message = f"{func.__name__} 已废弃:{reason}"

        if replacement:
            message += f" 请使用 {replacement} 替代。"

        if removal_version:
            message += f" 将在版本 {removal_version} 中移除。"
        elif removal_date:
            message += f" 将在 {removal_date} 之后移除。"

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)

        # 在文档字符串中添加废弃信息
        wrapper.__doc__ = f"已废弃:{message}

{func.__doc__ or ''}"
        wrapper._deprecated = True
        wrapper._deprecation_info = {
            "reason": reason,
            "replacement": replacement,
            "removal_version": removal_version,
            "removal_date": removal_date,
        }

        return wrapper

    return decorator

# 使用
@deprecated(
    reason="遗留认证方法",
    replacement="authenticate_v2()",
    removal_version="3.0.0"
)
def authenticate(username: str, password: str) -> bool:
    """旧认证方法。"""
    return legacy_auth(username, password)

def authenticate_v2(credentials: Credentials) -> AuthResult:
    """新认证方法,改进安全性。"""
    return modern_auth(credentials)

# 废弃使用跟踪
class DeprecationTracker:
    """在生产中跟踪废弃代码使用。"""

    def __init__(self):
        self.usage_counts: Dict[str, int] = {}

    def track(self, func_name: str, caller_info: str):
        key = f"{func_name}:{caller_info}"
        self.usage_counts[key] = self.usage_counts.get(key, 0) + 1

    def get_report(self) -> Dict[str, any]:
        return {
            "total_deprecated_calls": sum(self.usage_counts.values()),
            "by_function": self.usage_counts,
        }

tracker = DeprecationTracker()

def tracked_deprecated(tracker: DeprecationTracker):
    """跟踪废弃函数使用的装饰器。"""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            import traceback
            caller = traceback.extract_stack()[-2]
            tracker.track(func.__name__, f"{caller.filename}:{caller.lineno}")
            return func(*args, **kwargs)
        return wrapper

    return decorator

4. 增量迁移模式

# incremental_migration.py
from typing import TypeVar, Generic, Callable
from dataclasses import dataclass
from enum import Enum
import random

T = TypeVar('T')

class MigrationPhase(Enum):
    LEGACY_ONLY = "legacy_only"
    SHADOW_MODE = "shadow_mode"       # 同时运行,比较结果
    CANARY = "canary"                 # 小比例到新系统
    GRADUAL_ROLLOUT = "gradual"       # 增加比例到新系统
    NEW_ONLY = "new_only"

@dataclass
class MigrationConfig:
    phase: MigrationPhase
    new_traffic_percentage: float = 0.0
    shadow_mode_enabled: bool = False
    comparison_enabled: bool = False

class IncrementalMigrator(Generic[T]):
    """管理实现之间的增量迁移。"""

    def __init__(
        self,
        legacy_impl: Callable[..., T],
        new_impl: Callable[..., T],
        config: MigrationConfig,
        comparator: Callable[[T, T], bool] = None
    ):
        self.legacy = legacy_impl
        self.new = new_impl
        self.config = config
        self.comparator = comparator or (lambda a, b: a == b)
        self.metrics = MigrationMetrics()

    async def execute(self, *args, **kwargs) -> T:
        phase = self.config.phase

        if phase == MigrationPhase.LEGACY_ONLY:
            return await self.legacy(*args, **kwargs)

        elif phase == MigrationPhase.SHADOW_MODE:
            return await self._execute_shadow(*args, **kwargs)

        elif phase in [MigrationPhase.CANARY, MigrationPhase.GRADUAL_ROLLOUT]:
            return await self._execute_percentage(*args, **kwargs)

        elif phase == MigrationPhase.NEW_ONLY:
            return await self.new(*args, **kwargs)

    async def _execute_shadow(self, *args, **kwargs) -> T:
        """同时运行两个实现,返回遗留结果。"""
        legacy_result = await self.legacy(*args, **kwargs)

        try:
            new_result = await self.new(*args, **kwargs)

            # 比较结果
            if self.config.comparison_enabled:
                matches = self.comparator(legacy_result, new_result)
                self.metrics.record_comparison(matches)

                if not matches:
                    self.metrics.record_mismatch(
                        args, kwargs, legacy_result, new_result
                    )
        except Exception as e:
            self.metrics.record_new_impl_error(e)

        return legacy_result

    async def _execute_percentage(self, *args, **kwargs) -> T:
        """基于百分比路由流量。"""
        use_new = random.random() < self.config.new_traffic_percentage

        if use_new:
            try:
                result = await self.new(*args, **kwargs)
                self.metrics.record_new_success()
                return result
            except Exception as e:
                self.metrics.record_new_impl_error(e)
                # 回退到遗留系统
                return await self.legacy(*args, **kwargs)
        else:
            return await self.legacy(*args, **kwargs)

@dataclass
class MigrationMetrics:
    comparisons: int = 0
    matches: int = 0
    mismatches: int = 0
    new_impl_errors: int = 0
    new_impl_successes: int = 0

    def record_comparison(self, matched: bool):
        self.comparisons += 1
        if matched:
            self.matches += 1
        else:
            self.mismatches += 1

    def record_mismatch(self, args, kwargs, legacy, new):
        # 记录用于调试
        print(f"不匹配:遗留={legacy}, 新={new}")

    def record_new_impl_error(self, error: Exception):
        self.new_impl_errors += 1

    def record_new_success(self):
        self.new_impl_successes += 1

    @property
    def match_rate(self) -> float:
        if self.comparisons == 0:
            return 0.0
        return self.matches / self.comparisons

# 使用
migrator = IncrementalMigrator(
    legacy_impl=old_search_service.search,
    new_impl=new_search_service.search,
    config=MigrationConfig(
        phase=MigrationPhase.SHADOW_MODE,
        comparison_enabled=True
    ),
    comparator=lambda a, b: set(r.id for r in a) == set(r.id for r in b)
)

results = await migrator.execute(query="test")

5. 功能标志用于迁移

# feature_flags.py
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import hashlib

class FlagState(Enum):
    OFF = "off"
    ON = "on"
    PERCENTAGE = "percentage"
    USER_SEGMENT = "user_segment"

@dataclass
class FeatureFlag:
    name: str
    state: FlagState
    percentage: float = 0.0
    segments: list = None
    default: bool = False

class FeatureFlagService:
    def __init__(self):
        self.flags: Dict[str, FeatureFlag] = {}
        self._overrides: Dict[str, Dict[str, bool]] = {}

    def register(self, flag: FeatureFlag):
        self.flags[flag.name] = flag

    def is_enabled(
        self,
        flag_name: str,
        user_id: Optional[str] = None,
        context: Dict[str, Any] = None
    ) -> bool:
        flag = self.flags.get(flag_name)
        if not flag:
            return False

        # 检查用户覆盖
        if user_id and user_id in self._overrides.get(flag_name, {}):
            return self._overrides[flag_name][user_id]

        if flag.state == FlagState.OFF:
            return False
        elif flag.state == FlagState.ON:
            return True
        elif flag.state == FlagState.PERCENTAGE:
            return self._check_percentage(flag_name, user_id, flag.percentage)
        elif flag.state == FlagState.USER_SEGMENT:
            return self._check_segment(user_id, flag.segments, context)

        return flag.default

    def _check_percentage(
        self,
        flag_name: str,
        user_id: Optional[str],
        percentage: float
    ) -> bool:
        """基于用户ID的确定性百分比检查。"""
        if not user_id:
            return False

        hash_input = f"{flag_name}:{user_id}"
        hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
        return (hash_value % 100) < (percentage * 100)

    def _check_segment(
        self,
        user_id: Optional[str],
        segments: list,
        context: Dict[str, Any]
    ) -> bool:
        """检查用户是否属于启用段。"""
        if not context:
            return False

        user_segment = context.get("segment")
        return user_segment in (segments or [])

    def set_override(self, flag_name: str, user_id: str, enabled: bool):
        """为测试设置每个用户的覆盖。"""
        if flag_name not in self._overrides:
            self._overrides[flag_name] = {}
        self._overrides[flag_name][user_id] = enabled

    def update_percentage(self, flag_name: str, percentage: float):
        """更新推出百分比。"""
        if flag_name in self.flags:
            self.flags[flag_name].percentage = percentage

# 迁移特定标志
flags = FeatureFlagService()

flags.register(FeatureFlag(
    name="use_new_search_api",
    state=FlagState.PERCENTAGE,
    percentage=0.1  # 10% 推出
))

flags.register(FeatureFlag(
    name="use_new_payment_processor",
    state=FlagState.USER_SEGMENT,
    segments=["beta_testers", "employees"]
))

# 在代码中使用
async def search(query: str, user_id: str):
    if flags.is_enabled("use_new_search_api", user_id):
        return await new_search_service.search(query)
    else:
        return await legacy_search_service.search(query)

6. 自动代码转换

// codemod-example.ts - 使用 jscodeshift 进行自动重构
import jscodeshift, { API, FileInfo, Options } from "jscodeshift";

/**
 * 代码转换:从类组件迁移到函数组件
 *
 * 之前:
 *   class MyComponent extends React.Component {
 *     render() { return <div>Hello</div>; }
 *   }
 *
 * 之后:
 *   function MyComponent() {
 *     return <div>Hello</div>;
 *   }
 */
export default function transformer(
  file: FileInfo,
  api: API,
  options: Options,
) {
  const j = api.jscodeshift;
  const root = j(file.source);

  // 查找所有只有 render 方法的类组件
  root
    .find(j.ClassDeclaration)
    .filter((path) => {
      const superClass = path.value.superClass;
      if (!superClass) return false;

      // 检查是否继承 React.Component
      const extendsReact =
        (j.MemberExpression.check(superClass) &&
          j.Identifier.check(superClass.object) &&
          superClass.object.name === "React" &&
          j.Identifier.check(superClass.property) &&
          superClass.property.name === "Component") ||
        (j.Identifier.check(superClass) && superClass.name === "Component");

      if (!extendsReact) return false;

      // 检查是否只有 render 方法
      const methods = path.value.body.body.filter((node) =>
        j.MethodDefinition.check(node),
      );
      return (
        methods.length === 1 &&
        j.Identifier.check(methods[0].key) &&
        methods[0].key.name === "render"
      );
    })
    .replaceWith((path) => {
      const className = path.value.id?.name || "Component";
      const renderMethod = path.value.body.body.find(
        (node) =>
          j.MethodDefinition.check(node) &&
          j.Identifier.check(node.key) &&
          node.key.name === "render",
      ) as any;

      const returnStatement = renderMethod.value.body.body.find((stmt: any) =>
        j.ReturnStatement.check(stmt),
      );

      // 创建函数组件
      return j.functionDeclaration(
        j.identifier(className),
        [],
        j.blockStatement([returnStatement]),
      );
    });

  return root.toSource();
}

// 运行:jscodeshift -t codemod-example.ts src/**/*.tsx
# ast_codemod.py - 使用 Python AST 进行重构
import ast
import astor
from typing import List

class FunctionRenamer(ast.NodeTransformer):
    """代码转换:重命名废弃函数调用。"""

    def __init__(self, old_name: str, new_name: str):
        self.old_name = old_name
        self.new_name = new_name
        self.changes: List[str] = []

    def visit_Call(self, node: ast.Call) -> ast.Call:
        if isinstance(node.func, ast.Name) and node.func.id == self.old_name:
            old_lineno = node.lineno
            node.func.id = self.new_name
            self.changes.append(f"行 {old_lineno}: {self.old_name} -> {self.new_name}")

        self.generic_visit(node)
        return node

def apply_codemod(source_code: str, old_name: str, new_name: str) -> tuple[str, List[str]]:
    """将函数重命名代码转换应用于源代码。"""
    tree = ast.parse(source_code)
    renamer = FunctionRenamer(old_name, new_name)
    new_tree = renamer.visit(tree)

    new_source = astor.to_source(new_tree)
    return new_source, renamer.changes

# 示例:迁移 authenticate() 到 authenticate_v2()
with open("auth.py") as f:
    source = f.read()

new_source, changes = apply_codemod(source, "authenticate", "authenticate_v2")

print("更改:")
for change in changes:
    print(f"  {change}")

with open("auth.py", "w") as f:
    f.write(new_source)
// rust_codemod_example.rs - 使用 syn 进行 Rust AST 操作
use syn::{visit_mut::VisitMut, File, ItemFn, Expr, ExprMethodCall};
use quote::ToTokens;

/// 代码转换:替换 .unwrap() 为适当的错误处理
struct UnwrapReplacer {
    changes: Vec<String>,
}

impl VisitMut for UnwrapReplacer {
    fn visit_expr_method_call_mut(&mut self, node: &mut ExprMethodCall) {
        if node.method == "unwrap" {
            // 转换 .unwrap() 为 ?
            self.changes.push(format!(
                "已替换 .unwrap() 为 ? 运算符"
            ));

            // 注意:这是简化版 - 真实实现需要正确处理表达式转换
        }

        syn::visit_mut::visit_expr_method_call_mut(self, node);
    }
}

fn apply_codemod(source: &str) -> Result<(String, Vec<String>), syn::Error> {
    let mut file = syn::parse_file(source)?;

    let mut replacer = UnwrapReplacer { changes: Vec::new() };
    replacer.visit_file_mut(&mut file);

    let new_source = file.to_token_stream().to_string();
    Ok((new_source, replacer.changes))
}

7. 增量迁移策略

黑暗启动

# dark_launch.py
from typing import Callable, TypeVar, Generic
import logging

T = TypeVar('T')

class DarkLaunch(Generic[T]):
    """执行新代码路径但忽略结果(仅验证)。"""

    def __init__(
        self,
        production_impl: Callable[..., T],
        dark_impl: Callable[..., T],
        enabled: bool = False,
        sample_rate: float = 1.0
    ):
        self.production = production_impl
        self.dark = dark_impl
        self.enabled = enabled
        self.sample_rate = sample_rate
        self.logger = logging.getLogger(__name__)

    async def execute(self, *args, **kwargs) -> T:
        """执行生产代码,可选在后台运行黑暗代码。"""
        # 始终运行生产代码
        result = await self.production(*args, **kwargs)

        # 有条件地运行黑暗代码(不阻塞)
        if self.enabled and random.random() < self.sample_rate:
            import asyncio
            asyncio.create_task(self._run_dark(*args, **kwargs))

        return result

    async def _run_dark(self, *args, **kwargs):
        """运行黑暗实现并记录结果/错误。"""
        try:
            dark_result = await self.dark(*args, **kwargs)
            self.logger.info(f"黑暗启动成功:{dark_result}")
        except Exception as e:
            self.logger.error(f"黑暗启动失败:{e}", exc_info=True)

# 使用
dark_search = DarkLaunch(
    production_impl=old_search_service.search,
    dark_impl=new_search_service.search,
    enabled=True,
    sample_rate=0.1  # 10% 采样
)

results = await dark_search.execute(query="test")

抽象分支

// branch-by-abstraction.ts
// 步骤1:创建抽象层
interface PaymentProcessor {
  processPayment(amount: number, token: string): Promise<PaymentResult>;
  refund(transactionId: string): Promise<void>;
}

// 步骤2:为遗留系统实现抽象
class StripeAdapter implements PaymentProcessor {
  constructor(private stripe: StripeAPI) {}

  async processPayment(amount: number, token: string): Promise<PaymentResult> {
    const charge = await this.stripe.charges.create({
      amount,
      source: token,
    });
    return { transactionId: charge.id, status: "success" };
  }

  async refund(transactionId: string): Promise<void> {
    await this.stripe.refunds.create({ charge: transactionId });
  }
}

// 步骤3:为新系统实现抽象
class BraintreeAdapter implements PaymentProcessor {
  constructor(private braintree: BraintreeAPI) {}

  async processPayment(amount: number, token: string): Promise<PaymentResult> {
    const result = await this.braintree.transaction.sale({
      amount: amount.toString(),
      paymentMethodNonce: token,
    });
    return { transactionId: result.transaction.id, status: "success" };
  }

  async refund(transactionId: string): Promise<void> {
    await this.braintree.transaction.refund(transactionId);
  }
}

// 步骤4:基于功能标志路由到实现
class PaymentService {
  constructor(
    private stripeAdapter: StripeAdapter,
    private braintreeAdapter: BraintreeAdapter,
    private featureFlags: FeatureFlagService,
  ) {}

  private getProcessor(userId: string): PaymentProcessor {
    if (this.featureFlags.isEnabled("use_braintree", userId)) {
      return this.braintreeAdapter;
    }
    return this.stripeAdapter;
  }

  async processPayment(
    userId: string,
    amount: number,
    token: string,
  ): Promise<PaymentResult> {
    const processor = this.getProcessor(userId);
    return processor.processPayment(amount, token);
  }
}

// 步骤5:迁移完成后,可移除抽象

8. 迁移期间测试

# migration_testing.py
import pytest
from typing import Callable, Any, List
from dataclasses import dataclass

@dataclass
class MigrationTestCase:
    name: str
    input_data: Any
    expected_output: Any
    legacy_impl: Callable
    new_impl: Callable

class MigrationTester:
    """验证迁移正确性的测试框架。"""

    def __init__(self):
        self.test_cases: List[MigrationTestCase] = []
        self.failures: List[dict] = []

    def add_test_case(self, test_case: MigrationTestCase):
        self.test_cases.append(test_case)

    async def run_all(self) -> bool:
        """运行所有测试用例,比较遗留与新实现。"""
        all_passed = True

        for test_case in self.test_cases:
            try:
                legacy_result = await test_case.legacy_impl(test_case.input_data)
                new_result = await test_case.new_impl(test_case.input_data)

                if legacy_result != new_result:
                    all_passed = False
                    self.failures.append({
                        'test': test_case.name,
                        'input': test_case.input_data,
                        'legacy': legacy_result,
                        'new': new_result,
                        'expected': test_case.expected_output
                    })
                    print(f"失败:{test_case.name}")
                    print(f"  遗留:{legacy_result}")
                    print(f"  新:    {new_result}")
                else:
                    print(f"通过:{test_case.name}")

            except Exception as e:
                all_passed = False
                self.failures.append({
                    'test': test_case.name,
                    'error': str(e)
                })
                print(f"错误:{test_case.name} - {e}")

        return all_passed

    def generate_report(self) -> str:
        """生成迁移测试报告。"""
        total = len(self.test_cases)
        passed = total - len(self.failures)

        report = [
            "# 迁移测试报告",
            f"总计:{total} | 通过:{passed} | 失败:{len(self.failures)}",
            ""
        ]

        if self.failures:
            report.append("## 失败")
            for failure in self.failures:
                report.append(f"- {failure.get('test', '未知')}")
                if 'error' in failure:
                    report.append(f"  错误:{failure['error']}")
                else:
                    report.append(f"  遗留:{failure.get('legacy')}")
                    report.append(f"  新:{failure.get('new')}")

        return "
".join(report)

# 使用
tester = MigrationTester()

tester.add_test_case(MigrationTestCase(
    name="search_basic_query",
    input_data="test query",
    expected_output=SearchResults(...),
    legacy_impl=old_search.search,
    new_impl=new_search.search
))

await tester.run_all()
print(tester.generate_report())
# snapshot_testing.py
import json
import hashlib
from pathlib import Path

class SnapshotTester:
    """用于迁移验证的快照测试。"""

    def __init__(self, snapshot_dir: str = ".snapshots"):
        self.snapshot_dir = Path(snapshot_dir)
        self.snapshot_dir.mkdir(exist_ok=True)

    def capture_snapshot(self, name: str, data: Any):
        """捕获当前输出的快照。"""
        snapshot_path = self.snapshot_dir / f"{name}.json"

        serialized = json.dumps(data, sort_keys=True, indent=2)
        snapshot_path.write_text(serialized)

        # 同时保存哈希以便快速比较
        hash_path = self.snapshot_dir / f"{name}.hash"
        data_hash = hashlib.sha256(serialized.encode()).hexdigest()
        hash_path.write_text(data_hash)

    def verify_snapshot(self, name: str, data: Any) -> bool:
        """验证数据匹配快照。"""
        snapshot_path = self.snapshot_dir / f"{name}.json"

        if not snapshot_path.exists():
            raise FileNotFoundError(f"快照 {name} 未找到")

        expected = snapshot_path.read_text()
        actual = json.dumps(data, sort_keys=True, indent=2)

        if expected != actual:
            # 保存差异用于调试
            diff_path = self.snapshot_dir / f"{name}.diff"
            diff_path.write_text(f"预期:
{expected}

实际:
{actual}")
            return False

        return True

# 迁移工作流与快照
snapshot = SnapshotTester()

# 1. 从遗留系统捕获基线
legacy_data = await legacy_search.search("test")
snapshot.capture_snapshot("search_test_legacy", legacy_data)

# 2. 运行迁移

# 3. 验证新系统匹配遗留快照
new_data = await new_search.search("test")
matches = snapshot.verify_snapshot("search_test_legacy", new_data)

assert matches, "新实现不匹配遗留行为"

9. 回滚策略

# rollback.py
from dataclasses import dataclass
from typing import List, Callable, Optional
from datetime import datetime
import subprocess
import json

@dataclass
class RollbackPoint:
    id: str
    timestamp: datetime
    description: str
    git_commit: str
    db_migration_version: str
    config_snapshot: dict
    rollback_commands: List[str]

class RollbackManager:
    def __init__(self):
        self.rollback_points: List[RollbackPoint] = []
        self.current_point: Optional[RollbackPoint] = None

    def create_rollback_point(
        self,
        description: str,
        rollback_commands: List[str] = None
    ) -> RollbackPoint:
        """在迁移前创建回滚点。"""
        point = RollbackPoint(
            id=f"rbp_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            timestamp=datetime.now(),
            description=description,
            git_commit=self._get_current_commit(),
            db_migration_version=self._get_db_version(),
            config_snapshot=self._capture_config(),
            rollback_commands=rollback_commands or []
        )

        self.rollback_points.append(point)
        self._save_point(point)
        return point

    def rollback_to(self, point_id: str) -> bool:
        """执行回滚到指定点。"""
        point = self._find_point(point_id)
        if not point:
            raise ValueError(f"回滚点 {point_id} 未找到")

        print(f"回滚到:{point.description}")

        try:
            # 1. 回滚代码
            self._rollback_code(point.git_commit)

            # 2. 回滚数据库
            self._rollback_database(point.db_migration_version)

            # 3. 恢复配置
            self._restore_config(point.config_snapshot)

            # 4. 执行自定义回滚命令
            for cmd in point.rollback_commands:
                subprocess.run(cmd, shell=True, check=True)

            self.current_point = point
            return True

        except Exception as e:
            print(f"回滚失败:{e}")
            return False

    def _get_current_commit(self) -> str:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True
        )
        return result.stdout.strip()

    def _get_db_version(self) -> str:
        # 实现取决于迁移工具
        return "migration_version"

    def _capture_config(self) -> dict:
        # 捕获当前配置
        return {}

    def _rollback_code(self, commit: str):
        subprocess.run(["git", "checkout", commit], check=True)

    def _rollback_database(self, version: str):
        # Alembic 示例
        subprocess.run(["alembic", "downgrade", version], check=True)

    def _restore_config(self, config: dict):
        # 恢复配置
        pass

    def _save_point(self, point: RollbackPoint):
        # 持久化回滚点
        with open(f"rollback_points/{point.id}.json", "w") as f:
            json.dump({
                "id": point.id,
                "timestamp": point.timestamp.isoformat(),
                "description": point.description,
                "git_commit": point.git_commit,
                "db_migration_version": point.db_migration_version,
                "rollback_commands": point.rollback_commands
            }, f)

    def _find_point(self, point_id: str) -> Optional[RollbackPoint]:
        return next(
            (p for p in self.rollback_points if p.id == point_id),
            None
        )

# 自动回滚触发器
class AutoRollback:
    def __init__(
        self,
        rollback_manager: RollbackManager,
        health_check: Callable[[], bool],
        error_threshold: int = 10,
        check_interval: int = 60
    ):
        self.manager = rollback_manager
        self.health_check = health_check
        self.error_threshold = error_threshold
        self.check_interval = check_interval
        self.error_count = 0

    async def monitor(self, rollback_point_id: str):
        """监控健康状态并在需要时自动回滚。"""
        import asyncio

        while True:
            try:
                if not self.health_check():
                    self.error_count += 1
                    print(f"健康检查失败({self.error_count}/{self.error_threshold})")

                    if self.error_count >= self.error_threshold:
                        print("错误阈值达到,启动回滚")
                        self.manager.rollback_to(rollback_point_id)
                        break
                else:
                    self.error_count = 0

                await asyncio.sleep(self.check_interval)

            except Exception as e:
                print(f"监控错误:{e}")
                self.error_count += 1

最佳实践

  1. 彻底规划:在开始前记录迁移步骤、依赖项和回滚程序。

  2. 增量迁移:使用绞杀者模式、功能标志、黑暗启动和百分比推出以降低风险。

  3. 自动化可能部分:使用代码转换工具(jscodeshift、ts-morph、AST工具)自动化重复代码转换。

  4. 影子测试:在切换前并行运行新旧系统以验证行为。比较结果并记录差异。

  5. 持续测试:使用迁移特定测试(快照测试、比较测试)在每一步验证正确性。

  6. 密切监控:在迁移期间跟踪错误率、延迟和业务指标。设置自动回滚触发器。

  7. 抽象分支:创建抽象层以将迁移与功能开发解耦。

  8. 保持向后兼容性:在过渡期间保持API兼容性。支持新旧接口。

  9. 文档化废弃:清晰的废弃通知和迁移指南帮助消费者。跟踪废弃使用。

  10. 始终有回滚计划:切勿在没有测试回滚计划的情况下部署迁移。在每个阶段前创建回滚点。

示例

完整迁移工作流

class MigrationWorkflow:
    def __init__(self):
        self.rollback_manager = RollbackManager()
        self.feature_flags = FeatureFlagService()
        self.metrics = MigrationMetrics()

    async def execute_migration(
        self,
        name: str,
        legacy_impl: Callable,
        new_impl: Callable
    ):
        # 1. 创建回滚点
        rollback_point = self.rollback_manager.create_rollback_point(
            description=f"在 {name} 迁移前"
        )

        # 2. 注册功能标志
        self.feature_flags.register(FeatureFlag(
            name=f"migration_{name}",
            state=FlagState.PERCENTAGE,
            percentage=0.0
        ))

        # 3. 影子模式测试
        print("阶段1:影子模式测试")
        migrator = IncrementalMigrator(
            legacy_impl=legacy_impl,
            new_impl=new_impl,
            config=MigrationConfig(
                phase=MigrationPhase.SHADOW_MODE,
                comparison_enabled=True
            )
        )

        # 运行影子测试...
        if migrator.metrics.match_rate < 0.99:
            print("影子模式匹配率太低,中止")
            return False

        # 4. 金丝雀部署
        print("阶段2:金丝雀部署(1%)")
        self.feature_flags.update_percentage(f"migration_{name}", 0.01)

        # 监控问题...

        # 5. 渐进推出
        for percentage in [0.05, 0.1, 0.25, 0.5, 0.75, 1.0]:
            print(f"阶段3:推出到 {percentage*100}%")
            self.feature_flags.update_percentage(f"migration_{name}", percentage)
            # 监控和验证...

        # 6. 完成迁移
        print("迁移完成")
        return True