数据库迁移生成器Skill migration-generator

该技能用于自动生成数据库迁移脚本,支持多种框架如SQL、Node.js、Python、Ruby、Go、PHP等,包括模型变更、模式差异识别、正向反向迁移、数据迁移和零停机策略,遵循最佳实践以确保安全、可逆和高效部署,适用于软件开发、DevOps和数据库管理场景,关键词:数据库迁移、ORM框架、迁移生成、DevOps、安全部署、数据迁移、零停机、最佳实践。

DevOps 0 次安装 0 次浏览 更新于 3/11/2026

name: migration-generator description: 从模型变更、模式差异和迁移最佳实践中创建数据库迁移。

迁移生成器技能

从模型变更、模式差异和迁移最佳实践中创建数据库迁移。

指令

您是一个数据库迁移专家。当被调用时:

  1. 检测模式变更

    • 比较当前模式与期望状态
    • 识别添加/移除的表和列
    • 检测修改的列类型和约束
    • 查找更改的索引和外键
  2. 生成迁移文件

    • 创建正向(向上)和反向(向下)迁移
    • 适用时使用ORM特定迁移格式
    • 包含数据迁移(当需要时)
    • 处理边缘情况和潜在数据丢失
  3. 确保安全性

    • 防止意外数据删除
    • 添加回滚能力
    • 包含验证步骤
    • 警告关于破坏性变更
  4. 最佳实践

    • 使迁移原子化和可逆
    • 避免生产中的破坏性操作
    • 先在暂存环境测试迁移
    • 保持迁移小而专注

支持框架

  • SQL:原始SQL迁移(PostgreSQL、MySQL、SQLite)
  • Node.js:Prisma、TypeORM、Sequelize、Knex.js
  • Python:Alembic、Django迁移、SQLAlchemy
  • Ruby:Rails Active Record迁移
  • Go:golang-migrate、goose
  • PHP:Laravel迁移、Doctrine

使用示例

@migration-generator 添加用户邮箱验证
@migration-generator --from-diff
@migration-generator --rollback
@migration-generator --data-migration
@migration-generator --zero-downtime

原始SQL迁移

PostgreSQL - 添加表

-- migrations/001_create_users_table.up.sql
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  username VARCHAR(50) UNIQUE NOT NULL,
  email VARCHAR(255) UNIQUE NOT NULL,
  password_hash VARCHAR(255) NOT NULL,
  active BOOLEAN DEFAULT true NOT NULL,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL,
  updated_at TIMESTAMP DEFAULT NOW() NOT NULL
);

-- 创建索引
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_users_active ON users(active) WHERE active = true;

-- 添加注释
COMMENT ON TABLE users IS '应用程序用户';
COMMENT ON COLUMN users.email IS '用户邮箱地址(唯一)';

-- migrations/001_create_users_table.down.sql
DROP TABLE IF EXISTS users CASCADE;

添加带默认值的列

-- migrations/002_add_email_verified.up.sql
-- 步骤 1:添加列作为可空
ALTER TABLE users ADD COLUMN email_verified BOOLEAN;

-- 步骤 2:为现有行设置默认值
UPDATE users SET email_verified = false WHERE email_verified IS NULL;

-- 步骤 3:使列 NOT NULL
ALTER TABLE users ALTER COLUMN email_verified SET NOT NULL;

-- 步骤 4:为未来行设置默认
ALTER TABLE users ALTER COLUMN email_verified SET DEFAULT false;

-- migrations/002_add_email_verified.down.sql
ALTER TABLE users DROP COLUMN email_verified;

修改列类型(安全)

-- migrations/003_increase_email_length.up.sql
-- 安全:增加varchar长度
ALTER TABLE users ALTER COLUMN email TYPE VARCHAR(320);

-- migrations/003_increase_email_length.down.sql
-- 警告:如果数据超过旧限制可能失败
ALTER TABLE users ALTER COLUMN email TYPE VARCHAR(255);

添加外键

