alembicSkill alembic

Alembic数据库迁移工具是用于SQLAlchemy项目的专业数据库架构版本控制系统。它支持自动生成迁移脚本、管理数据库变更历史、支持异步操作和多种数据库后端(PostgreSQL、MySQL、SQLite)。主要功能包括:数据库架构变更管理、版本控制、数据迁移、分支合并、回滚操作等。关键词:数据库迁移、SQLAlchemy、Python、Alembic、数据库架构管理、版本控制、异步数据库、PostgreSQL、MySQL、SQLite、数据迁移工具。

后端开发 0 次安装 4 次浏览 更新于 3/2/2026

name: alembic description: 使用Alembic管理SQLAlchemy项目的数据库迁移 when_to_use: 当您需要在SQLAlchemy项目中创建、应用或管理数据库架构变更时

Alembic 数据库迁移

Alembic 是一个用于SQLAlchemy项目的数据库迁移工具,为您的数据库架构提供版本控制。

快速开始

创建迁移(自动生成)

# 根据模型变更生成迁移
uv run alembic revision --autogenerate -m "添加用户表"

# 检查是否有待处理的变更
uv run alembic check

应用迁移

# 升级到最新版本
uv run alembic upgrade head

# 升级到特定修订版本
uv run alembic upgrade ae1027a6acf

# 回退一个修订版本
uv run alembic downgrade -1

# 回退到基础(空架构)
uv run alembic downgrade base

检查状态

# 显示当前数据库修订版本
uv run alembic current

# 显示所有修订历史
uv run alembic history

# 显示修订详情
uv run alembic show ae1027a6acf

常用模式

自动生成配置

异步SQLAlchemy的env.py设置:

import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

# 导入您的模型
from app.models import Base
from app.config import get_settings

config = context.config
settings = get_settings()

# 为异步配置数据库URL
database_url = settings.database_url.replace("postgresql://", "postgresql+asyncpg://")
config.set_main_option("sqlalchemy.url", database_url)

target_metadata = Base.metadata

async def run_async_migrations():
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        render_as_batch=False,  # 对于SQLite设置为True
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

手动迁移操作

常见架构变更:

from alembic import op
import sqlalchemy as sa

def upgrade():
    # 添加列
    op.add_column('users', sa.Column('email', sa.String(255), nullable=True))

    # 重命名表
    op.rename_table('old_table', 'new_table')

    # 创建索引
    op.create_index('ix_users_email', 'users', ['email'])

    # 添加约束
    op.create_check_constraint('ck_age_positive', 'users', 'age > 0')

def downgrade():
    # 反向操作
    op.drop_constraint('ck_age_positive', 'users')
    op.drop_index('ix_users_email')
    op.rename_table('new_table', 'old_table')
    op.drop_column('users', 'email')

批处理模式(用于SQLite)

在env.py中配置批处理模式:

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    render_as_batch=True  # SQLite迁移必需
)

生成的批处理迁移:

def upgrade():
    with op.batch_alter_table('users', schema=None) as batch_op:
        batch_op.add_column(sa.Column('email', sa.String(length=255), nullable=True))
        batch_op.create_index('ix_users_email', ['email'], unique=False)

过滤对象

在自动生成中跳过某些对象:

def include_object(object, name, type_, reflected, compare_to):
    # 跳过临时表
    if type_ == "table" and name.startswith("temp_"):
        return False

    # 跳过带有skip_autogenerate标志的列
    if type_ == "column" and not reflected:
        if object.info.get("skip_autogenerate", False):
            return False

    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object
)

按模式过滤:

def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name in [None, "public", "auth"]  # 包含默认+特定模式
    elif type_ == "table":
        return parent_names["schema_qualified_table_name"] in target_metadata.tables
    return True

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_name=include_name,
    include_schemas=True
)

自定义迁移处理

修改生成的迁移:

def process_revision_directives(context, revision, directives):
    script = directives[0]

    # 跳过空迁移
    if config.cmd_opts.autogenerate and script.upgrade_ops.is_empty():
        directives[:] = []
        return

    # 移除单向迁移的降级操作
    script.downgrade_ops.ops[:] = []

context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=process_revision_directives
)

数据迁移

在架构变更期间迁移数据:

def upgrade():
    # 添加新列
    op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))

    # 迁移数据
    connection = op.get_bind()
    connection.execute(
        sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
    )

    # 数据迁移后使列必需
    op.alter_column('users', 'full_name', nullable=False)

def downgrade():
    op.drop_column('users', 'full_name')

分支迁移

处理多个分支:

# 创建分支
uv run alembic revision -m "创建功能分支" --head=base --branch-label=feature_x

# 升级特定分支
uv run alembic upgrade feature_x@head

# 合并分支
uv run alembic merge -m "将feature_x合并到main" feature_x@head main@head

实用代码片段

检查数据库是否最新

from alembic import config, script
from alembic.runtime import migration
from sqlalchemy import create_engine

def is_database_up_to_date(alembic_cfg_path, database_url):
    """检查数据库架构是否匹配最新迁移"""
    cfg = config.Config(alembic_cfg_path)
    directory = script.ScriptDirectory.from_config(cfg)

    engine = create_engine(database_url)
    with engine.begin() as connection:
        context = migration.MigrationContext.configure(connection)
        current_heads = set(context.get_current_heads())
        latest_heads = set(directory.get_heads())
        return current_heads == latest_heads

以编程方式运行迁移

from alembic import command
from alembic.config import Config

def run_migrations(alembic_ini_path):
    """运行所有待处理迁移"""
    alembic_cfg = Config(alembic_ini_path)
    command.upgrade(alembic_cfg, "head")

def create_migration(alembic_ini_path, message, autogenerate=True):
    """创建新迁移"""
    alembic_cfg = Config(alembic_ini_path)
    command.revision(alembic_cfg, message=message, autogenerate=autogenerate)

自定义迁移操作

from alembic.autogenerate import rewriter
from alembic.operations import ops

writer = rewriter.Rewriter()

@writer.rewrites(ops.AddColumnOp)
def add_column_non_nullable(context, revision, op):
    """分两步添加非空列"""
    if not op.column.nullable:
        op.column.nullable = True
        return [
            op,
            ops.AlterColumnOp(
                op.table_name,
                op.column.name,
                nullable=False,
                existing_type=op.column.type,
                schema=op.schema
            )
        ]
    return op

# 在env.py中使用
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=writer
)

要求

  • Python 3.8+:异步支持必需
  • SQLAlchemy 2.0+:用于现代异步模式
  • PostgreSQL/MySQL/SQLite:支持的数据库
  • Alembic 1.8+:迁移工具

常见依赖

# 核心依赖
uv add alembic sqlalchemy

# 用于PostgreSQL异步
uv add asyncpg

# 用于MySQL异步
uv add aiomysql

# 用于SQLite(内置)
# 无需额外包

开发设置

# 在现有项目中初始化Alembic
uv run alembic init alembic

# 为您的模型配置env.py
# 编辑alembic.ini以设置数据库URL

# 首次迁移
uv run alembic revision --autogenerate -m "初始架构"
uv run alembic upgrade head