DatabaseMigrations DatabaseMigrations

数据库迁移是用于模式演变和数据转换的数据库迁移策略和工具,它们帮助团队以受控、可复现的方式发展数据库结构,同时维护数据完整性并最小化停机时间。

数据工程 0 次安装 0 次浏览 更新于 3/5/2026

数据库迁移

概览

数据库迁移是随时间修改数据库模式和数据的版本化脚本。它们使团队能够以受控、可复现的方式发展他们的数据库结构,同时维护数据完整性并最小化停机时间。

前提条件

  • 理解SQL和数据库模式设计
  • 了解数据库系统(PostgreSQL、MySQL等)
  • 熟悉版本控制系统
  • 基本理解数据转换概念

核心概念

数据库迁移是什么?

数据库迁移是:

  • 版本化:每个迁移都有一个唯一的版本号或时间戳
  • 有序:迁移按特定顺序应用
  • 可逆:大多数迁移可以回滚
  • 幂等:可以安全地多次运行
  • 团队兼容:允许多个开发人员工作在模式更改上

迁移类型

  1. 模式迁移

    • 创建/删除表
    • 添加/移除列
    • 修改约束
    • 创建/删除索引
  2. 数据迁移

    • 转换现有数据
    • 填充参考表
    • 清理不一致的数据
    • 回填新列
  3. 回滚迁移

    • 逆转模式更改
    • 恢复先前的数据状态
    • 处理边缘情况

迁移生命周期

开发 → 测试 → 暂存 → 生产
     ↓            ↓          ↓          ↓
   创建      验证     验证      部署
   测试         测试       测试      监控

实施指南

基本迁移结构

// migrations/20240124000001_create_users_table.js
module.exports = {
  up: async (db) => {
    await db.schema.createTable('users', (table) => {
      table.increments('id').primary();
      table.string('email', 255).notNullable().unique();
      table.string('password_hash', 255).notNullable();
      table.timestamp('created_at').defaultTo(db.fn.now());
      table.timestamp('updated_at').defaultTo(db.fn.now());
    });
  },

  down: async (db) => {
    await db.schema.dropTable('users');
  }
};

使用Knex.js

const knex = require('knex');

// 初始化knex
const db = knex({
  client: 'pg',
  connection: {
    host: 'localhost',
    user: 'user',
    password: 'password',
    database: 'mydb'
  },
  migrations: {
    directory: './migrations',
    tableName: 'knex_migrations'
  }
});

// 创建一个新的迁移
// npx knex migrate:make create_users_table

// 运行迁移
await db.migrate.latest();

// 回滚上一个迁移
await db.migrate.rollback();

// 获取当前版本
const current = await db.migrate.currentVersion();
console.log('当前迁移版本:', current);

使用Sequelize

const { Sequelize } = require('sequelize');

const sequelize = new Sequelize('mydb', 'user', 'password', {
  dialect: 'postgres',
  host: 'localhost',
  logging: false
});

// 定义迁移
module.exports = {
  up: async (queryInterface, Sequelize) => {
    await queryInterface.createTable('users', {
      id: {
        type: Sequelize.INTEGER,
        primaryKey: true,
        autoIncrement: true
      },
      email: {
        type: Sequelize.STRING(255),
        allowNull: false,
        unique: true
      },
      password_hash: {
        type: Sequelize.STRING(255),
        allowNull: false
      },
      created_at: {
        type: Sequelize.DATE,
        defaultValue: Sequelize.literal('CURRENT_TIMESTAMP')
      },
      updated_at: {
        type: Sequelize.DATE,
        defaultValue: Sequelize.literal('CURRENT_TIMESTAMP')
      }
    });
  },

  down: async (queryInterface) => {
    await queryInterface.dropTable('users');
  }
};

// 运行迁移
await sequelize.sync({ alter: true });

使用TypeORM

import { MigrationInterface, QueryRunner } from 'typeorm';

export class CreateUsersTable1234567890 implements MigrationInterface {
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.createTable(new Table({
      name: 'users',
      columns: [
        {
          name: 'id',
          type: 'int',
          isPrimary: true,
          isGenerated: true,
          generationStrategy: 'increment'
        },
        {
          name: 'email',
          type: 'varchar',
          length: '255',
          isNullable: false,
          isUnique: true
        },
        {
          name: 'password_hash',
          type: 'varchar',
          length: '255',
          isNullable: false
        },
        {
          name: 'created_at',
          type: 'timestamp',
          default: 'CURRENT_TIMESTAMP'
        },
        {
          name: 'updated_at',
          type: 'timestamp',
          default: 'CURRENT_TIMESTAMP'
        }
      ]
    }));
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.dropTable('users');
  }
}