-- migrations/004_create_orders.up.sql
CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  user_id INTEGER NOT NULL,
  total_amount DECIMAL(10,2) NOT NULL CHECK (total_amount >= 0),
  status VARCHAR(20) DEFAULT 'pending' NOT NULL,
  created_at TIMESTAMP DEFAULT NOW() NOT NULL,
  updated_at TIMESTAMP DEFAULT NOW() NOT NULL,

  CONSTRAINT fk_orders_user_id
    FOREIGN KEY (user_id)
    REFERENCES users(id)
    ON DELETE CASCADE
);

-- 外键和常见查询的索引
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created_at ON orders(created_at);

-- 常见查询模式的复合索引
CREATE INDEX idx_orders_user_status ON orders(user_id, status);

-- migrations/004_create_orders.down.sql
DROP TABLE IF EXISTS orders CASCADE;

重命名列(安全)

-- migrations/005_rename_password_column.up.sql
-- 步骤 1:添加新列
ALTER TABLE users ADD COLUMN password_hash_new VARCHAR(255);

-- 步骤 2:复制数据
UPDATE users SET password_hash_new = password_hash;

-- 步骤 3:使 NOT NULL
ALTER TABLE users ALTER COLUMN password_hash_new SET NOT NULL;

-- 步骤 4:删除旧列
ALTER TABLE users DROP COLUMN password_hash;

-- 步骤 5:重命名新列
ALTER TABLE users RENAME COLUMN password_hash_new TO password_hash;

-- migrations/005_rename_password_column.down.sql
-- 使用相同模式可逆
ALTER TABLE users ADD COLUMN password_hash_old VARCHAR(255);
UPDATE users SET password_hash_old = password_hash;
ALTER TABLE users ALTER COLUMN password_hash_old SET NOT NULL;
ALTER TABLE users DROP COLUMN password_hash;
ALTER TABLE users RENAME COLUMN password_hash_old TO password_hash;

ORM迁移示例

Prisma迁移

// schema.prisma - 添加新模型
model User {
  id            Int       @id @default(autoincrement())
  email         String    @unique
  username      String    @unique
  passwordHash  String    @map("password_hash")
  active        Boolean   @default(true)
  emailVerified Boolean   @default(false) @map("email_verified")
  createdAt     DateTime  @default(now()) @map("created_at")
  updatedAt     DateTime  @updatedAt @map("updated_at")

  orders  Order[]
  profile UserProfile?

  @@index([email])
  @@index([username])
  @@map("users")
}

model UserProfile {
  id        Int      @id @default(autoincrement())
  userId    Int      @unique @map("user_id")
  bio       String?  @db.Text
  avatarUrl String?  @map("avatar_url")

  user User @relation(fields: [userId], references: [id], onDelete: Cascade)

  @@map("user_profiles")
}
# 生成迁移
npx prisma migrate dev --name add_user_profile

# 在生产环境中应用迁移
npx prisma migrate deploy

# 重置数据库(仅开发环境!)
npx prisma migrate reset

生成的迁移:

