name: migration-generator description: 从模型变更、模式差异和迁移最佳实践中创建数据库迁移。
迁移生成器技能
从模型变更、模式差异和迁移最佳实践中创建数据库迁移。
指令
您是一个数据库迁移专家。当被调用时:
-
检测模式变更:
- 比较当前模式与期望状态
- 识别添加/移除的表和列
- 检测修改的列类型和约束
- 查找更改的索引和外键
-
生成迁移文件:
- 创建正向(向上)和反向(向下)迁移
- 适用时使用ORM特定迁移格式
- 包含数据迁移(当需要时)
- 处理边缘情况和潜在数据丢失
-
确保安全性:
- 防止意外数据删除
- 添加回滚能力
- 包含验证步骤
- 警告关于破坏性变更
-
最佳实践:
- 使迁移原子化和可逆
- 避免生产中的破坏性操作
- 先在暂存环境测试迁移
- 保持迁移小而专注
支持框架
- SQL:原始SQL迁移(PostgreSQL、MySQL、SQLite)
- Node.js:Prisma、TypeORM、Sequelize、Knex.js
- Python:Alembic、Django迁移、SQLAlchemy
- Ruby:Rails Active Record迁移
- Go:golang-migrate、goose
- PHP:Laravel迁移、Doctrine
使用示例
@migration-generator 添加用户邮箱验证
@migration-generator --from-diff
@migration-generator --rollback
@migration-generator --data-migration
@migration-generator --zero-downtime
原始SQL迁移
PostgreSQL - 添加表
-- migrations/001_create_users_table.up.sql
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
active BOOLEAN DEFAULT true NOT NULL,
created_at TIMESTAMP DEFAULT NOW() NOT NULL,
updated_at TIMESTAMP DEFAULT NOW() NOT NULL
);
-- 创建索引
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_users_active ON users(active) WHERE active = true;
-- 添加注释
COMMENT ON TABLE users IS '应用程序用户';
COMMENT ON COLUMN users.email IS '用户邮箱地址(唯一)';
-- migrations/001_create_users_table.down.sql
DROP TABLE IF EXISTS users CASCADE;
添加带默认值的列
-- migrations/002_add_email_verified.up.sql
-- 步骤 1:添加列作为可空
ALTER TABLE users ADD COLUMN email_verified BOOLEAN;
-- 步骤 2:为现有行设置默认值
UPDATE users SET email_verified = false WHERE email_verified IS NULL;
-- 步骤 3:使列 NOT NULL
ALTER TABLE users ALTER COLUMN email_verified SET NOT NULL;
-- 步骤 4:为未来行设置默认
ALTER TABLE users ALTER COLUMN email_verified SET DEFAULT false;
-- migrations/002_add_email_verified.down.sql
ALTER TABLE users DROP COLUMN email_verified;
修改列类型(安全)
-- migrations/003_increase_email_length.up.sql
-- 安全:增加varchar长度
ALTER TABLE users ALTER COLUMN email TYPE VARCHAR(320);
-- migrations/003_increase_email_length.down.sql
-- 警告:如果数据超过旧限制可能失败
ALTER TABLE users ALTER COLUMN email TYPE VARCHAR(255);
添加外键
-- migrations/004_create_orders.up.sql
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
total_amount DECIMAL(10,2) NOT NULL CHECK (total_amount >= 0),
status VARCHAR(20) DEFAULT 'pending' NOT NULL,
created_at TIMESTAMP DEFAULT NOW() NOT NULL,
updated_at TIMESTAMP DEFAULT NOW() NOT NULL,
CONSTRAINT fk_orders_user_id
FOREIGN KEY (user_id)
REFERENCES users(id)
ON DELETE CASCADE
);
-- 外键和常见查询的索引
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created_at ON orders(created_at);
-- 常见查询模式的复合索引
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- migrations/004_create_orders.down.sql
DROP TABLE IF EXISTS orders CASCADE;
重命名列(安全)
-- migrations/005_rename_password_column.up.sql
-- 步骤 1:添加新列
ALTER TABLE users ADD COLUMN password_hash_new VARCHAR(255);
-- 步骤 2:复制数据
UPDATE users SET password_hash_new = password_hash;
-- 步骤 3:使 NOT NULL
ALTER TABLE users ALTER COLUMN password_hash_new SET NOT NULL;
-- 步骤 4:删除旧列
ALTER TABLE users DROP COLUMN password_hash;
-- 步骤 5:重命名新列
ALTER TABLE users RENAME COLUMN password_hash_new TO password_hash;
-- migrations/005_rename_password_column.down.sql
-- 使用相同模式可逆
ALTER TABLE users ADD COLUMN password_hash_old VARCHAR(255);
UPDATE users SET password_hash_old = password_hash;
ALTER TABLE users ALTER COLUMN password_hash_old SET NOT NULL;
ALTER TABLE users DROP COLUMN password_hash;
ALTER TABLE users RENAME COLUMN password_hash_old TO password_hash;
ORM迁移示例
Prisma迁移
// schema.prisma - 添加新模型
model User {
id Int @id @default(autoincrement())
email String @unique
username String @unique
passwordHash String @map("password_hash")
active Boolean @default(true)
emailVerified Boolean @default(false) @map("email_verified")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")
orders Order[]
profile UserProfile?
@@index([email])
@@index([username])
@@map("users")
}
model UserProfile {
id Int @id @default(autoincrement())
userId Int @unique @map("user_id")
bio String? @db.Text
avatarUrl String? @map("avatar_url")
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
@@map("user_profiles")
}
# 生成迁移
npx prisma migrate dev --name add_user_profile
# 在生产环境中应用迁移
npx prisma migrate deploy
# 重置数据库(仅开发环境!)
npx prisma migrate reset
生成的迁移:
-- CreateTable
CREATE TABLE "user_profiles" (
"id" SERIAL NOT NULL,
"user_id" INTEGER NOT NULL,
"bio" TEXT,
"avatar_url" TEXT,
CONSTRAINT "user_profiles_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "user_profiles_user_id_key" ON "user_profiles"("user_id");
-- AddForeignKey
ALTER TABLE "user_profiles" ADD CONSTRAINT "user_profiles_user_id_fkey"
FOREIGN KEY ("user_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE CASCADE;
TypeORM迁移
// migration/1234567890123-CreateUser.ts
import { MigrationInterface, QueryRunner, Table, TableIndex } from 'typeorm';
export class CreateUser1234567890123 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'users',
columns: [
{
name: 'id',
type: 'int',
isPrimary: true,
isGenerated: true,
generationStrategy: 'increment',
},
{
name: 'email',
type: 'varchar',
length: '255',
isUnique: true,
isNullable: false,
},
{
name: 'username',
type: 'varchar',
length: '50',
isUnique: true,
isNullable: false,
},
{
name: 'password_hash',
type: 'varchar',
length: '255',
isNullable: false,
},
{
name: 'active',
type: 'boolean',
default: true,
isNullable: false,
},
{
name: 'created_at',
type: 'timestamp',
default: 'now()',
isNullable: false,
},
{
name: 'updated_at',
type: 'timestamp',
default: 'now()',
isNullable: false,
},
],
}),
true,
);
// 创建索引
await queryRunner.createIndex(
'users',
new TableIndex({
name: 'idx_users_email',
columnNames: ['email'],
}),
);
await queryRunner.createIndex(
'users',
new TableIndex({
name: 'idx_users_username',
columnNames: ['username'],
}),
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable('users');
}
}
// migration/1234567890124-AddForeignKey.ts
import { MigrationInterface, QueryRunner, Table, TableForeignKey } from 'typeorm';
export class AddOrdersForeignKey1234567890124 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: 'orders',
columns: [
{
name: 'id',
type: 'int',
isPrimary: true,
isGenerated: true,
generationStrategy: 'increment',
},
{
name: 'user_id',
type: 'int',
isNullable: false,
},
{
name: 'total_amount',
type: 'decimal',
precision: 10,
scale: 2,
isNullable: false,
},
{
name: 'status',
type: 'varchar',
length: '20',
default: "'pending'",
isNullable: false,
},
{
name: 'created_at',
type: 'timestamp',
default: 'now()',
},
],
}),
true,
);
// 添加外键
await queryRunner.createForeignKey(
'orders',
new TableForeignKey({
columnNames: ['user_id'],
referencedColumnNames: ['id'],
referencedTableName: 'users',
onDelete: 'CASCADE',
}),
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
const table = await queryRunner.getTable('orders');
const foreignKey = table.foreignKeys.find(
fk => fk.columnNames.indexOf('user_id') !== -1,
);
await queryRunner.dropForeignKey('orders', foreignKey);
await queryRunner.dropTable('orders');
}
}
# 生成迁移
npx typeorm migration:generate -n AddUserProfile
# 运行迁移
npx typeorm migration:run
# 回滚最后一次迁移
npx typeorm migration:revert
Alembic(Python/SQLAlchemy)
# alembic/versions/001_create_users_table.py
"""创建用户表
修订ID: 001
修订:
创建日期: 2024-01-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# 修订标识符
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# 创建用户表
op.create_table(
'users',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('email', sa.String(length=255), nullable=False),
sa.Column('username', sa.String(length=50), nullable=False),
sa.Column('password_hash', sa.String(length=255), nullable=False),
sa.Column('active', sa.Boolean(), server_default='true', nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email'),
sa.UniqueConstraint('username')
)
# 创建索引
op.create_index('idx_users_email', 'users', ['email'])
op.create_index('idx_users_username', 'users', ['username'])
op.create_index(
'idx_users_active',
'users',
['active'],
postgresql_where=sa.text('active = true')
)
def downgrade():
op.drop_table('users')
# alembic/versions/002_add_email_verified.py
"""添加email_verified列
修订ID: 002
修订: 001
创建日期: 2024-01-02 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = '002'
down_revision = '001'
branch_labels = None
depends_on = None
def upgrade():
# 首先添加列为可空
op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=True))
# 为现有行设置默认值
op.execute('UPDATE users SET email_verified = false WHERE email_verified IS NULL')
# 使列 NOT NULL
op.alter_column('users', 'email_verified', nullable=False, server_default='false')
def downgrade():
op.drop_column('users', 'email_verified')
# 生成迁移
alembic revision --autogenerate -m "添加用户资料"
# 运行迁移
alembic upgrade head
# 回滚一个迁移
alembic downgrade -1
# 回滚到特定版本
alembic downgrade 001
Django迁移
# app/migrations/0001_initial.py
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True)),
('email', models.EmailField(max_length=255, unique=True)),
('username', models.CharField(max_length=50, unique=True)),
('password_hash', models.CharField(max_length=255)),
('active', models.BooleanField(default=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'db_table': 'users',
},
),
migrations.AddIndex(
model_name='user',
index=models.Index(fields=['email'], name='idx_users_email'),
),
migrations.AddIndex(
model_name='user',
index=models.Index(fields=['username'], name='idx_users_username'),
),
]
# app/migrations/0002_add_user_profile.py
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('app', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='UserProfile',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True)),
('bio', models.TextField(blank=True, null=True)),
('avatar_url', models.URLField(blank=True, null=True)),
('user', models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
to='app.user',
related_name='profile'
)),
],
options={
'db_table': 'user_profiles',
},
),
]
# 生成迁移
python manage.py makemigrations
# 应用迁移
python manage.py migrate
# 回滚到特定迁移
python manage.py migrate app 0001
# 显示迁移状态
python manage.py showmigrations
数据迁移
回填数据(PostgreSQL)
-- migrations/006_backfill_user_roles.up.sql
-- 添加角色列
ALTER TABLE users ADD COLUMN role VARCHAR(20);
-- 回填现有用户使用默认角色
UPDATE users SET role = 'member' WHERE role IS NULL;
-- 回填后使 NOT NULL
ALTER TABLE users ALTER COLUMN role SET NOT NULL;
ALTER TABLE users ALTER COLUMN role SET DEFAULT 'member';
-- 添加检查约束
ALTER TABLE users ADD CONSTRAINT chk_users_role
CHECK (role IN ('admin', 'member', 'guest'));
-- migrations/006_backfill_user_roles.down.sql
ALTER TABLE users DROP COLUMN role;
复杂数据迁移(Node.js/TypeORM)
// migration/1234567890125-MigrateUserData.ts
import { MigrationInterface, QueryRunner } from 'typeorm';
export class MigrateUserData1234567890125 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
// 获取所有用户
const users = await queryRunner.query('SELECT id, full_name FROM users');
// 将full_name拆分为first_name和last_name
for (const user of users) {
const parts = user.full_name?.split(' ') || ['', ''];
const firstName = parts[0] || '';
const lastName = parts.slice(1).join(' ') || '';
await queryRunner.query(
'UPDATE users SET first_name = $1, last_name = $2 WHERE id = $3',
[firstName, lastName, user.id],
);
}
// 删除旧列
await queryRunner.query('ALTER TABLE users DROP COLUMN full_name');
}
public async down(queryRunner: QueryRunner): Promise<void> {
// 添加回full_name列
await queryRunner.query('ALTER TABLE users ADD COLUMN full_name VARCHAR(255)');
// 重构full_name
await queryRunner.query(
`UPDATE users SET full_name = first_name || ' ' || last_name`,
);
// 删除first_name和last_name
await queryRunner.query('ALTER TABLE users DROP COLUMN first_name');
await queryRunner.query('ALTER TABLE users DROP COLUMN last_name');
}
}
数据迁移与Python/Alembic
# alembic/versions/003_migrate_prices.py
"""将价格迁移到分
修订ID: 003
修订: 002
创建日期: 2024-01-03 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = '003'
down_revision = '002'
def upgrade():
# 添加新列
op.add_column('products', sa.Column('price_cents', sa.Integer(), nullable=True))
# 迁移数据:将十进制转换为分
op.execute('''
UPDATE products
SET price_cents = CAST(price * 100 AS INTEGER)
''')
# 迁移后使 NOT NULL
op.alter_column('products', 'price_cents', nullable=False)
# 删除旧列
op.drop_column('products', 'price')
# 重命名新列
op.alter_column('products', 'price_cents', new_column_name='price')
def downgrade():
# 添加回十进制列
op.add_column('products', sa.Column('price_decimal', sa.Numeric(10, 2), nullable=True))
# 转换回十进制
op.execute('''
UPDATE products
SET price_decimal = price / 100.0
''')
op.alter_column('products', 'price_decimal', nullable=False)
op.drop_column('products', 'price')
op.alter_column('products', 'price_decimal', new_column_name='price')
零停机迁移
添加 NOT NULL 列
-- 迁移 1:添加列为可空
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
-- 部署写入phone列的应用程序代码
-- 迁移 2:回填现有数据
UPDATE users SET phone = 'UNKNOWN' WHERE phone IS NULL;
-- 迁移 3:使列 NOT NULL
ALTER TABLE users ALTER COLUMN phone SET NOT NULL;
ALTER TABLE users ALTER COLUMN phone SET DEFAULT 'UNKNOWN';
重命名列(零停机)
-- 阶段 1:添加新列
ALTER TABLE users ADD COLUMN email_address VARCHAR(255);
-- 阶段 2:部署写入两个列的应用程序代码
-- 阶段 3:回填数据
UPDATE users SET email_address = email WHERE email_address IS NULL;
-- 阶段 4:部署从新列读取的应用程序代码
-- 阶段 5:删除旧列
ALTER TABLE users DROP COLUMN email;
-- 阶段 6:重命名新列(可选)
ALTER TABLE users RENAME COLUMN email_address TO email;
删除列(安全)
-- 阶段 1:部署不使用该列的代码
-- 阶段 2:移除 NOT NULL 约束(使其安全回滚)
ALTER TABLE users ALTER COLUMN deprecated_field DROP NOT NULL;
-- 阶段 3:等待并验证无问题
-- 阶段 4:删除该列
ALTER TABLE users DROP COLUMN deprecated_field;
常见模式
添加枚举列
-- 创建枚举类型(PostgreSQL)
CREATE TYPE user_status AS ENUM ('active', 'inactive', 'suspended');
-- 添加带有枚举类型的列
ALTER TABLE users ADD COLUMN status user_status DEFAULT 'active' NOT NULL;
-- 回滚
ALTER TABLE users DROP COLUMN status;
DROP TYPE user_status;
添加JSON列
-- PostgreSQL
ALTER TABLE users ADD COLUMN metadata JSONB DEFAULT '{}' NOT NULL;
CREATE INDEX idx_users_metadata ON users USING GIN(metadata);
-- MySQL
ALTER TABLE users ADD COLUMN metadata JSON;
添加全文搜索
-- PostgreSQL
ALTER TABLE products ADD COLUMN search_vector tsvector;
-- 创建生成列
UPDATE products SET search_vector =
to_tsvector('english', name || ' ' || description);
-- 为快速搜索创建GIN索引
CREATE INDEX idx_products_search ON products USING GIN(search_vector);
-- 触发器以保持search_vector更新
CREATE TRIGGER products_search_update
BEFORE INSERT OR UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION
tsvector_update_trigger(search_vector, 'pg_catalog.english', name, description);
最佳实践
要做 ✓
- 使迁移可逆 - 始终实现
down迁移 - 先在暂存环境测试 - 切勿在生产中运行未测试的迁移
- 保持迁移小 - 每次迁移一个逻辑变更
- 使用事务 - 确保原子性(当数据库支持时)
- 迁移前备份 - 始终有回滚计划
- 并发添加索引 - 在PostgreSQL中使用
CONCURRENTLY以避免锁定 - 版本控制迁移 - 将迁移提交与代码变更一起
- 文档化破坏性变更 - 为复杂迁移添加注释
- 使用批量更新 - 对于大数据迁移,分块处理
不要做 ✗
- 切勿修改已提交的迁移 - 创建新迁移替代
- **不要使用 SELECT *** - 在数据迁移中指定列
- 避免长时间运行的迁移 - 分成更小的步骤
- 不要假设数据状态 - 转换前验证
- 切勿跳过迁移 - 按顺序运行
- 不要忽略警告 - 处理弃用通知
- 避免循环依赖 - 保持迁移顺序清晰
- 不要忘记索引 - 特别是外键上的索引
迁移检查清单
## 迁移前检查清单
- [ ] 在本地数据库测试迁移
- [ ] 在暂存环境测试迁移
- [ ] 创建数据库备份
- [ ] 迁移可逆(down迁移工作)
- [ ] 审查潜在数据丢失
- [ ] 检查长时间运行操作
- [ ] 外键约束已验证
- [ ] 为新列添加索引
- [ ] 性能影响评估
- [ ] 团队通知迁移计划
## 迁移后检查清单
- [ ] 迁移成功完成
- [ ] 检查应用程序日志中的错误
- [ ] 监控数据库性能
- [ ] 回滚计划测试(如果需要)
- [ ] 文档更新
- [ ] 在版本控制中标记迁移已应用
故障排除
迁移中途失败
-- 检查迁移状态
SELECT * FROM schema_migrations;
-- 如果事务失败,手动回滚
BEGIN;
-- 手动运行down迁移
ROLLBACK;
-- 或标记为未应用
DELETE FROM schema_migrations WHERE version = '20240101120000';
大表迁移
-- 对大更新使用批处理
DO $$
DECLARE
batch_size INTEGER := 1000;
offset_val INTEGER := 0;
rows_updated INTEGER;
BEGIN
LOOP
UPDATE users
SET email_verified = false
WHERE id IN (
SELECT id FROM users
WHERE email_verified IS NULL
ORDER BY id
LIMIT batch_size
OFFSET offset_val
);
GET DIAGNOSTICS rows_updated = ROW_COUNT;
EXIT WHEN rows_updated = 0;
offset_val := offset_val + batch_size;
COMMIT;
RAISE NOTICE '更新了 % 行', offset_val;
END LOOP;
END $$;
注意事项
- 始终先在非生产环境测试迁移
- 尽可能使用数据库事务
- 将迁移保存在版本控制中
- 文档化复杂迁移
- 考虑生产中的零停机策略
- 迁移期间监控数据库性能
- 准备回滚计划
- 可用时使用ORM迁移工具以获得类型安全性