SQLModel CRUD 技能
概览
SQLModel 数据库模型和 CRUD 操作的专家指导,包括 Pydantic 集成、异步会话管理、带有连接的查询构建,以及 ERP 实体(如学生、费用和出勤)的关系配置。
应用场景
当用户请求以下内容时,将触发此技能:
- 模型: “学生模型”, “SQLModel”, “数据库模型”, “table=True”
- CRUD 操作: “创建学生”, “读取费用”, “更新出勤”, “删除记录”
- 查询构建: “带连接的查询”, “选择语句”, “where 子句”, “分页”
- 关系: “ForeignKey”, “back_populates”, “关系”, “链接模型”
- 验证: “Pydantic 验证器”, “字段约束”, “索引”
核心规则
1. 模型:带有 Pydantic 验证的 table=True
# models/student.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import Optional, List
from enum import Enum
class StudentRole(str, Enum):
STUDENT = "student"
TEACHER = "teacher"
ADMIN = "admin"
class Student(SQLModel, table=True):
"""带有费用和出勤关系的学生模型"""
id: Optional[str] = Field(default=None, primary_key=True)
name: str = Field(min_length=2, max_length=100, index=True)
email: str = Field(unique=True, index=True)
phone: Optional[str] = Field(default=None, pattern=r'^\+?[\d\s-]+$')
password_hash: str
role: StudentRole = Field(default=StudentRole.STUDENT)
class_id: Optional[str] = Field(default=None, foreign_key="class.id", index=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# 关系
class_obj: Optional["Class"] = Relationship(back_populates="students")
fees: List["Fee"] = Relationship(back_populates="student")
attendance: List["Attendance"] = Relationship(back_populates="student")
class Config:
from_attributes = True
class Class(SQLModel, table=True):
"""用于组织学生的班级模型"""
id: Optional[str] = Field(default=None, primary_key=True)
name: str = Field(min_length=2, max_length=50)
grade_level: int = Field(ge=1, le=12)
academic_year: str = Field(max_length=9, pattern=r'^\d{4}-\d{4}$')
created_at: datetime = Field(default_factory=datetime.utcnow)
# 关系
students: List[Student] = Relationship(back_populates="class_obj")
class Fee(SQLModel, table=True):
"""与学生相关联的费用模型"""
id: Optional[str] = Field(default=None, primary_key=True)
student_id: str = Field(foreign_key="student.id", index=True)
amount: float = Field(gt=0)
description: str = Field(max_length=500)
status: str = Field(default="pending", pattern=r'^(pending|paid|overdue|waived)$')
due_date: datetime
paid_date: Optional[datetime] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# 关系
student: Optional[Student] = Relationship(back_populates="fees")
class Attendance(SQLModel, table=True):
"""与学生相关联的出勤模型"""
id: Optional[str] = Field(default=None, primary_key=True)
student_id: str = Field(foreign_key="student.id", index=True)
date: datetime = Field(index=True)
status: str = Field(pattern=r'^(present|absent|late|excused)$')
notes: Optional[str] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
# 关系
student: Optional[Student] = Relationship(back_populates="attendance")
要求:
- 使用
table=True用于数据库表 - 添加
index=True用于频繁查询的字段 - 使用
unique=True用于电子邮件和其他唯一字段 - 添加
foreign_key用于关系 - 使用 Pydantic 验证器(min_length, max_length, pattern, ge, gt)
- 使用
Relationship与back_populates用于双向链接 - 包含
created_at和updated_at时间戳
2. CRUD 操作:异步会话管理
# crud/student.py
from sqlmodel import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from datetime import datetime
from models.student import Student
from schemas.student import StudentCreate, StudentUpdate
class StudentCRUD:
"""学生模型的 CRUD 操作"""
@staticmethod
async def create(db: AsyncSession, student_data: StudentCreate) -> Student:
"""创建一个新学生"""
student = Student(
name=student_data.name,
email=student_data.email,
phone=student_data.phone,
password_hash=student_data.password_hash,
role=student_data.role,
class_id=student_data.class_id,
)
db.add(student)
await db.commit()
await db.refresh(student)
return student
@staticmethod
async def get_by_id(db: AsyncSession, student_id: str) -> Optional[Student]:
"""通过 ID 获取学生"""
result = await db.execute(
select(Student).where(Student.id == student_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_by_email(db: AsyncSession, email: str) -> Optional[Student]:
"""通过电子邮件获取学生"""
result = await db.execute(
select(Student).where(Student.email == email)
)
return result.scalar_one_or_none()
@staticmethod
async def get_all(
db: AsyncSession,
skip: int = 0,
limit: int = 100,
class_id: Optional[str] = None,
role: Optional[str] = None,
) -> List[Student]:
"""获取所有学生,支持分页和过滤"""
query = select(Student)
if class_id:
query = query.where(Student.class_id == class_id)
if role:
query = query.where(Student.role == role)
query = query.offset(skip).limit(limit).order_by(Student.created_at.desc())
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def update(
db: AsyncSession,
student_id: str,
student_data: StudentUpdate,
) -> Optional[Student]:
"""更新学生"""
student = await StudentCRUD.get_by_id(db, student_id)
if not student:
return None
update_data = student_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(student, field, value)
student.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(student)
return student
@staticmethod
async def delete(db: AsyncSession, student_id: str) -> bool:
"""删除学生"""
student = await StudentCRUD.get_by_id(db, student_id)
if not student:
return False
await db.delete(student)
await db.commit()
return True
@staticmethod
async def count(
db: AsyncSession,
class_id: Optional[str] = None,
role: Optional[str] = None,
) -> int:
"""统计学生数量,支持过滤"""
query = select(Student)
if class_id:
query = query.where(Student.class_id == class_id)
if role:
query = query.where(Student.role == role)
result = await db.execute(query)
return len(result.scalars().all())
要求:
- 使用
AsyncSession进行异步数据库操作 - 使用
select(),where(),offset(),limit(),order_by() - 返回
Optional[T]用于单个项目,List[T]用于列表 - 使用
scalar_one_or_none()用于单个结果 - 使用
scalars().all()用于列表结果 - 处理事务,提交/回滚
- 修改时更新
updated_at时间戳
3. 查询构建:连接、过滤器、分页
# queries/advanced.py
from sqlmodel import select, and_, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from models.student import Student, Fee, Attendance, Class
class StudentQueries:
"""带有连接的学生高级查询"""
@staticmethod
async def get_student_with_fees(
db: AsyncSession,
student_id: str,
) -> Optional[Student]:
"""获取带有所有费用的学生"""
result = await db.execute(
select(Student)
.where(Student.id == student_id)
.options(
# 急切加载费用关系
selectinload(Student.fees)
)
)
return result.scalar_one_or_none()
@staticmethod
async def get_students_with_pending_fees(
db: AsyncSession,
skip: int = 0,
limit: int = 100,
) -> List[Student]:
"""获取有待缴费用余额的学生"""
# 子查询,查找有待缴费用的学生
subquery = (
select(Fee.student_id)
.where(Fee.status == "pending")
.distinct()
)
result = await db.execute(
select(Student)
.where(Student.id.in_(subquery))
.offset(skip)
.limit(limit)
.order_by(Student.name)
)
return result.scalars().all()
@staticmethod
async def get_student_attendance_summary(
db: AsyncSession,
student_id: str,
month: int,
year: int,
) -> dict:
"""获取学生一个月的出勤摘要"""
from datetime import datetime
start_date = datetime(year, month, 1)
end_date = datetime(year, month + 1, 1) if month < 12 else datetime(year + 1, 1, 1)
result = await db.execute(
select(
func.count().label("total_days"),
func.sum(
case((Attendance.status == "present", 1), else_=0)
).label("present_days"),
func.sum(
case((Attendance.status == "absent", 1), else_=0)
).label("absent_days"),
func.sum(
case((Attendance.status == "late", 1), else_=0)
).label("late_days"),
)
.where(
and_(
Attendance.student_id == student_id,
Attendance.date >= start_date,
Attendance.date < end_date,
)
)
)
row = result.one()
return {
"total_days": row.total_days or 0,
"present_days": row.present_days or 0,
"absent_days": row.absent_days or 0,
"late_days": row.late_days or 0,
}
@staticmethod
async def search_students(
db: AsyncSession,
search_term: str,
skip: int = 0,
limit: int = 100,
) -> List[Student]:
"""通过姓名或电子邮件搜索学生"""
search_pattern = f"%{search_term}%"
result = await db.execute(
select(Student)
.where(
or_(
Student.name.ilike(search_pattern),
Student.email.ilike(search_pattern),
)
)
.offset(skip)
.limit(limit)
.order_by(Student.name)
)
return result.scalars().all()
@staticmethod
async def get_students_by_class(
db: AsyncSession,
class_id: str,
include_inactive: bool = False,
) -> List[Student]:
"""获取班级中的所有学生"""
query = select(Student).where(Student.class_id == class_id)
if not include_inactive:
# 假设有一个 is_active 字段
query = query.where(Student.is_active == True)
query = query.order_by(Student.name)
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def get_fee_statistics(
db: AsyncSession,
class_id: Optional[str] = None,
) -> dict:
"""获取费用统计数据,可按班级过滤"""
base_query = select(Fee)
if class_id:
# 与学生连接以按班级过滤
base_query = (
select(Fee)
.join(Student, Student.id == Fee.student_id)
.where(Student.class_id == class_id)
)
result = await db.execute(
select(
func.count().label("total_fees"),
func.sum(Fee.amount).label("total_amount"),
func.sum(
case((Fee.status == "paid", Fee.amount), else_=0)
).label("total_collected"),
func.sum(
case((Fee.status == "pending", Fee.amount), else_=0)
).label("total_pending"),
)
)
row = result.one()
return {
"total_fees": row.total_fees or 0,
"total_amount": float(row.total_amount or 0),
"total_collected": float(row.total_collected or 0),
"total_pending": float(row.total_pending or 0),
}
要求:
- 使用
selectinload()急切加载关系 - 使用
subquery进行复杂过滤 - 使用
and_()/or_()进行复合条件 - 使用
func.count(),func.sum()进行聚合 - 使用
case()进行条件聚合 - 使用
ilike()进行不区分大小写的搜索 - 使用
order_by()进行排序 - 使用
offset()/limit()进行分页
4. 关系:ForeignKey 和 back_populates
# models/relationships.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import Optional, List
class Parent(SQLModel, table=True):
"""与学生相关联的家长模型"""
id: Optional[str] = Field(default=None, primary_key=True)
name: str
email: str = Field(unique=True)
phone: Optional[str]
# 一对多:家长可以有多个学生
students: List["Student"] = Relationship(
back_populates="parent",
link_model="student_parent_link" # 多对多通过表
)
class StudentParentLink(SQLModel, table=True):
"""学生和家长的多对多链接表"""
student_id: str = Field(foreign_key="student.id", primary_key=True)
parent_id: str = Field(foreign_key="parent.id", primary_key=True)
relationship_type: str = Field(default="father") # father, mother, guardian
# 更新学生模型,添加多对多关系
class Student(SQLModel, table=True):
id: Optional[str] = Field(default=None, primary_key=True)
name: str
email: str = Field(unique=True)
# 多对多:学生可以有多个家长
parents: List[Parent] = Relationship(
back_populates="students",
link_model=StudentParentLink
)
# 一对一关系示例
class StudentProfile(SQLModel, table=True):
"""学生一对一的个人资料"""
id: Optional[str] = Field(default=None, primary_key=True)
student_id: str = Field(foreign_key="student.id", unique=True)
bio: Optional[str] = None
avatar_url: Optional[str]
preferences: Optional[str] = None # JSON字符串用于灵活的数据
student: Optional[Student] = Relationship()
# 自参照关系
class Employee(SQLModel, table=True):
"""带有经理关系的员工"""
id: Optional[str] = Field(default=None, primary_key=True)
name: str
email: str = Field(unique=True)
manager_id: Optional[str] = Field(foreign_key="employee.id")
# 自参照关系
manager: Optional["Employee"] = Relationship(
back_populates="subordinates",
sa_relationship_kwargs={"remote_side": "[Employee.id]"}
)
subordinates: List["Employee"] = Relationship(back_populates="manager")
要求:
- 使用
foreign_key用于关系链接 - 使用
back_populates用于双向关系 - 使用
unique=True用于一对一关系 - 使用链接表用于多对多关系
- 使用
remote_side用于自参照关系 - 使用
sa_relationship_kwargs用于高级 SQLAlchemy 选项
输出要求
代码文件
-
模型:
models/__init__.pymodels/student.pymodels/fee.pymodels/attendance.pymodels/class.py
-
CRUD 操作:
crud/__init__.pycrud/student.pycrud/fee.pycrud/attendance.py
-
高级查询:
queries/__init__.pyqueries/advanced.py
集成要求
- @fastapi-app:在 FastAPI 路由中使用模型
- @fastapi-app/dependencies:数据库会话依赖项
- @api-client:前端的类型安全响应
文档
- PHR:为模式设计创建提示历史记录
- ADR:记录关系选择、索引策略
- 注释:记录复杂查询和关系
工作流程
-
设计模式
- 确定实体(学生、费用等)
- 定义关系(一对多、多对多)
- 添加约束(唯一、外键、索引)
-
创建模型
- 使用带有
table=True的 SQLModel 类定义 - 添加带有类型和验证器的字段
- 使用
back_populates配置关系
- 使用带有
-
构建 CRUD 操作
- 实现创建、读取、更新、删除
- 添加分页和过滤
- 处理异步会话
-
创建高级查询
- 实现连接和子查询
- 添加聚合和摘要
- 搜索和过滤功能
-
测试和优化
- 测试所有 CRUD 操作
- 验证关系是否正确工作
- 使用索引优化慢查询
质量检查表
在完成任何 SQLModel 实现之前:
- [ ] 频繁查询的索引:为常用于过滤/排序的字段添加 index=True
- [ ] 约束 FK/唯一:使用 foreign_key 用于关系,unique=True 用于电子邮件
- [ ] 类型化结果:返回正确类型的 Optional[T] 或 List[T]
- [ ] 异步支持:使用 AsyncSession 进行所有操作
- [ ] 关系:使用 back_populates 用于双向链接
- [ ] 验证:使用 Field 验证器(min_length, max_length, pattern)
- [ ] 时间戳:包含 created_at 和 updated_at
- [ ] 级联:为关系配置删除行为
- [ ] 急切加载:使用 selectinload 用于关系访问
- [ ] 错误处理:处理 IntegrityError, 外键违规
常见模式
学生模型与 CRUD
# models/student.py
class Student(SQLModel, table=True):
id: Optional[str] = Field(default=None, primary_key=True)
name: str = Field(min_length=2, max_length=100, index=True)
email: str = Field(unique=True, index=True)
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
fees: List["Fee"] = Relationship(back_populates="student")
CRUD 费用
# crud/fee.py
class FeeCRUD:
@staticmethod
async def create(db: AsyncSession, fee_data: FeeCreate) -> Fee:
fee = Fee(**fee_data.model_dump())
db.add(fee)
await db.commit()
await db.refresh(fee)
return fee
@staticmethod
async def get_by_id(db: AsyncSession, fee_id: str) -> Optional[Fee]:
result = await db.execute(
select(Fee).where(Fee.id == fee_id)
)
return result.scalar_one_or_none()
@staticmethod
async def get_pending_by_student(db: AsyncSession, student_id: str) -> List[Fee]:
result = await db.execute(
select(Fee)
.where(
and_(
Fee.student_id == student_id,
Fee.status == "pending"
)
)
.order_by(Fee.due_date)
)
return result.scalars().all()
带连接的查询
# 使用连接获取带有班级名称的学生
async def get_students_with_class(db: AsyncSession) -> List[dict]:
result = await db.execute(
select(
Student.id,
Student.name,
Student.email,
Class.name.label("class_name"),
)
.outerjoin(Class, Student.class_id == Class.id)
.order_by(Student.name)
)
return [dict(row._mapping) for row in result]
数据库设置
# database.py
from sqlmodel import create_engine, SQLModel
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import os
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://user:password@localhost:5432/erp_db"
)
# 异步引擎
async_engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def init_db():
"""初始化数据库表"""
async with async_engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
async def get_db() -> AsyncSession:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
Alembic 迁移
# alembic/env.py
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_url
from alembic import context
# 导入模型以自动生成
from models.student import Student
from models.fee import Fee
from models.attendance import Attendance
target_metadata = SQLModel.metadata
# alembic revision --autogenerate -m "Initial migration"
# alembic upgrade head
参考
- SQLModel 文档:https://sqlmodel.tiangolo.com
- SQLAlchemy 关系:https://docs.sqlalchemy.org/en/20/orm/relationships.html
- SQLAlchemy 异步:https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
- Pydantic 验证:https://docs.pydantic.dev/latest/usage/validators/
- Alembic 迁移:https://alembic.sqlalchemy.org/en/latest/