-- CreateTable
CREATE TABLE "user_profiles" (
    "id" SERIAL NOT NULL,
    "user_id" INTEGER NOT NULL,
    "bio" TEXT,
    "avatar_url" TEXT,

    CONSTRAINT "user_profiles_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE UNIQUE INDEX "user_profiles_user_id_key" ON "user_profiles"("user_id");

-- AddForeignKey
ALTER TABLE "user_profiles" ADD CONSTRAINT "user_profiles_user_id_fkey"
  FOREIGN KEY ("user_id") REFERENCES "users"("id") ON DELETE CASCADE ON UPDATE CASCADE;

TypeORM迁移

// migration/1234567890123-CreateUser.ts
import { MigrationInterface, QueryRunner, Table, TableIndex } from 'typeorm';

export class CreateUser1234567890123 implements MigrationInterface {
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.createTable(
      new Table({
        name: 'users',
        columns: [
          {
            name: 'id',
            type: 'int',
            isPrimary: true,
            isGenerated: true,
            generationStrategy: 'increment',
          },
          {
            name: 'email',
            type: 'varchar',
            length: '255',
            isUnique: true,
            isNullable: false,
          },
          {
            name: 'username',
            type: 'varchar',
            length: '50',
            isUnique: true,
            isNullable: false,
          },
          {
            name: 'password_hash',
            type: 'varchar',
            length: '255',
            isNullable: false,
          },
          {
            name: 'active',
            type: 'boolean',
            default: true,
            isNullable: false,
          },
          {
            name: 'created_at',
            type: 'timestamp',
            default: 'now()',
            isNullable: false,
          },
          {
            name: 'updated_at',
            type: 'timestamp',
            default: 'now()',
            isNullable: false,
          },
        ],
      }),
      true,
    );

    // 创建索引
    await queryRunner.createIndex(
      'users',
      new TableIndex({
        name: 'idx_users_email',
        columnNames: ['email'],
      }),
    );

    await queryRunner.createIndex(
      'users',
      new TableIndex({
        name: 'idx_users_username',
        columnNames: ['username'],
      }),
    );
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.dropTable('users');
  }
}
// migration/1234567890124-AddForeignKey.ts
import { MigrationInterface, QueryRunner, Table, TableForeignKey } from 'typeorm';

export class AddOrdersForeignKey1234567890124 implements MigrationInterface {
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.createTable(
      new Table({
        name: 'orders',
        columns: [
          {
            name: 'id',
            type: 'int',
            isPrimary: true,
            isGenerated: true,
            generationStrategy: 'increment',
          },
          {
            name: 'user_id',
            type: 'int',
            isNullable: false,
          },
          {
            name: 'total_amount',
            type: 'decimal',
            precision: 10,
            scale: 2,
            isNullable: false,
          },
          {
            name: 'status',
            type: 'varchar',
            length: '20',
            default: "'pending'",
            isNullable: false,
          },
          {
            name: 'created_at',
            type: 'timestamp',
            default: 'now()',
          },
        ],
      }),
      true,
    );

    // 添加外键
    await queryRunner.createForeignKey(
      'orders',
      new TableForeignKey({
        columnNames: ['user_id'],
        referencedColumnNames: ['id'],
        referencedTableName: 'users',
        onDelete: 'CASCADE',
      }),
    );
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    const table = await queryRunner.getTable('orders');
    const foreignKey = table.foreignKeys.find(
      fk => fk.columnNames.indexOf('user_id') !== -1,
    );
    await queryRunner.dropForeignKey('orders', foreignKey);
    await queryRunner.dropTable('orders');
  }
}
# 生成迁移
npx typeorm migration:generate -n AddUserProfile

# 运行迁移
npx typeorm migration:run

# 回滚最后一次迁移
npx typeorm migration:revert

Alembic(Python/SQLAlchemy)

# alembic/versions/001_create_users_table.py
"""创建用户表

修订ID: 001
修订:
创建日期: 2024-01-01 12:00:00.000000

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# 修订标识符
revision = '001'
down_revision = None
branch_labels = None
depends_on = None

def upgrade():
    # 创建用户表
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
        sa.Column('email', sa.String(length=255), nullable=False),
        sa.Column('username', sa.String(length=50), nullable=False),
        sa.Column('password_hash', sa.String(length=255), nullable=False),
        sa.Column('active', sa.Boolean(), server_default='true', nullable=False),
        sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email'),
        sa.UniqueConstraint('username')
    )

    # 创建索引
    op.create_index('idx_users_email', 'users', ['email'])
    op.create_index('idx_users_username', 'users', ['username'])
    op.create_index(
        'idx_users_active',
        'users',
        ['active'],
        postgresql_where=sa.text('active = true')
    )

def downgrade():
    op.drop_table('users')
# alembic/versions/002_add_email_verified.py
"""添加email_verified列

修订ID: 002
修订: 001
创建日期: 2024-01-02 12:00:00.000000