使用Prisma

// prisma/migrations/20240124000001_init/migration.sql
-- CreateUsersTable
CREATE TABLE "users" (
    "id" SERIAL PRIMARY KEY,
    "email" VARCHAR(255) NOT NULL UNIQUE,
    "password_hash" VARCHAR(255) NOT NULL,
    "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    "updated_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- CreateUsersTableRollback
DROP TABLE "users";
# 生成迁移
npx prisma migrate dev --name init

# 应用迁移
npx prisma migrate deploy

# 回滚
npx prisma migrate resolve --rolled-back 20240124000001_init

迁移模式

添加列

module.exports = {
  up: async (db) => {
    await db.schema.table('users')
      .addColumn('phone', 'varchar', { length: 20 });
  },

  down: async (db) => {
    await db.schema.table('users')
      .dropColumn('phone');
  }
};

添加具有默认值的列

module.exports = {
  up: async (db) => {
    await db.schema.table('users')
      .addColumn('status', 'varchar', {
        length: 50,
        defaultTo: 'active',
        notNullable: true
      });
  },

  down: async (db) => {
    await db.schema.table('users')
      .dropColumn('status');
  }
};

重命名列

module.exports = {
  up: async (db) => {
    await db.schema.table('users')
      .renameColumn('email_address', 'email');
  },

  down: async (db) => {
    await db.schema.table('users')
      .renameColumn('email', 'email_address');
  }
};

添加索引

module.exports = {
  up: async (db) => {
    await db.schema.table('users')
      .index('email', 'idx_users_email');
  },

  down: async (db) => {
    await db.schema.table('users')
      .dropIndex('email', 'idx_users_email');
  }
};

添加外键

module.exports = {
  up: async (db) => {
    await db.schema.table('posts')
      .foreign('user_id')
      .references('id')
      .inTable('users')
      .onDelete('CASCADE');
  },

  down: async (db) => {
    await db.schema.table('posts')
      .dropForeign('user_id');
  }
};

数据迁移

module.exports = {
  up: async (db) => {
    // 添加新列
    await db.schema.table('users')
      .addColumn('full_name', 'varchar', { length: 255 });

    // 迁移数据
    await db('users')
      .update({
        full_name: db.raw('CONCAT(first_name, " ", last_name)')
      });

    // 删除旧列
    await db.schema.table('users')
      .dropColumn('first_name');
    await db.schema.table('users')
      .dropColumn('last_name');
  },

  down: async (db) => {
    // 添加回旧列
    await db.schema.table('users')
      .addColumn('first_name', 'varchar', { length: 100 });
    await db.schema.table('users')
      .addColumn('last_name', 'varchar', { length: 100 });

    // 迁移数据回
    await db.raw(`
      UPDATE users
      SET first_name = SPLIT_PART(full_name, ' ', 1),
          last_name = SPLIT_PART(full_name, ' ', 2)
    `);

    // 删除新列
    await db.schema.table('users')
      .dropColumn('full_name');
  }
};

零停机迁移

扩展列(PostgreSQL)

module.exports = {
  up: async (db) => {
    // 第1步:添加新列为nullable
    await db.schema.table('users')
      .addColumn('email_new', 'varchar', { length: 255 });

    // 第2步:回填数据
    await db.raw(`
      UPDATE users
      SET email_new = email
      WHERE email_new IS NULL
    `);

    // 第3步:添加约束
    await db.raw(`
      ALTER TABLE users
      ADD CONSTRAINT users_email_new_key
      UNIQUE (email_new)
    `);

    // 第4步:交换列
    await db.raw(`
      ALTER TABLE users
      RENAME COLUMN email TO email_old
    `);

    await db.raw(`
      ALTER TABLE users
      RENAME COLUMN email_new TO email
    `);

    // 第5步:删除旧列
    await db.schema.table('users')
      .dropColumn('email_old');
  },

  down: async (db) => {
    // 反转过程
    await db.schema.table('users')
      .addColumn('email_old', 'varchar', { length: 255 });

    await db.raw(`
      UPDATE users
      SET email_old = email
      WHERE email_old IS NULL
    `);

    await db.raw(`
      ALTER TABLE users
      DROP CONSTRAINT IF EXISTS users_email_key
    `);

    await db.raw(`
      ALTER TABLE users
      RENAME COLUMN email TO email_new
    `);

    await db.raw(`
      ALTER TABLE users
      RENAME COLUMN email_old TO email
    `);

    await db.schema.table('users')
      .dropColumn('email_new');
  }
};

扩展列(MySQL)

module.exports = {
  up: async (db) => {
    // 第1步:添加新列
    await db.schema.table('users')
      .addColumn('email_new', 'varchar', { length: 255 });

    // 第2步:分批回填数据
    const batchSize = 1000;
    let offset = 0;

    while (true) {
      const result = await db('users')
        .whereNull('email_new')
        .limit(batchSize)
        .offset(offset)
        .update({
          email_new: db.raw('email')
        });

      if (result === 0) break;
      offset += batchSize;
    }

    // 第3步:重命名列
    await db.raw(`
      ALTER TABLE users
      CHANGE COLUMN email email_old VARCHAR(255)
    `);

    await db.raw(`
      ALTER TABLE users
      CHANGE COLUMN email_new email VARCHAR(255) NOT NULL UNIQUE
    `);

    // 第4步:删除旧列
    await db.schema.table('users')
      .dropColumn('email_old');
  },

  down: async (db) => {
    // 反转过程
    await db.schema.table('users')
      .addColumn('email_old', 'varchar', { length: 255 });

    const batchSize = 1000;
    let offset = 0;

    while (true) {
      const result = await db('users')
        .whereNull('email_old')
        .limit(batchSize)
        .offset(offset)
        .update({
          email_old: db.raw('email')
        });

      if (result === 0) break;
      offset += batchSize;
    }

    await db.raw(`
      ALTER TABLE users
      CHANGE COLUMN email email_new VARCHAR(255)
    `);

    await db.raw(`
      ALTER TABLE users
      CHANGE COLUMN email_old email VARCHAR(255) NOT NULL UNIQUE
    `);

    await db.schema.table('users')
      .dropColumn('email_new');
  }
};

添加索引而不锁定

module.exports = {
  up: async (db) => {
    // PostgreSQL: CREATE INDEX CONCURRENTLY
    await db.raw(`
      CREATE INDEX CONCURRENTLY idx_users_email
      ON users(email)
    `);
  },

  down: async (db) => {
    await db.raw(`
      DROP INDEX CONCURRENTLY IF EXISTS idx_users_email
    `);
  }
};

回滚策略

简单回滚

module.exports = {
  up: async (db) => {
    await db.schema.createTable('users', (table) => {
      table.increments('id').primary();
      table.string('email', 255).notNullable().unique();
    });
  },

  down: async (db) => {
    await db.schema.dropTable('users');
  }
};

数据保留回滚

module.exports = {
  up: async (db) => {
    // 创建备份表
    await db.raw(`
      CREATE TABLE users_backup AS
      SELECT * FROM users
    `);

    // 进行更改
    await db('users')
      .where('status', 'inactive')
      .update({ status: 'deleted' });
  },

  down: async (db) => {
    // 从备份恢复
    await db.raw(`
      UPDATE users u
      SET status = ub.status
      FROM users_backup ub
      WHERE u.id = ub.id
    `);

    // 删除备份表
    await db.schema.dropTableIfExists('users_backup');
  }
};

条件回滚

module.exports = {
  up: async (db) => {
    // 检查列是否存在
    const hasColumn = await db.schema.hasColumn('users', 'phone');

    if (!hasColumn) {
      await db.schema.table('users')
        .addColumn('phone', 'varchar', { length: 20 });
    }
  },

  down: async (db) => {
    await db.schema.table('users')
      .dropColumnIfExists('phone');
  }
};

版本控制

命名约定

# 基于时间戳
20240124000001_create_users_table.js
20240124000002_add_phone_to_users.js

# 基于描述
20240124_120000_create_users_table.js
20240124_130000_add_phone_to_users.js

# 序列
001_create_users_table.js
002_add_phone_to_users.js

迁移跟踪表

CREATE TABLE schema_migrations (
  id SERIAL PRIMARY KEY,
  version VARCHAR(255) NOT NULL UNIQUE,
  name VARCHAR(255),
  applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

分支迁移

// 处理分支特定的迁移
const isProduction = process.env.NODE_ENV === 'production';

module.exports = {
  up: async (db) => {
    await db.schema.createTable('users', (table) => {
      table.increments('id').primary();
      table.string('email', 255).notNullable().unique();
    });

    // 生产特定的迁移
    if (isProduction) {
      await db.raw(`
        CREATE INDEX CONCURRENTLY idx_users_email
        ON users(email)
      `);
    }
  },

  down: async (db) => {
    if (isProduction) {
      await db.raw(`
        DROP INDEX CONCURRENTLY IF EXISTS idx_users_email
      `);
    }

    await db.schema.dropTable('users');
  }
};

测试迁移

单元测试

const { expect } = require('chai');
const db = require('./db');

describe('Migration: create_users_table', () => {
  let migration;

  before(async () => {
    migration = require('./migrations/20240124000001_create_users_table');
  });

  it('should create users table', async () => {
    await migration.up(db);

    const hasTable = await db.schema.hasTable('users');
    expect(hasTable).to.be.true;
  });

  it('should create all columns', async () => {
    await migration.up(db);

    const hasId = await db.schema.hasColumn('users', 'id');
    const hasEmail = await db.schema.hasColumn('users', 'email');
    const hasPasswordHash = await db.schema.hasColumn('users', 'password_hash');

    expect(hasId).to.be.true;
    expect(hasEmail).to.be.true;
    expect(hasPasswordHash).to.be.true;
  });

  it('should create unique constraint on email', async () => {
    await migration.up(db);

    const indexes = await db('users').columnInfo();
    const emailIndex = indexes.find(i => i.column_name === 'email');

    expect(emailIndex).to.exist;
  });

  afterEach(async () => {
    await migration.down(db);
  });
});

集成测试

describe('Migration Integration', () => {
  let testDb;

  before(async () => {
    // 创建测试数据库
    testDb = knex({
      client: 'pg',
      connection: {
        host: 'localhost',
        user: 'user',
        password: 'password',
        database: 'mydb_test'
      }
    });

    // 运行所有迁移
    await testDb.migrate.latest();
  });

  it('should allow inserting users', async () => {
    await testDb('users').insert({
      email: 'test@example.com',
      password_hash: 'hashed_password'
    });

    const users = await testDb('users').select();
    expect(users).to.have.lengthOf(1);
  });

  it('should enforce unique email constraint', async () => {
    await testDb('users').insert({
      email: 'test@example.com',
      password_hash: 'hashed_password'
    });

    try {
      await testDb('users').insert({
        email: 'test@example.com',
        password_hash: 'different_hash'
      });
      throw new Error('Should have thrown unique constraint error');
    } catch (error) {
      expect(error.code).to.equal('23505'); // 唯一性违规
    }
  });

  after(async () => {
    await testDb.destroy();
  });
});

最佳实践

  1. 使迁移可逆

    • 始终实现down函数
    • 在开发中测试回滚
    • 记录不可逆迁移
  2. 保持迁移小

    • 每个迁移一个逻辑更改
    • 避免合并不相关的更改
    • 将大型迁移拆分为更小的
  3. 使用事务

    • 将迁移包装在事务中
    • 失败时回滚
    • 确保原子性
  4. 彻底测试

    • 在样本数据上测试迁移
    • 验证回滚是否有效
    • 在生产之前在暂存环境中测试
  5. 记录更改

    • 添加注释解释更改
    • 记录重大更改
    • 记录数据转换
  6. 处理大型数据集

    • 使用批处理进行数据迁移
    • 在数据迁移之前添加索引
    • 在迁移期间监控性能
  7. 版本控制

    • 将迁移与代码更改一起提交
    • 永远不要修改已提交的迁移
    • 为更正创建新的迁移

常见陷阱

  1. 不可逆迁移

    // 坏:删除表而不备份
    down: async (db) => {
      await db.schema.dropTable('users'); // 数据丢失!
    }
    
    // 好:删除前创建备份
    up: async (db) => {
      await db.raw('CREATE TABLE users_backup AS SELECT * FROM users');
      await db.schema.dropTable('users');
    }
    
    down: async (db) => {
      await db.raw('CREATE TABLE users AS SELECT * FROM users_backup');
      await db.schema.dropTable('users_backup');
    }
    
  2. 阻塞生产

    // 坏:长时间运行的迁移阻塞生产
    up: async (db) => {
      await db.raw('CREATE INDEX idx_users_email ON users(email)');
    }
    
    // 好:PostgreSQL使用CONCURRENTLY
    up: async (db) => {
      await db.raw('CREATE INDEX CONCURRENTLY idx_users_email ON users(email)');
    }
    
  3. 数据丢失

    // 坏:覆盖数据而不备份
    up: async (db) => {
      await db('users').update({ status: 'active' });
    }
    
    // 好:首先创建备份
    up: async (db) => {
      await db.raw('CREATE TABLE users_backup AS SELECT * FROM users');
      await db('users').update({ status: 'active' });
    }
    

相关技能