DataMigrationScriptsSkill data-migration-scripts

创建安全、可逆的数据库迁移脚本,用于模式变更、数据迁移和转换,支持零停机部署。

数据工程 0 次安装 0 次浏览 更新于 3/3/2026

数据迁移脚本

概览

创建健壮、安全且可逆的数据迁移脚本,用于数据库模式更改和数据转换,最小化停机时间。

何时使用

  • 数据库模式更改
  • 添加/移除/修改列
  • 迁移不同数据库系统间的数据
  • 数据转换和清理
  • 分割或合并表
  • 更改数据类型
  • 添加索引和约束
  • 回填数据
  • 多租户数据迁移

迁移原则

  1. 可逆 - 每次迁移都应有回滚
  2. 幂等 - 多次运行安全
  3. 原子 - 要么全部执行,要么全部不执行
  4. 测试 - 在类似生产的数据上测试
  5. 监控 - 跟踪进度和错误
  6. 文档化 - 清晰的目的和副作用

实施示例

1. Knex.js 迁移 (Node.js)

import { Knex } from 'knex';

// migrations/20240101000000_add_user_preferences.ts
export async function up(knex: Knex): Promise<void> {
  // Create the new table first so the backfill below has a target.
  await knex.schema.createTable('user_preferences', (table) => {
    table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
    table.uuid('user_id').notNullable().references('id').inTable('users').onDelete('CASCADE');
    table.jsonb('preferences').defaultTo('{}');
    table.timestamp('created_at').defaultTo(knex.fn.now());
    table.timestamp('updated_at').defaultTo(knex.fn.now());

    table.index('user_id');
  });

  // Fold the legacy per-user columns into a single JSONB document.
  await knex.raw(`
    INSERT INTO user_preferences (user_id, preferences)
    SELECT id, jsonb_build_object(
      'theme', COALESCE(theme, 'light'),
      'notifications', COALESCE(notifications_enabled, true)
    )
    FROM users
    WHERE theme IS NOT NULL OR notifications_enabled IS NOT NULL
  `);

  // knex(...).count() resolves to an array like [{ count: 'N' }];
  // pull out the scalar so the log line is readable instead of an object dump.
  const result = await knex('user_preferences').count('* as count').first();
  console.log('Migrated user preferences for', result?.count ?? 0);
}

export async function down(knex: Knex): Promise<void> {
  // Restore the flattened settings onto users before removing the table.
  const restoreSql = `
    UPDATE users u
    SET
      theme = (p.preferences->>'theme'),
      notifications_enabled = (p.preferences->>'notifications')::boolean
    FROM user_preferences p
    WHERE u.id = p.user_id
  `;
  await knex.raw(restoreSql);

  // Safe to drop now that the data lives back on the users table.
  await knex.schema.dropTableIfExists('user_preferences');
}
// migrations/20240102000000_add_email_verification.ts
export async function up(knex: Knex): Promise<void> {
  // New verification columns: defaulted/nullable so existing rows stay valid.
  await knex.schema.table('users', (t) => {
    t.boolean('email_verified').defaultTo(false);
    t.timestamp('email_verified_at').nullable();
    t.string('verification_token').nullable();
  });

  // Grandfather accounts older than 30 days in as already verified.
  const cutoff = knex.raw("NOW() - INTERVAL '30 days'");
  await knex('users')
    .where('created_at', '<', cutoff)
    .update({ email_verified: true, email_verified_at: knex.fn.now() });

  // Index is created after the bulk update to keep the backfill cheap.
  await knex.schema.table('users', (t) => {
    t.index('verification_token');
  });
}

export async function down(knex: Knex): Promise<void> {
  // Remove the index first, then every column that up() introduced.
  await knex.schema.table('users', (t) => {
    t.dropIndex('verification_token');
    for (const col of ['email_verified', 'email_verified_at', 'verification_token']) {
      t.dropColumn(col);
    }
  });
}

2. Alembic 迁移 (Python/SQLAlchemy)