"""
from alembic import op
import sqlalchemy as sa

revision = '002'
down_revision = '001'
branch_labels = None
depends_on = None

def upgrade():
    # 首先添加列为可空
    op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=True))

    # 为现有行设置默认值
    op.execute('UPDATE users SET email_verified = false WHERE email_verified IS NULL')

    # 使列 NOT NULL
    op.alter_column('users', 'email_verified', nullable=False, server_default='false')

def downgrade():
    op.drop_column('users', 'email_verified')
# 生成迁移
alembic revision --autogenerate -m "添加用户资料"

# 运行迁移
alembic upgrade head

# 回滚一个迁移
alembic downgrade -1

# 回滚到特定版本
alembic downgrade 001

Django迁移

# app/migrations/0001_initial.py
from django.db import migrations, models

class Migration(migrations.Migration):
    initial = True
    dependencies = []

    operations = [
        migrations.CreateModel(
            name='User',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True)),
                ('email', models.EmailField(max_length=255, unique=True)),
                ('username', models.CharField(max_length=50, unique=True)),
                ('password_hash', models.CharField(max_length=255)),
                ('active', models.BooleanField(default=True)),
                ('created_at', models.DateTimeField(auto_now_add=True)),
                ('updated_at', models.DateTimeField(auto_now=True)),
            ],
            options={
                'db_table': 'users',
            },
        ),
        migrations.AddIndex(
            model_name='user',
            index=models.Index(fields=['email'], name='idx_users_email'),
        ),
        migrations.AddIndex(
            model_name='user',
            index=models.Index(fields=['username'], name='idx_users_username'),
        ),
    ]
# app/migrations/0002_add_user_profile.py
from django.db import migrations, models
import django.db.models.deletion

class Migration(migrations.Migration):
    dependencies = [
        ('app', '0001_initial'),
    ]

    operations = [
        migrations.CreateModel(
            name='UserProfile',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True)),
                ('bio', models.TextField(blank=True, null=True)),
                ('avatar_url', models.URLField(blank=True, null=True)),
                ('user', models.OneToOneField(
                    on_delete=django.db.models.deletion.CASCADE,
                    to='app.user',
                    related_name='profile'
                )),
            ],
            options={
                'db_table': 'user_profiles',
            },
        ),
    ]
# 生成迁移
python manage.py makemigrations

# 应用迁移
python manage.py migrate

# 回滚到特定迁移
python manage.py migrate app 0001

# 显示迁移状态
python manage.py showmigrations

数据迁移

回填数据(PostgreSQL)

-- migrations/006_backfill_user_roles.up.sql
-- 添加角色列
ALTER TABLE users ADD COLUMN role VARCHAR(20);

-- 回填现有用户使用默认角色
UPDATE users SET role = 'member' WHERE role IS NULL;

-- 回填后使 NOT NULL
ALTER TABLE users ALTER COLUMN role SET NOT NULL;
ALTER TABLE users ALTER COLUMN role SET DEFAULT 'member';

-- 添加检查约束
ALTER TABLE users ADD CONSTRAINT chk_users_role
  CHECK (role IN ('admin', 'member', 'guest'));

-- migrations/006_backfill_user_roles.down.sql
ALTER TABLE users DROP COLUMN role;

复杂数据迁移(Node.js/TypeORM)

// migration/1234567890125-MigrateUserData.ts
import { MigrationInterface, QueryRunner } from 'typeorm';

export class MigrateUserData1234567890125 implements MigrationInterface {
  public async up(queryRunner: QueryRunner): Promise<void> {
    // 获取所有用户
    const users = await queryRunner.query('SELECT id, full_name FROM users');

    // 将full_name拆分为first_name和last_name
    for (const user of users) {
      const parts = user.full_name?.split(' ') || ['', ''];
      const firstName = parts[0] || '';
      const lastName = parts.slice(1).join(' ') || '';

      await queryRunner.query(
        'UPDATE users SET first_name = $1, last_name = $2 WHERE id = $3',
        [firstName, lastName, user.id],
      );
    }

    // 删除旧列
    await queryRunner.query('ALTER TABLE users DROP COLUMN full_name');
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    // 添加回full_name列
    await queryRunner.query('ALTER TABLE users ADD COLUMN full_name VARCHAR(255)');

    // 重构full_name
    await queryRunner.query(
      `UPDATE users SET full_name = first_name || ' ' || last_name`,
    );

    // 删除first_name和last_name
    await queryRunner.query('ALTER TABLE users DROP COLUMN first_name');
    await queryRunner.query('ALTER TABLE users DROP COLUMN last_name');
  }
}

数据迁移与Python/Alembic

# alembic/versions/003_migrate_prices.py
"""将价格迁移到分

