数据库迁移
概览
数据库迁移是随时间修改数据库模式和数据的版本化脚本。它们使团队能够以受控、可复现的方式发展他们的数据库结构,同时维护数据完整性并最小化停机时间。
前提条件
- 理解SQL和数据库模式设计
- 了解数据库系统(PostgreSQL、MySQL等)
- 熟悉版本控制系统
- 基本理解数据转换概念
核心概念
数据库迁移是什么?
数据库迁移是:
- 版本化:每个迁移都有一个唯一的版本号或时间戳
- 有序:迁移按特定顺序应用
- 可逆:大多数迁移可以回滚
- 幂等:可以安全地多次运行
- 团队兼容:允许多个开发人员工作在模式更改上
迁移类型
-
模式迁移
- 创建/删除表
- 添加/移除列
- 修改约束
- 创建/删除索引
-
数据迁移
- 转换现有数据
- 填充参考表
- 清理不一致的数据
- 回填新列
-
回滚迁移
- 逆转模式更改
- 恢复先前的数据状态
- 处理边缘情况
迁移生命周期
开发 → 测试 → 暂存 → 生产
↓ ↓ ↓ ↓
创建 验证 验证 部署
测试 测试 测试 监控
实施指南
基本迁移结构
// migrations/20240124000001_create_users_table.js
module.exports = {
up: async (db) => {
await db.schema.createTable('users', (table) => {
table.increments('id').primary();
table.string('email', 255).notNullable().unique();
table.string('password_hash', 255).notNullable();
table.timestamp('created_at').defaultTo(db.fn.now());
table.timestamp('updated_at').defaultTo(db.fn.now());
});
},
down: async (db) => {
await db.schema.dropTable('users');
}
};
使用Knex.js
const knex = require('knex');
// 初始化knex
const db = knex({
client: 'pg',
connection: {
host: 'localhost',
user: 'user',
password: 'password',
database: 'mydb'
},
migrations: {
directory: './migrations',
tableName: 'knex_migrations'
}
});
// 创建一个新的迁移
// npx knex migrate:make create_users_table
// 运行迁移
await db.migrate.latest();
// 回滚上一个迁移
await db.migrate.rollback();
// 获取当前版本
const current = await db.migrate.currentVersion();
console.log('当前迁移版本:', current);
使用Sequelize
const { Sequelize } = require('sequelize');
const sequelize = new Sequelize('mydb', 'user', 'password', {
dialect: 'postgres',
host: 'localhost',
logging: false
});
// 定义迁移
module.exports = {
up: async (queryInterface, Sequelize) => {
await queryInterface.createTable('users', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true
},
email: {
type: Sequelize.STRING(255),
allowNull: false,
unique: true
},
password_hash: {
type: Sequelize.STRING(255),
allowNull: false
},
created_at: {
type: Sequelize.DATE,
defaultValue: Sequelize.literal('CURRENT_TIMESTAMP')
},
updated_at: {
type: Sequelize.DATE,
defaultValue: Sequelize.literal('CURRENT_TIMESTAMP')
}
});
},
down: async (queryInterface) => {
await queryInterface.dropTable('users');
}
};
// 运行迁移
await sequelize.sync({ alter: true });
使用TypeORM
import { MigrationInterface, QueryRunner } from 'typeorm';
export class CreateUsersTable1234567890 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(new Table({
name: 'users',
columns: [
{
name: 'id',
type: 'int',
isPrimary: true,
isGenerated: true,
generationStrategy: 'increment'
},
{
name: 'email',
type: 'varchar',
length: '255',
isNullable: false,
isUnique: true
},
{
name: 'password_hash',
type: 'varchar',
length: '255',
isNullable: false
},
{
name: 'created_at',
type: 'timestamp',
default: 'CURRENT_TIMESTAMP'
},
{
name: 'updated_at',
type: 'timestamp',
default: 'CURRENT_TIMESTAMP'
}
]
}));
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable('users');
}
}
使用Prisma
// prisma/migrations/20240124000001_init/migration.sql
-- CreateUsersTable
CREATE TABLE "users" (
"id" SERIAL PRIMARY KEY,
"email" VARCHAR(255) NOT NULL UNIQUE,
"password_hash" VARCHAR(255) NOT NULL,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- CreateUsersTableRollback
DROP TABLE "users";
# 生成迁移
npx prisma migrate dev --name init
# 应用迁移
npx prisma migrate deploy
# 回滚
npx prisma migrate resolve --rolled-back 20240124000001_init
迁移模式
添加列
module.exports = {
up: async (db) => {
await db.schema.table('users')
.addColumn('phone', 'varchar', { length: 20 });
},
down: async (db) => {
await db.schema.table('users')
.dropColumn('phone');
}
};
添加具有默认值的列
module.exports = {
up: async (db) => {
await db.schema.table('users')
.addColumn('status', 'varchar', {
length: 50,
defaultTo: 'active',
notNullable: true
});
},
down: async (db) => {
await db.schema.table('users')
.dropColumn('status');
}
};
重命名列
module.exports = {
up: async (db) => {
await db.schema.table('users')
.renameColumn('email_address', 'email');
},
down: async (db) => {
await db.schema.table('users')
.renameColumn('email', 'email_address');
}
};
添加索引
module.exports = {
up: async (db) => {
await db.schema.table('users')
.index('email', 'idx_users_email');
},
down: async (db) => {
await db.schema.table('users')
.dropIndex('email', 'idx_users_email');
}
};
添加外键
module.exports = {
up: async (db) => {
await db.schema.table('posts')
.foreign('user_id')
.references('id')
.inTable('users')
.onDelete('CASCADE');
},
down: async (db) => {
await db.schema.table('posts')
.dropForeign('user_id');
}
};
数据迁移
module.exports = {
up: async (db) => {
// 添加新列
await db.schema.table('users')
.addColumn('full_name', 'varchar', { length: 255 });
// 迁移数据
await db('users')
.update({
full_name: db.raw('CONCAT(first_name, " ", last_name)')
});
// 删除旧列
await db.schema.table('users')
.dropColumn('first_name');
await db.schema.table('users')
.dropColumn('last_name');
},
down: async (db) => {
// 添加回旧列
await db.schema.table('users')
.addColumn('first_name', 'varchar', { length: 100 });
await db.schema.table('users')
.addColumn('last_name', 'varchar', { length: 100 });
// 迁移数据回
await db.raw(`
UPDATE users
SET first_name = SPLIT_PART(full_name, ' ', 1),
last_name = SPLIT_PART(full_name, ' ', 2)
`);
// 删除新列
await db.schema.table('users')
.dropColumn('full_name');
}
};
零停机迁移
扩展列(PostgreSQL)
module.exports = {
up: async (db) => {
// 第1步:添加新列为nullable
await db.schema.table('users')
.addColumn('email_new', 'varchar', { length: 255 });
// 第2步:回填数据
await db.raw(`
UPDATE users
SET email_new = email
WHERE email_new IS NULL
`);
// 第3步:添加约束
await db.raw(`
ALTER TABLE users
ADD CONSTRAINT users_email_new_key
UNIQUE (email_new)
`);
// 第4步:交换列
await db.raw(`
ALTER TABLE users
RENAME COLUMN email TO email_old
`);
await db.raw(`
ALTER TABLE users
RENAME COLUMN email_new TO email
`);
// 第5步:删除旧列
await db.schema.table('users')
.dropColumn('email_old');
},
down: async (db) => {
// 反转过程
await db.schema.table('users')
.addColumn('email_old', 'varchar', { length: 255 });
await db.raw(`
UPDATE users
SET email_old = email
WHERE email_old IS NULL
`);
await db.raw(`
ALTER TABLE users
DROP CONSTRAINT IF EXISTS users_email_key
`);
await db.raw(`
ALTER TABLE users
RENAME COLUMN email TO email_new
`);
await db.raw(`
ALTER TABLE users
RENAME COLUMN email_old TO email
`);
await db.schema.table('users')
.dropColumn('email_new');
}
};
扩展列(MySQL)
module.exports = {
up: async (db) => {
// 第1步:添加新列
await db.schema.table('users')
.addColumn('email_new', 'varchar', { length: 255 });
// 第2步:分批回填数据
const batchSize = 1000;
let offset = 0;
while (true) {
const result = await db('users')
.whereNull('email_new')
.limit(batchSize)
.offset(offset)
.update({
email_new: db.raw('email')
});
if (result === 0) break;
offset += batchSize;
}
// 第3步:重命名列
await db.raw(`
ALTER TABLE users
CHANGE COLUMN email email_old VARCHAR(255)
`);
await db.raw(`
ALTER TABLE users
CHANGE COLUMN email_new email VARCHAR(255) NOT NULL UNIQUE
`);
// 第4步:删除旧列
await db.schema.table('users')
.dropColumn('email_old');
},
down: async (db) => {
// 反转过程
await db.schema.table('users')
.addColumn('email_old', 'varchar', { length: 255 });
const batchSize = 1000;
let offset = 0;
while (true) {
const result = await db('users')
.whereNull('email_old')
.limit(batchSize)
.offset(offset)
.update({
email_old: db.raw('email')
});
if (result === 0) break;
offset += batchSize;
}
await db.raw(`
ALTER TABLE users
CHANGE COLUMN email email_new VARCHAR(255)
`);
await db.raw(`
ALTER TABLE users
CHANGE COLUMN email_old email VARCHAR(255) NOT NULL UNIQUE
`);
await db.schema.table('users')
.dropColumn('email_new');
}
};
添加索引而不锁定
module.exports = {
up: async (db) => {
// PostgreSQL: CREATE INDEX CONCURRENTLY
await db.raw(`
CREATE INDEX CONCURRENTLY idx_users_email
ON users(email)
`);
},
down: async (db) => {
await db.raw(`
DROP INDEX CONCURRENTLY IF EXISTS idx_users_email
`);
}
};
回滚策略
简单回滚
module.exports = {
up: async (db) => {
await db.schema.createTable('users', (table) => {
table.increments('id').primary();
table.string('email', 255).notNullable().unique();
});
},
down: async (db) => {
await db.schema.dropTable('users');
}
};
数据保留回滚
module.exports = {
up: async (db) => {
// 创建备份表
await db.raw(`
CREATE TABLE users_backup AS
SELECT * FROM users
`);
// 进行更改
await db('users')
.where('status', 'inactive')
.update({ status: 'deleted' });
},
down: async (db) => {
// 从备份恢复
await db.raw(`
UPDATE users u
SET status = ub.status
FROM users_backup ub
WHERE u.id = ub.id
`);
// 删除备份表
await db.schema.dropTableIfExists('users_backup');
}
};
条件回滚
module.exports = {
up: async (db) => {
// 检查列是否存在
const hasColumn = await db.schema.hasColumn('users', 'phone');
if (!hasColumn) {
await db.schema.table('users')
.addColumn('phone', 'varchar', { length: 20 });
}
},
down: async (db) => {
await db.schema.table('users')
.dropColumnIfExists('phone');
}
};
版本控制
命名约定
# 基于时间戳
20240124000001_create_users_table.js
20240124000002_add_phone_to_users.js
# 基于描述
20240124_120000_create_users_table.js
20240124_130000_add_phone_to_users.js
# 序列
001_create_users_table.js
002_add_phone_to_users.js
迁移跟踪表
CREATE TABLE schema_migrations (
id SERIAL PRIMARY KEY,
version VARCHAR(255) NOT NULL UNIQUE,
name VARCHAR(255),
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
分支迁移
// 处理分支特定的迁移
const isProduction = process.env.NODE_ENV === 'production';
module.exports = {
up: async (db) => {
await db.schema.createTable('users', (table) => {
table.increments('id').primary();
table.string('email', 255).notNullable().unique();
});
// 生产特定的迁移
if (isProduction) {
await db.raw(`
CREATE INDEX CONCURRENTLY idx_users_email
ON users(email)
`);
}
},
down: async (db) => {
if (isProduction) {
await db.raw(`
DROP INDEX CONCURRENTLY IF EXISTS idx_users_email
`);
}
await db.schema.dropTable('users');
}
};
测试迁移
单元测试
const { expect } = require('chai');
const db = require('./db');
describe('Migration: create_users_table', () => {
let migration;
before(async () => {
migration = require('./migrations/20240124000001_create_users_table');
});
it('should create users table', async () => {
await migration.up(db);
const hasTable = await db.schema.hasTable('users');
expect(hasTable).to.be.true;
});
it('should create all columns', async () => {
await migration.up(db);
const hasId = await db.schema.hasColumn('users', 'id');
const hasEmail = await db.schema.hasColumn('users', 'email');
const hasPasswordHash = await db.schema.hasColumn('users', 'password_hash');
expect(hasId).to.be.true;
expect(hasEmail).to.be.true;
expect(hasPasswordHash).to.be.true;
});
it('should create unique constraint on email', async () => {
await migration.up(db);
const indexes = await db('users').columnInfo();
const emailIndex = indexes.find(i => i.column_name === 'email');
expect(emailIndex).to.exist;
});
afterEach(async () => {
await migration.down(db);
});
});
集成测试
describe('Migration Integration', () => {
let testDb;
before(async () => {
// 创建测试数据库
testDb = knex({
client: 'pg',
connection: {
host: 'localhost',
user: 'user',
password: 'password',
database: 'mydb_test'
}
});
// 运行所有迁移
await testDb.migrate.latest();
});
it('should allow inserting users', async () => {
await testDb('users').insert({
email: 'test@example.com',
password_hash: 'hashed_password'
});
const users = await testDb('users').select();
expect(users).to.have.lengthOf(1);
});
it('should enforce unique email constraint', async () => {
await testDb('users').insert({
email: 'test@example.com',
password_hash: 'hashed_password'
});
try {
await testDb('users').insert({
email: 'test@example.com',
password_hash: 'different_hash'
});
throw new Error('Should have thrown unique constraint error');
} catch (error) {
expect(error.code).to.equal('23505'); // 唯一性违规
}
});
after(async () => {
await testDb.destroy();
});
});
最佳实践
-
使迁移可逆
- 始终实现
down函数 - 在开发中测试回滚
- 记录不可逆迁移
- 始终实现
-
保持迁移小
- 每个迁移一个逻辑更改
- 避免合并不相关的更改
- 将大型迁移拆分为更小的
-
使用事务
- 将迁移包装在事务中
- 失败时回滚
- 确保原子性
-
彻底测试
- 在样本数据上测试迁移
- 验证回滚是否有效
- 在生产之前在暂存环境中测试
-
记录更改
- 添加注释解释更改
- 记录重大更改
- 记录数据转换
-
处理大型数据集
- 使用批处理进行数据迁移
- 在数据迁移之前添加索引
- 在迁移期间监控性能
-
版本控制
- 将迁移与代码更改一起提交
- 永远不要修改已提交的迁移
- 为更正创建新的迁移
常见陷阱
-
不可逆迁移
// 坏:删除表而不备份 down: async (db) => { await db.schema.dropTable('users'); // 数据丢失! } // 好:删除前创建备份 up: async (db) => { await db.raw('CREATE TABLE users_backup AS SELECT * FROM users'); await db.schema.dropTable('users'); } down: async (db) => { await db.raw('CREATE TABLE users AS SELECT * FROM users_backup'); await db.schema.dropTable('users_backup'); } -
阻塞生产
// 坏:长时间运行的迁移阻塞生产 up: async (db) => { await db.raw('CREATE INDEX idx_users_email ON users(email)'); } // 好:PostgreSQL使用CONCURRENTLY up: async (db) => { await db.raw('CREATE INDEX CONCURRENTLY idx_users_email ON users(email)'); } -
数据丢失
// 坏:覆盖数据而不备份 up: async (db) => { await db('users').update({ status: 'active' }); } // 好:首先创建备份 up: async (db) => { await db.raw('CREATE TABLE users_backup AS SELECT * FROM users'); await db('users').update({ status: 'active' }); }