Data Migration Scripts
Overview
Create robust, safe, and reversible data migration scripts for database schema changes and data transformations, with minimal downtime.
When to Use
- Database schema changes
- Adding/removing/modifying columns
- Migrating data between different database systems
- Data transformation and cleanup
- Splitting or merging tables
- Changing data types
- Adding indexes and constraints
- Backfilling data
- Multi-tenant data migrations
Migration Principles
- Reversible - every migration should have a rollback
- Idempotent - safe to run multiple times
- Atomic - all-or-nothing execution
- Tested - verified against production-like data
- Monitored - progress and errors are tracked
- Documented - clear purpose and side effects
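The "reversible" and "idempotent" principles can be sketched framework-free. The sketch below (all names hypothetical, not taken from any migration library) records applied migration IDs so a re-run is a no-op, and replays stored rollbacks in reverse order:

```python
# Minimal sketch: a runner that records applied migration IDs so re-running
# the same migration is a no-op (idempotent), and that can replay the stored
# rollbacks in reverse order (reversible).
class MigrationRunner:
    def __init__(self):
        self.applied = []    # ordered list of applied migration IDs
        self.rollbacks = {}  # migration ID -> rollback callable

    def apply(self, migration_id, up, down):
        """Run `up` once; skip silently if this ID was already applied."""
        if migration_id in self.applied:
            return False     # already applied - safe to call again
        up()
        self.applied.append(migration_id)
        self.rollbacks[migration_id] = down
        return True

    def rollback_all(self):
        """Undo applied migrations in reverse (LIFO) order."""
        while self.applied:
            migration_id = self.applied.pop()
            self.rollbacks.pop(migration_id)()
```

Real migration frameworks (Knex, Alembic) implement exactly this bookkeeping with a migrations table in the database itself.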
Implementation Examples
1. Knex.js Migrations (Node.js)
import { Knex } from 'knex';

// migrations/20240101000000_add_user_preferences.ts
export async function up(knex: Knex): Promise<void> {
  // Create the new table
  await knex.schema.createTable('user_preferences', (table) => {
    table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
    table.uuid('user_id').notNullable().references('id').inTable('users').onDelete('CASCADE');
    table.jsonb('preferences').defaultTo('{}');
    table.timestamp('created_at').defaultTo(knex.fn.now());
    table.timestamp('updated_at').defaultTo(knex.fn.now());
    table.index('user_id');
  });

  // Migrate existing data
  await knex.raw(`
    INSERT INTO user_preferences (user_id, preferences)
    SELECT id, jsonb_build_object(
      'theme', COALESCE(theme, 'light'),
      'notifications', COALESCE(notifications_enabled, true)
    )
    FROM users
    WHERE theme IS NOT NULL OR notifications_enabled IS NOT NULL
  `);

  const migrated = await knex('user_preferences').count('* as count').first();
  console.log('Migrated user preferences:', migrated?.count);
}

export async function down(knex: Knex): Promise<void> {
  // Restore data to the original table
  await knex.raw(`
    UPDATE users u
    SET
      theme = (p.preferences->>'theme'),
      notifications_enabled = (p.preferences->>'notifications')::boolean
    FROM user_preferences p
    WHERE u.id = p.user_id
  `);

  // Drop the new table
  await knex.schema.dropTableIfExists('user_preferences');
}
// migrations/20240102000000_add_email_verification.ts
export async function up(knex: Knex): Promise<void> {
  // Add new columns
  await knex.schema.table('users', (table) => {
    table.boolean('email_verified').defaultTo(false);
    table.timestamp('email_verified_at').nullable();
    table.string('verification_token').nullable();
  });

  // Backfill verified status for existing users
  await knex('users')
    .where('created_at', '<', knex.raw("NOW() - INTERVAL '30 days'"))
    .update({
      email_verified: true,
      email_verified_at: knex.fn.now()
    });

  // Add the index
  await knex.schema.table('users', (table) => {
    table.index('verification_token');
  });
}

export async function down(knex: Knex): Promise<void> {
  await knex.schema.table('users', (table) => {
    table.dropIndex('verification_token');
    table.dropColumn('email_verified');
    table.dropColumn('email_verified_at');
    table.dropColumn('verification_token');
  });
}
2. Alembic Migrations (Python/SQLAlchemy)
"""Add user roles and permissions

Revision ID: a1b2c3d4e5f6
Revises: previous_revision
Create Date: 2024-01-01 00:00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# Revision identifiers
revision = 'a1b2c3d4e5f6'
down_revision = 'previous_revision'
branch_labels = None
depends_on = None


def upgrade():
    # Create the roles table
    op.create_table(
        'roles',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('name', sa.String(50), unique=True, nullable=False),
        sa.Column('description', sa.Text()),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
    )

    # Create the user-role join table
    op.create_table(
        'user_roles',
        sa.Column('user_id', sa.Integer(), sa.ForeignKey('users.id', ondelete='CASCADE')),
        sa.Column('role_id', sa.Integer(), sa.ForeignKey('roles.id', ondelete='CASCADE')),
        sa.Column('assigned_at', sa.DateTime(), server_default=sa.func.now()),
        sa.PrimaryKeyConstraint('user_id', 'role_id')
    )

    # Create indexes
    op.create_index('idx_user_roles_user_id', 'user_roles', ['user_id'])
    op.create_index('idx_user_roles_role_id', 'user_roles', ['role_id'])

    # Insert default roles
    op.execute("""
        INSERT INTO roles (name, description) VALUES
        ('admin', 'Administrator with full access'),
        ('user', 'Standard user'),
        ('guest', 'Guest with limited access')
    """)

    # Assign existing users the default role
    op.execute("""
        INSERT INTO user_roles (user_id, role_id)
        SELECT u.id, r.id
        FROM users u
        CROSS JOIN roles r
        WHERE r.name = 'user'
    """)


def downgrade():
    # Drop tables in reverse order
    op.drop_index('idx_user_roles_role_id', 'user_roles')
    op.drop_index('idx_user_roles_user_id', 'user_roles')
    op.drop_table('user_roles')
    op.drop_table('roles')
3. Batched Migration of Large Datasets
import { Knex } from 'knex';

interface MigrationProgress {
  total: number;
  processed: number;
  errors: number;
  startTime: number;
}

class LargeDataMigration {
  private batchSize = 1000;
  private progress: MigrationProgress = {
    total: 0,
    processed: 0,
    errors: 0,
    startTime: Date.now()
  };

  async migrate(knex: Knex): Promise<void> {
    console.log('Starting large data migration...');

    // Get the total count
    const result = await knex('old_table').count('* as count').first();
    this.progress.total = parseInt((result?.count as string) || '0');
    console.log(`Total records to migrate: ${this.progress.total}`);

    // Process in batches
    let offset = 0;
    while (offset < this.progress.total) {
      await this.processBatch(knex, offset);
      offset += this.batchSize;

      // Log progress
      this.logProgress();

      // Short delay to avoid overwhelming the database
      await this.delay(100);
    }

    console.log('Migration complete!');
    this.logProgress();
  }

  private async processBatch(knex: Knex, offset: number): Promise<void> {
    const trx = await knex.transaction();
    try {
      // Fetch the batch (a stable ORDER BY is required for OFFSET paging)
      const records = await trx('old_table')
        .select('*')
        .orderBy('id')
        .limit(this.batchSize)
        .offset(offset);

      // Transform and insert
      const transformed = records.map(record => this.transformRecord(record));
      if (transformed.length > 0) {
        await trx('new_table')
          .insert(transformed)
          .onConflict('id')
          .merge(); // Upsert
      }

      await trx.commit();
      this.progress.processed += records.length;
    } catch (error) {
      await trx.rollback();
      console.error(`Batch failed at offset ${offset}:`, error);
      this.progress.errors += this.batchSize;
      // Continue or abort depending on error severity
      throw error;
    }
  }

  private transformRecord(record: any): any {
    return {
      id: record.id,
      user_id: record.userId,
      data: JSON.stringify(record.legacyData),
      created_at: record.createdAt,
      updated_at: new Date()
    };
  }

  private logProgress(): void {
    const percent = ((this.progress.processed / this.progress.total) * 100).toFixed(2);
    const elapsed = Date.now() - this.progress.startTime;
    const rate = this.progress.processed / (elapsed / 1000);
    console.log(
      `Progress: ${this.progress.processed}/${this.progress.total} (${percent}%) ` +
      `Errors: ${this.progress.errors} ` +
      `Rate: ${rate.toFixed(2)} records/sec`
    );
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage inside a migration
export async function up(knex: Knex): Promise<void> {
  const migration = new LargeDataMigration();
  await migration.migrate(knex);
}
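One caveat with LIMIT/OFFSET paging as used above: the database re-scans all skipped rows on every batch, and rows can be missed or duplicated if the source table changes mid-migration. Keyset (cursor) pagination, which resumes after the last primary key seen, avoids both problems. A minimal Python sketch, simulating the query over an in-memory list (function name and row shape are illustrative):

```python
# Keyset (cursor) pagination sketch: instead of OFFSET, each batch resumes
# after the last primary key seen, i.e.
#   SELECT * FROM old_table WHERE id > :last_id ORDER BY id LIMIT :batch_size
# Simulated here over an in-memory list of row dicts.
def keyset_batches(rows, batch_size):
    """Yield batches of rows keyed on 'id', resuming after the last id seen."""
    last_id = None
    while True:
        # Stand-in for the WHERE id > last_id ... LIMIT batch_size query
        batch = [r for r in sorted(rows, key=lambda r: r["id"])
                 if last_id is None or r["id"] > last_id][:batch_size]
        if not batch:
            break
        yield batch
        last_id = batch[-1]["id"]
```

With an index on the key column, each batch is an index seek rather than a scan, so throughput stays flat as the offset grows.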
4. Zero-Downtime Migration Pattern
// Phase 1: add the new column (nullable)
export async function up_phase1(knex: Knex): Promise<void> {
  await knex.schema.table('users', (table) => {
    table.string('email_new').nullable();
  });
  console.log('Phase 1: Added new column');
}

// Phase 2: backfill data
export async function up_phase2(knex: Knex): Promise<void> {
  const batchSize = 1000;
  let processed = 0;

  while (true) {
    const result = await knex('users')
      .whereNull('email_new')
      .whereNotNull('email')
      .limit(batchSize)
      .update({
        email_new: knex.raw('email')
      });

    processed += result;
    if (result < batchSize) break;

    console.log(`Backfilled ${processed} records`);
    await new Promise(resolve => setTimeout(resolve, 100));
  }

  console.log(`Phase 2: Backfilled ${processed} total records`);
}

// Phase 3: add constraints
export async function up_phase3(knex: Knex): Promise<void> {
  await knex.schema.alterTable('users', (table) => {
    table.string('email_new').notNullable().alter();
    table.unique('email_new');
  });
  console.log('Phase 3: Added constraints');
}

// Phase 4: drop the old column and rename the new one
export async function up_phase4(knex: Knex): Promise<void> {
  await knex.schema.table('users', (table) => {
    table.dropColumn('email');
  });
  await knex.schema.table('users', (table) => {
    table.renameColumn('email_new', 'email');
  });
  console.log('Phase 4: Completed migration');
}
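Between the backfill (phase 2) and the cutover (phase 4), application code must write to both columns ("dual-write") so neither falls behind while old and new code run side by side. A toy Python sketch of the idea, with a dict standing in for a row (names are illustrative):

```python
# Dual-write sketch for the transition window: every application write goes
# to both the old and the new column, so the backfill never races with
# live traffic. A dict stands in for a database row.
def write_email(row, value):
    """Write the email to both columns during the transition window."""
    row["email"] = value       # old column, still read by old code
    row["email_new"] = value   # new column, becomes canonical in phase 4
    return row
```

Once phase 4 renames email_new to email, the dual-write shim is removed in a follow-up deploy.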
5. Migration Validation
class MigrationValidator {
  async validate(knex: Knex, migration: string): Promise<boolean> {
    console.log(`Validating migration: ${migration}`);

    const checks = [
      this.checkDataIntegrity(knex),
      this.checkConstraints(knex),
      this.checkIndexes(knex),
      this.checkRowCounts(knex)
    ];

    const results = await Promise.all(checks);
    const passed = results.every(r => r);

    if (passed) {
      console.log('✓ All validation checks passed');
    } else {
      console.error('✗ Validation failed');
    }

    return passed;
  }

  private async checkDataIntegrity(knex: Knex): Promise<boolean> {
    // Check for orphaned records
    const orphaned = await knex('user_roles')
      .leftJoin('users', 'user_roles.user_id', 'users.id')
      .whereNull('users.id')
      .count('* as count')
      .first();

    const count = parseInt((orphaned?.count as string) || '0');
    if (count > 0) {
      console.error(`Found ${count} orphaned user_roles records`);
      return false;
    }

    console.log('✓ Data integrity check passed');
    return true;
  }

  private async checkConstraints(knex: Knex): Promise<boolean> {
    // Verify the constraint exists
    const result = await knex.raw(`
      SELECT COUNT(*) as count
      FROM information_schema.table_constraints
      WHERE table_name = 'users'
        AND constraint_type = 'UNIQUE'
        AND constraint_name LIKE '%email%'
    `);

    // Postgres returns counts as strings, so parse before comparing
    const hasConstraint = parseInt(result.rows[0].count) > 0;
    if (!hasConstraint) {
      console.error('Email unique constraint missing');
      return false;
    }

    console.log('✓ Constraints check passed');
    return true;
  }

  private async checkIndexes(knex: Knex): Promise<boolean> {
    // Verify the index exists
    const result = await knex.raw(`
      SELECT indexname
      FROM pg_indexes
      WHERE tablename = 'users'
        AND indexname LIKE '%email%'
    `);

    if (result.rows.length === 0) {
      console.error('Email index missing');
      return false;
    }

    console.log('✓ Indexes check passed');
    return true;
  }

  private async checkRowCounts(knex: Knex): Promise<boolean> {
    const [oldCount, newCount] = await Promise.all([
      knex('users').count('* as count').first(),
      knex('user_preferences').count('* as count').first()
    ]);

    const old = parseInt((oldCount?.count as string) || '0');
    const new_ = parseInt((newCount?.count as string) || '0');

    // Allow up to 1% drift between source and target row counts
    if (Math.abs(old - new_) > old * 0.01) {
      console.error(`Row count mismatch: ${old} vs ${new_}`);
      return false;
    }

    console.log('✓ Row counts check passed');
    return true;
  }
}

// Usage
export async function up(knex: Knex): Promise<void> {
  // Run the migration
  await performMigration(knex);

  // Validate it
  const validator = new MigrationValidator();
  const valid = await validator.validate(knex, 'add_user_preferences');

  if (!valid) {
    throw new Error('Migration validation failed');
  }
}
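The row-count check tolerates small drift rather than requiring exact equality, since rows may be written while the migration runs. The tolerance logic, extracted as a small Python helper (a hypothetical name, mirroring checkRowCounts above):

```python
# Row-count validation sketch: accept the migration only if the target count
# is within a relative tolerance of the source count (1% by default),
# matching the checkRowCounts logic above.
def row_counts_match(source_count, target_count, tolerance=0.01):
    """Return True if the counts differ by at most tolerance * source_count."""
    return abs(source_count - target_count) <= source_count * tolerance
```

For append-only tables a one-sided check (target >= source) can be stricter; the symmetric form above also catches accidental duplicate inserts.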
6. Cross-Database Migration
from sqlalchemy import create_engine, func, MetaData, Table
from sqlalchemy.orm import sessionmaker
import logging

logger = logging.getLogger(__name__)


class CrossDatabaseMigration:
    def __init__(self, source_url: str, target_url: str):
        self.source_engine = create_engine(source_url)
        self.target_engine = create_engine(target_url)
        self.source_session = sessionmaker(bind=self.source_engine)()
        self.target_session = sessionmaker(bind=self.target_engine)()

    def migrate_table(self, table_name: str, batch_size: int = 1000):
        """Migrate a table from the source database to the target database."""
        logger.info(f"Starting migration of table: {table_name}")

        # Reflect the table metadata from the source
        metadata = MetaData()
        source_table = Table(
            table_name,
            metadata,
            autoload_with=self.source_engine
        )

        # Get the total count
        total = self.source_session.execute(
            source_table.select().with_only_columns(func.count())
        ).scalar()
        logger.info(f"Total records to migrate: {total}")

        # Migrate in batches
        offset = 0
        while offset < total:
            # Fetch a batch from the source database
            results = self.source_session.execute(
                source_table.select()
                .limit(batch_size)
                .offset(offset)
            ).fetchall()

            if not results:
                break

            # Transform and insert into the target database
            rows = [dict(row._mapping) for row in results]
            transformed = [self.transform_row(row) for row in rows]

            self.target_session.execute(
                source_table.insert(),
                transformed
            )
            self.target_session.commit()

            offset += batch_size
            logger.info(f"Migrated {offset}/{total} records")

        logger.info(f"Completed migration of {table_name}")

    def transform_row(self, row: dict) -> dict:
        """Transform row data if needed."""
        # Apply any transformations here
        return row

    def cleanup(self):
        """Close the sessions."""
        self.source_session.close()
        self.target_session.close()
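transform_row is where schema differences between the two databases are absorbed. A common case is renaming legacy camelCase columns to snake_case, as transformRecord does in the Node.js example. A possible implementation (illustrative, not part of the class above):

```python
import re

# Transform sketch: rename legacy camelCase columns to snake_case so rows
# from the old schema fit the new one.
def camel_to_snake(name):
    """userId -> user_id, createdAt -> created_at."""
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def transform_row(row):
    """Rewrite every key of a row dict to snake_case."""
    return {camel_to_snake(k): v for k, v in row.items()}
```

Keeping the transform a pure dict-to-dict function makes it trivial to unit-test without a database connection.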
Best Practices
✅ Do
- Always write both up and down migrations
- Test migrations on production-like data
- Use transactions for atomic operations
- Process large datasets in batches
- Add indexes after inserting data
- Validate data after migrating
- Log progress and errors
- Include error handling
- Test rollback procedures
- Write documentation
- Back up the database before running migrations
- Have the team review migrations
❌ Avoid
- Running untested migrations in production
- Making breaking changes without backward compatibility
- Processing millions of rows in a single transaction
- Skipping rollback implementations
- Ignoring migration failures
- Modifying old migrations
- Deleting data without a backup
- Running migrations manually in production
Migration Checklist
- [ ] Migration includes both up and down
- [ ] Tested on a production-like dataset
- [ ] Transactions used where appropriate
- [ ] Large datasets processed in batches
- [ ] Indexes added after data insertion
- [ ] Data validation included
- [ ] Progress logging implemented
- [ ] Error handling included
- [ ] Rollback tested
- [ ] Documentation written
- [ ] Backup taken
- [ ] Reviewed by the team