修订ID: 003
修订: 002
创建日期: 2024-01-03 12:00:00.000000

"""
from alembic import op
import sqlalchemy as sa

revision = '003'
down_revision = '002'

def upgrade():
    # 添加新列
    op.add_column('products', sa.Column('price_cents', sa.Integer(), nullable=True))

    # 迁移数据:将十进制转换为分
    op.execute('''
        UPDATE products
        SET price_cents = CAST(price * 100 AS INTEGER)
    ''')

    # 迁移后使 NOT NULL
    op.alter_column('products', 'price_cents', nullable=False)

    # 删除旧列
    op.drop_column('products', 'price')

    # 重命名新列
    op.alter_column('products', 'price_cents', new_column_name='price')

def downgrade():
    # 添加回十进制列
    op.add_column('products', sa.Column('price_decimal', sa.Numeric(10, 2), nullable=True))

    # 转换回十进制
    op.execute('''
        UPDATE products
        SET price_decimal = price / 100.0
    ''')

    op.alter_column('products', 'price_decimal', nullable=False)
    op.drop_column('products', 'price')
    op.alter_column('products', 'price_decimal', new_column_name='price')

零停机迁移

添加 NOT NULL 列

-- 迁移 1:添加列为可空
ALTER TABLE users ADD COLUMN phone VARCHAR(20);

-- 部署写入phone列的应用程序代码

-- 迁移 2:回填现有数据
UPDATE users SET phone = 'UNKNOWN' WHERE phone IS NULL;

-- 迁移 3:使列 NOT NULL
ALTER TABLE users ALTER COLUMN phone SET NOT NULL;
ALTER TABLE users ALTER COLUMN phone SET DEFAULT 'UNKNOWN';

重命名列(零停机)

-- 阶段 1:添加新列
ALTER TABLE users ADD COLUMN email_address VARCHAR(255);

-- 阶段 2:部署写入两个列的应用程序代码

-- 阶段 3:回填数据
UPDATE users SET email_address = email WHERE email_address IS NULL;

-- 阶段 4:部署从新列读取的应用程序代码

-- 阶段 5:删除旧列
ALTER TABLE users DROP COLUMN email;

-- 阶段 6:重命名新列(可选)
ALTER TABLE users RENAME COLUMN email_address TO email;

删除列(安全)

-- 阶段 1:部署不使用该列的代码

-- 阶段 2:移除 NOT NULL 约束(使其安全回滚)
ALTER TABLE users ALTER COLUMN deprecated_field DROP NOT NULL;

-- 阶段 3:等待并验证无问题

-- 阶段 4:删除该列
ALTER TABLE users DROP COLUMN deprecated_field;

常见模式

添加枚举列

-- 创建枚举类型(PostgreSQL)
CREATE TYPE user_status AS ENUM ('active', 'inactive', 'suspended');

-- 添加带有枚举类型的列
ALTER TABLE users ADD COLUMN status user_status DEFAULT 'active' NOT NULL;

-- 回滚
ALTER TABLE users DROP COLUMN status;
DROP TYPE user_status;

添加JSON列

-- PostgreSQL
ALTER TABLE users ADD COLUMN metadata JSONB DEFAULT '{}' NOT NULL;
CREATE INDEX idx_users_metadata ON users USING GIN(metadata);

-- MySQL
ALTER TABLE users ADD COLUMN metadata JSON;

添加全文搜索

-- PostgreSQL
ALTER TABLE products ADD COLUMN search_vector tsvector;

-- 创建生成列
UPDATE products SET search_vector =
  to_tsvector('english', name || ' ' || description);

-- 为快速搜索创建GIN索引
CREATE INDEX idx_products_search ON products USING GIN(search_vector);

-- 触发器以保持search_vector更新
CREATE TRIGGER products_search_update
BEFORE INSERT OR UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION
  tsvector_update_trigger(search_vector, 'pg_catalog.english', name, description);

最佳实践

要做 ✓

  • 使迁移可逆 - 始终实现down迁移
  • 先在暂存环境测试 - 切勿在生产中运行未测试的迁移
  • 保持迁移小 - 每次迁移一个逻辑变更
  • 使用事务 - 确保原子性(当数据库支持时)
  • 迁移前备份 - 始终有回滚计划
  • 并发添加索引 - 在PostgreSQL中使用CONCURRENTLY以避免锁定
  • 版本控制迁移 - 将迁移提交与代码变更一起
  • 文档化破坏性变更 - 为复杂迁移添加注释
  • 使用批量更新 - 对于大数据迁移,分块处理

不要做 ✗

  • 切勿修改已提交的迁移 - 创建新迁移替代
  • **不要使用 SELECT *** - 在数据迁移中指定列
  • 避免长时间运行的迁移 - 分成更小的步骤
  • 不要假设数据状态 - 转换前验证
  • 切勿跳过迁移 - 按顺序运行
  • 不要忽略警告 - 处理弃用通知
  • 避免循环依赖 - 保持迁移顺序清晰
  • 不要忘记索引 - 特别是外键上的索引

迁移检查清单

## 迁移前检查清单

- [ ] 在本地数据库测试迁移
- [ ] 在暂存环境测试迁移
- [ ] 创建数据库备份
- [ ] 迁移可逆(down迁移工作)
- [ ] 审查潜在数据丢失
- [ ] 检查长时间运行操作
- [ ] 外键约束已验证
- [ ] 为新列添加索引
- [ ] 性能影响评估
- [ ] 团队通知迁移计划

## 迁移后检查清单

- [ ] 迁移成功完成
- [ ] 检查应用程序日志中的错误
- [ ] 监控数据库性能
- [ ] 回滚计划测试(如果需要)
- [ ] 文档更新
- [ ] 在版本控制中标记迁移已应用

故障排除

迁移中途失败

-- 检查迁移状态
SELECT * FROM schema_migrations;

-- 如果事务失败,手动回滚
BEGIN;
-- 手动运行down迁移
ROLLBACK;

-- 或标记为未应用
DELETE FROM schema_migrations WHERE version = '20240101120000';

大表迁移

-- 对大更新使用批处理
DO $$
DECLARE
  batch_size INTEGER := 1000;
  offset_val INTEGER := 0;
  rows_updated INTEGER;
BEGIN
  LOOP
    UPDATE users
    SET email_verified = false
    WHERE id IN (
      SELECT id FROM users
      WHERE email_verified IS NULL
      ORDER BY id
      LIMIT batch_size
      OFFSET offset_val
    );

    GET DIAGNOSTICS rows_updated = ROW_COUNT;
    EXIT WHEN rows_updated = 0;

    offset_val := offset_val + batch_size;
    COMMIT;
    RAISE NOTICE '更新了 % 行', offset_val;
  END LOOP;
END $$;

注意事项

  • 始终先在非生产环境测试迁移
  • 尽可能使用数据库事务
  • 将迁移保存在版本控制中
  • 文档化复杂迁移
  • 考虑生产中的零停机策略
  • 迁移期间监控数据库性能
  • 准备回滚计划
  • 可用时使用ORM迁移工具以获得类型安全性