"""添加用户角色和权限

修订 ID: a1b2c3d4e5f6
修订依据: previous_revision
创建日期: 2024-01-01 00:00:00

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# 修订标识符
revision = 'a1b2c3d4e5f6'
down_revision = 'previous_revision'
branch_labels = None
depends_on = None


def upgrade():
    # Roles lookup table.
    op.create_table(
        'roles',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(50), unique=True, nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
    )

    # Join table linking users to roles; composite PK prevents duplicates.
    op.create_table(
        'user_roles',
        sa.Column('user_id', sa.Integer(), sa.ForeignKey('users.id', ondelete='CASCADE')),
        sa.Column('role_id', sa.Integer(), sa.ForeignKey('roles.id', ondelete='CASCADE')),
        sa.Column('assigned_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('user_id', 'role_id'),
    )

    # Index both sides of the join table for lookups in either direction.
    for column in ('user_id', 'role_id'):
        op.create_index('idx_user_roles_' + column, 'user_roles', [column])

    # Seed the default role set.
    op.execute("""
        INSERT INTO roles (name, description) VALUES
        ('admin', 'Administrator with full access'),
        ('user', 'Standard user'),
        ('guest', 'Guest with limited access')
    """)

    # Grant every existing user the baseline 'user' role.
    op.execute("""
        INSERT INTO user_roles (user_id, role_id)
        SELECT u.id, r.id
        FROM users u
        CROSS JOIN roles r
        WHERE r.name = 'user'
    """)


def downgrade():
    # Tear down in the reverse order of upgrade(): indexes, join table, roles.
    for index_name in ('idx_user_roles_role_id', 'idx_user_roles_user_id'):
        op.drop_index(index_name, 'user_roles')
    op.drop_table('user_roles')
    op.drop_table('roles')

3. 批量大数据迁移

import { Knex } from 'knex';

/** Mutable counters tracking a batched migration run. */
interface MigrationProgress {
  total: number; // total source rows, set once at the start of migrate()
  processed: number; // rows copied so far
  errors: number; // rows attributed to failed batches (a whole batch per error)
  startTime: number; // epoch millis when the run began; used for the rate log
}

/**
 * Batched copy from old_table to new_table with progress tracking.
 * Each batch runs in its own transaction; a failed batch rolls back
 * and aborts the whole migration.
 */
class LargeDataMigration {
  private batchSize = 1000;
  private progress: MigrationProgress = {
    total: 0,
    processed: 0,
    errors: 0,
    startTime: Date.now()
  };

  /** Entry point: counts the source rows, then copies them batch by batch. */
  async migrate(knex: Knex): Promise<void> {
    console.log('Starting large data migration...');

    // Snapshot the total up front so the loop has a fixed bound.
    const result = await knex('old_table').count('* as count').first();
    this.progress.total = parseInt((result?.count as string) || '0', 10);

    console.log(`Total records to migrate: ${this.progress.total}`);

    let offset = 0;
    while (offset < this.progress.total) {
      await this.processBatch(knex, offset);
      offset += this.batchSize;

      this.logProgress();

      // Brief pause between batches so the migration does not saturate the DB.
      await this.delay(100);
    }

    console.log('Migration complete!');
    this.logProgress();
  }

  /**
   * Copies one batch at the given offset inside a transaction.
   * NOTE(review): rows inserted into old_table while this runs can still be
   * skipped; run against a quiesced source or key batches by id range.
   */
  private async processBatch(knex: Knex, offset: number): Promise<void> {
    const trx = await knex.transaction();

    try {
      // ORDER BY is required for stable LIMIT/OFFSET pagination — without it
      // SQL row order is unspecified and batches may skip or duplicate rows.
      const records = await trx('old_table')
        .select('*')
        .orderBy('id')
        .limit(this.batchSize)
        .offset(offset);

      const transformed = records.map(record => this.transformRecord(record));

      if (transformed.length > 0) {
        await trx('new_table')
          .insert(transformed)
          .onConflict('id')
          .merge(); // upsert keeps re-runs idempotent
      }

      await trx.commit();

      this.progress.processed += records.length;
    } catch (error) {
      await trx.rollback();
      console.error(`Batch failed at offset ${offset}:`, error);
      // We don't know how many rows actually failed; count the whole batch.
      this.progress.errors += this.batchSize;

      // Abort — the logged offset tells the operator where to resume.
      throw error;
    }
  }

  /** Maps a legacy camelCase row onto the new snake_case schema. */
  private transformRecord(record: any): any {
    return {
      id: record.id,
      user_id: record.userId,
      data: JSON.stringify(record.legacyData),
      created_at: record.createdAt,
      updated_at: new Date()
    };
  }

  /** Logs processed/total, error count, and throughput; safe when total is 0. */
  private logProgress(): void {
    const percent = this.progress.total > 0
      ? ((this.progress.processed / this.progress.total) * 100).toFixed(2)
      : '0.00'; // avoid NaN% on an empty source table
    const elapsed = Math.max(1, Date.now() - this.progress.startTime);
    const rate = this.progress.processed / (elapsed / 1000);

    console.log(
      `Progress: ${this.progress.processed}/${this.progress.total} (${percent}%) ` +
      `Errors: ${this.progress.errors} ` +
      `Rate: ${rate.toFixed(2)} records/sec`
    );
  }

  /** Promise-based sleep used to throttle between batches. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 迁移中的使用
export async function up(knex: Knex): Promise<void> {
  // Delegate the whole run to the batched migrator.
  await new LargeDataMigration().migrate(knex);
}

4. 零停机迁移模式

// 第一阶段:添加新列(可空)
export async function up_phase1(knex: Knex): Promise<void> {
  // Nullable column first: existing readers and writers keep working
  // while old and new application versions run side by side.
  await knex.schema.table('users', (t) => {
    t.string('email_new').nullable();
  });

  console.log('Phase 1: Added new column');
}

// 第二阶段:回填数据
// Phase 2: backfill data in batches.
export async function up_phase2(knex: Knex): Promise<void> {
  const batchSize = 1000;
  let processed = 0;

  // PostgreSQL has no UPDATE ... LIMIT, and Knex ignores .limit() on an
  // update builder for pg — the original form would update the entire table
  // in one statement, defeating the batching. Constrain each pass with a
  // keyed subquery over the primary key instead.
  while (true) {
    const candidates = knex('users')
      .select('id')
      .whereNull('email_new')
      .whereNotNull('email')
      .limit(batchSize);

    const result = await knex('users')
      .whereIn('id', candidates)
      .update({
        email_new: knex.raw('email')
      });

    processed += result;

    // A short batch means nothing is left to backfill.
    if (result < batchSize) break;

    console.log(`Backfilled ${processed} records`);
    // Breathe between batches to limit lock contention.
    await new Promise(resolve => setTimeout(resolve, 100));
  }

  console.log(`Phase 2: Backfilled ${processed} total records`);
}

// 第三阶段:添加约束
export async function up_phase3(knex: Knex): Promise<void> {
  // Constraints go on last, once every row has been backfilled in phase 2.
  await knex.schema.alterTable('users', (t) => {
    t.string('email_new').notNullable().alter();
    t.unique('email_new');
  });

  console.log('Phase 3: Added constraints');
}

// 第四阶段:删除旧列
export async function up_phase4(knex: Knex): Promise<void> {
  // Two separate alters: drop the legacy column, then take over its name.
  await knex.schema.table('users', (t) => t.dropColumn('email'));
  await knex.schema.table('users', (t) => t.renameColumn('email_new', 'email'));

  console.log('Phase 4: Completed migration');
}

5. 迁移验证

/**
 * Post-migration sanity checks: referential integrity, constraints,
 * indexes, and row-count parity. All checks run in parallel and the
 * overall result is the logical AND of the four.
 */
class MigrationValidator {
  async validate(knex: Knex, migration: string): Promise<boolean> {
    console.log(`Validating migration: ${migration}`);

    const checks = [
      this.checkDataIntegrity(knex),
      this.checkConstraints(knex),
      this.checkIndexes(knex),
      this.checkRowCounts(knex)
    ];

    const results = await Promise.all(checks);
    const passed = results.every(r => r);

    if (passed) {
      console.log('✓ All validation checks passed');
    } else {
      console.error('✗ Validation failed');
    }

    return passed;
  }

  /** Fails if any user_roles row points at a user that no longer exists. */
  private async checkDataIntegrity(knex: Knex): Promise<boolean> {
    const orphaned = await knex('user_roles')
      .leftJoin('users', 'user_roles.user_id', 'users.id')
      .whereNull('users.id')
      .count('* as count')
      .first();

    const count = parseInt((orphaned?.count as string) || '0', 10);

    if (count > 0) {
      console.error(`Found ${count} orphaned user_roles records`);
      return false;
    }

    console.log('✓ Data integrity check passed');
    return true;
  }

  /** Verifies a UNIQUE constraint involving email exists on users. */
  private async checkConstraints(knex: Knex): Promise<boolean> {
    const result = await knex.raw(`
      SELECT COUNT(*) as count
      FROM information_schema.table_constraints
      WHERE table_name = 'users'
      AND constraint_type = 'UNIQUE'
      AND constraint_name LIKE '%email%'
    `);

    // node-postgres returns COUNT(*) as a string; parse before comparing
    // rather than relying on implicit string/number coercion.
    const hasConstraint = parseInt(result.rows[0].count, 10) > 0;

    if (!hasConstraint) {
      console.error('Email unique constraint missing');
      return false;
    }

    console.log('✓ Constraints check passed');
    return true;
  }

  /** Verifies an email-related index exists on users (pg_indexes). */
  private async checkIndexes(knex: Knex): Promise<boolean> {
    const result = await knex.raw(`
      SELECT indexname
      FROM pg_indexes
      WHERE tablename = 'users'
      AND indexname LIKE '%email%'
    `);

    if (result.rows.length === 0) {
      console.error('Email index missing');
      return false;
    }

    console.log('✓ Indexes check passed');
    return true;
  }

  /**
   * Allows up to 1% drift between users and user_preferences counts.
   * NOTE(review): the backfill only copies users with non-null settings,
   * so a larger legitimate gap is possible — tune the tolerance per dataset.
   */
  private async checkRowCounts(knex: Knex): Promise<boolean> {
    const [oldCount, newCount] = await Promise.all([
      knex('users').count('* as count').first(),
      knex('user_preferences').count('* as count').first()
    ]);

    const old = parseInt((oldCount?.count as string) || '0', 10);
    const new_ = parseInt((newCount?.count as string) || '0', 10);

    if (Math.abs(old - new_) > old * 0.01) {
      console.error(`Row count mismatch: ${old} vs ${new_}`);
      return false;
    }

    console.log('✓ Row counts check passed');
    return true;
  }
}

// 使用
export async function up(knex: Knex): Promise<void> {
  // Perform the migration itself.
  await performMigration(knex);

  // Then gate success on the post-migration checks: a failed validation
  // throws, which causes the migration framework to mark the run failed.
  const validator = new MigrationValidator();
  const valid = await validator.validate(knex, 'add_user_preferences');

  if (!valid) {
    throw new Error('Migration validation failed');
  }
}

6. 跨数据库迁移

import logging

from sqlalchemy import MetaData, Table, create_engine, func
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)

class CrossDatabaseMigration:
    """Copies tables batch-by-batch between two databases.

    The target database is assumed to already contain an identically
    named, schema-compatible table for each migrated table.
    """

    def __init__(self, source_url: str, target_url: str):
        self.source_engine = create_engine(source_url)
        self.target_engine = create_engine(target_url)

        self.source_session = sessionmaker(bind=self.source_engine)()
        self.target_session = sessionmaker(bind=self.target_engine)()

    def migrate_table(self, table_name: str, batch_size: int = 1000):
        """Migrate one table from the source database to the target.

        Commits once per batch so a failure loses at most one batch.
        """
        logger.info(f"Starting migration of table: {table_name}")

        # Reflect the table definition from the live source schema.
        metadata = MetaData()
        source_table = Table(
            table_name,
            metadata,
            autoload_with=self.source_engine
        )

        # Total row count bounds the batching loop below.
        # (func must be imported from sqlalchemy; the original sample used
        # it without importing it, which raises NameError at runtime.)
        total = self.source_session.execute(
            source_table.select().with_only_columns(func.count())
        ).scalar()

        logger.info(f"Total records to migrate: {total}")

        offset = 0
        while offset < total:
            # NOTE(review): LIMIT/OFFSET without ORDER BY is not guaranteed
            # stable; for a strictly consistent copy, order by the primary
            # key or quiesce the source while migrating.
            results = self.source_session.execute(
                source_table.select()
                .limit(batch_size)
                .offset(offset)
            ).fetchall()

            if not results:
                break

            # Convert Row objects to plain dicts, then apply the hook.
            rows = [dict(row._mapping) for row in results]
            transformed = [self.transform_row(row) for row in rows]

            # The Table object only renders SQL; executing its insert on the
            # target session writes into the target database.
            self.target_session.execute(
                source_table.insert(),
                transformed
            )
            self.target_session.commit()

            offset += batch_size
            logger.info(f"Migrated {offset}/{total} records")

        logger.info(f"Completed migration of {table_name}")

    def transform_row(self, row: dict) -> dict:
        """Hook for per-row transformations; identity by default."""
        return row

    def cleanup(self):
        """Close sessions and dispose both engines' connection pools."""
        self.source_session.close()
        self.target_session.close()
        # Sessions alone do not release the pools; dispose the engines too.
        self.source_engine.dispose()
        self.target_engine.dispose()

最佳实践

✅ 做

  • 总是编写 up 和 down 迁移
  • 在类似生产的数据上测试迁移
  • 使用事务进行原子操作
  • 批量处理大数据集
  • 数据插入后添加索引
  • 迁移后验证数据
  • 记录进度和错误
  • 包含错误处理
  • 测试回滚程序
  • 编写文档
  • 运行迁移前备份数据库
  • 团队审查

❌ 避免

  • 在生产上运行未经测试的迁移
  • 进行没有向后兼容性的破坏性更改
  • 在单个事务中处理数百万行
  • 跳过回滚实现
  • 忽略迁移失败
  • 修改旧迁移
  • 无备份删除数据
  • 在生产中手动运行迁移

迁移清单

  • [ ] 迁移包含 up 和 down
  • [ ] 在类似生产的数据集上测试
  • [ ] 适当使用事务
  • [ ] 大数据集批量处理
  • [ ] 数据插入后添加索引
  • [ ] 包含数据验证
  • [ ] 实施进度记录
  • [ ] 包含错误处理
  • [ ] 测试回滚
  • [ ] 编写文档
  • [ ] 备份
  • [ ] 团队审查

资源