name: alembic description: 使用Alembic管理SQLAlchemy项目的数据库迁移 when_to_use: 当您需要在SQLAlchemy项目中创建、应用或管理数据库架构变更时
Alembic 数据库迁移
Alembic 是一个用于SQLAlchemy项目的数据库迁移工具,为您的数据库架构提供版本控制。
快速开始
创建迁移(自动生成)
# 根据模型变更生成迁移
uv run alembic revision --autogenerate -m "添加用户表"
# 检查是否有待处理的变更
uv run alembic check
应用迁移
# 升级到最新版本
uv run alembic upgrade head
# 升级到特定修订版本
uv run alembic upgrade ae1027a6acf
# 回退一个修订版本
uv run alembic downgrade -1
# 回退到基础(空架构)
uv run alembic downgrade base
检查状态
# 显示当前数据库修订版本
uv run alembic current
# 显示所有修订历史
uv run alembic history
# 显示修订详情
uv run alembic show ae1027a6acf
常用模式
自动生成配置
异步SQLAlchemy的env.py设置:
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
# 导入您的模型
from app.models import Base
from app.config import get_settings
config = context.config
settings = get_settings()
# 为异步配置数据库URL
database_url = settings.database_url.replace("postgresql://", "postgresql+asyncpg://")
config.set_main_option("sqlalchemy.url", database_url)
target_metadata = Base.metadata
async def run_async_migrations():
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
render_as_batch=False, # 对于SQLite设置为True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
手动迁移操作
常见架构变更:
from alembic import op
import sqlalchemy as sa
def upgrade():
# 添加列
op.add_column('users', sa.Column('email', sa.String(255), nullable=True))
# 重命名表
op.rename_table('old_table', 'new_table')
# 创建索引
op.create_index('ix_users_email', 'users', ['email'])
# 添加约束
op.create_check_constraint('ck_age_positive', 'users', 'age > 0')
def downgrade():
# 反向操作
op.drop_constraint('ck_age_positive', 'users')
op.drop_index('ix_users_email')
op.rename_table('new_table', 'old_table')
op.drop_column('users', 'email')
批处理模式(用于SQLite)
在env.py中配置批处理模式:
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=True # SQLite迁移必需
)
生成的批处理迁移:
def upgrade():
with op.batch_alter_table('users', schema=None) as batch_op:
batch_op.add_column(sa.Column('email', sa.String(length=255), nullable=True))
batch_op.create_index('ix_users_email', ['email'], unique=False)
过滤对象
在自动生成中跳过某些对象:
def include_object(object, name, type_, reflected, compare_to):
# 跳过临时表
if type_ == "table" and name.startswith("temp_"):
return False
# 跳过带有skip_autogenerate标志的列
if type_ == "column" and not reflected:
if object.info.get("skip_autogenerate", False):
return False
return True
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object=include_object
)
按模式过滤:
def include_name(name, type_, parent_names):
if type_ == "schema":
return name in [None, "public", "auth"] # 包含默认+特定模式
elif type_ == "table":
return parent_names["schema_qualified_table_name"] in target_metadata.tables
return True
context.configure(
connection=connection,
target_metadata=target_metadata,
include_name=include_name,
include_schemas=True
)
自定义迁移处理
修改生成的迁移:
def process_revision_directives(context, revision, directives):
script = directives[0]
# 跳过空迁移
if config.cmd_opts.autogenerate and script.upgrade_ops.is_empty():
directives[:] = []
return
# 移除单向迁移的降级操作
script.downgrade_ops.ops[:] = []
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives
)
数据迁移
在架构变更期间迁移数据:
def upgrade():
# 添加新列
op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# 迁移数据
connection = op.get_bind()
connection.execute(
sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
)
# 数据迁移后使列必需
op.alter_column('users', 'full_name', nullable=False)
def downgrade():
op.drop_column('users', 'full_name')
分支迁移
处理多个分支:
# 创建分支
uv run alembic revision -m "创建功能分支" --head=base --branch-label=feature_x
# 升级特定分支
uv run alembic upgrade feature_x@head
# 合并分支
uv run alembic merge -m "将feature_x合并到main" feature_x@head main@head
实用代码片段
检查数据库是否最新
from alembic import config, script
from alembic.runtime import migration
from sqlalchemy import create_engine
def is_database_up_to_date(alembic_cfg_path, database_url):
"""检查数据库架构是否匹配最新迁移"""
cfg = config.Config(alembic_cfg_path)
directory = script.ScriptDirectory.from_config(cfg)
engine = create_engine(database_url)
with engine.begin() as connection:
context = migration.MigrationContext.configure(connection)
current_heads = set(context.get_current_heads())
latest_heads = set(directory.get_heads())
return current_heads == latest_heads
以编程方式运行迁移
from alembic import command
from alembic.config import Config
def run_migrations(alembic_ini_path):
"""运行所有待处理迁移"""
alembic_cfg = Config(alembic_ini_path)
command.upgrade(alembic_cfg, "head")
def create_migration(alembic_ini_path, message, autogenerate=True):
"""创建新迁移"""
alembic_cfg = Config(alembic_ini_path)
command.revision(alembic_cfg, message=message, autogenerate=autogenerate)
自定义迁移操作
from alembic.autogenerate import rewriter
from alembic.operations import ops
writer = rewriter.Rewriter()
@writer.rewrites(ops.AddColumnOp)
def add_column_non_nullable(context, revision, op):
"""分两步添加非空列"""
if not op.column.nullable:
op.column.nullable = True
return [
op,
ops.AlterColumnOp(
op.table_name,
op.column.name,
nullable=False,
existing_type=op.column.type,
schema=op.schema
)
]
return op
# 在env.py中使用
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=writer
)
要求
- Python 3.8+:异步支持必需
- SQLAlchemy 2.0+:用于现代异步模式
- PostgreSQL/MySQL/SQLite:支持的数据库
- Alembic 1.8+:迁移工具
常见依赖
# 核心依赖
uv add alembic sqlalchemy
# 用于PostgreSQL异步
uv add asyncpg
# 用于MySQL异步
uv add aiomysql
# 用于SQLite(内置)
# 无需额外包
开发设置
# 在现有项目中初始化Alembic
uv run alembic init alembic
# 为您的模型配置env.py
# 编辑alembic.ini以设置数据库URL
# 首次迁移
uv run alembic revision --autogenerate -m "初始架构"
uv run alembic upgrade head