SQLModelCRUD技能Skill sqlmodel-crud

SQLModel CRUD 技能提供数据库模型和 CRUD 操作的专家指导,包括 Pydantic 集成、异步会话管理、查询构建以及实体间的关系配置。

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

SQLModel CRUD 技能

概览

SQLModel 数据库模型和 CRUD 操作的专家指导,包括 Pydantic 集成、异步会话管理、带有连接的查询构建,以及 ERP 实体(如学生、费用和出勤)的关系配置。

应用场景

当用户请求以下内容时,将触发此技能:

  • 模型: “学生模型”, “SQLModel”, “数据库模型”, “table=True”
  • CRUD 操作: “创建学生”, “读取费用”, “更新出勤”, “删除记录”
  • 查询构建: “带连接的查询”, “选择语句”, “where 子句”, “分页”
  • 关系: “ForeignKey”, “back_populates”, “关系”, “链接模型”
  • 验证: “Pydantic 验证器”, “字段约束”, “索引”

核心规则

1. 模型:带有 Pydantic 验证的 table=True

# models/student.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import Optional, List
from enum import Enum

class StudentRole(str, Enum):
    STUDENT = "student"
    TEACHER = "teacher"
    ADMIN = "admin"

class Student(SQLModel, table=True):
    """带有费用和出勤关系的学生模型"""
    id: Optional[str] = Field(default=None, primary_key=True)
    name: str = Field(min_length=2, max_length=100, index=True)
    email: str = Field(unique=True, index=True)
    phone: Optional[str] = Field(default=None, pattern=r'^\+?[\d\s-]+$')
    password_hash: str
    role: StudentRole = Field(default=StudentRole.STUDENT)
    class_id: Optional[str] = Field(default=None, foreign_key="class.id", index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    # 关系
    class_obj: Optional["Class"] = Relationship(back_populates="students")
    fees: List["Fee"] = Relationship(back_populates="student")
    attendance: List["Attendance"] = Relationship(back_populates="student")

    class Config:
        from_attributes = True

class Class(SQLModel, table=True):
    """用于组织学生的班级模型"""
    id: Optional[str] = Field(default=None, primary_key=True)
    name: str = Field(min_length=2, max_length=50)
    grade_level: int = Field(ge=1, le=12)
    academic_year: str = Field(max_length=9, pattern=r'^\d{4}-\d{4}$')
    created_at: datetime = Field(default_factory=datetime.utcnow)

    # 关系
    students: List[Student] = Relationship(back_populates="class_obj")

class Fee(SQLModel, table=True):
    """与学生相关联的费用模型"""
    id: Optional[str] = Field(default=None, primary_key=True)
    student_id: str = Field(foreign_key="student.id", index=True)
    amount: float = Field(gt=0)
    description: str = Field(max_length=500)
    status: str = Field(default="pending", pattern=r'^(pending|paid|overdue|waived)$')
    due_date: datetime
    paid_date: Optional[datetime] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    # 关系
    student: Optional[Student] = Relationship(back_populates="fees")

class Attendance(SQLModel, table=True):
    """与学生相关联的出勤模型"""
    id: Optional[str] = Field(default=None, primary_key=True)
    student_id: str = Field(foreign_key="student.id", index=True)
    date: datetime = Field(index=True)
    status: str = Field(pattern=r'^(present|absent|late|excused)$')
    notes: Optional[str] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)

    # 关系
    student: Optional[Student] = Relationship(back_populates="attendance")

要求:

  • 使用 table=True 用于数据库表
  • 添加 index=True 用于频繁查询的字段
  • 使用 unique=True 用于电子邮件和其他唯一字段
  • 添加 foreign_key 用于关系
  • 使用 Pydantic 验证器(min_length, max_length, pattern, ge, gt)
  • 使用 Relationshipback_populates 用于双向链接
  • 包含 created_atupdated_at 时间戳

2. CRUD 操作:异步会话管理

# crud/student.py
from sqlmodel import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Optional
from datetime import datetime

from models.student import Student
from schemas.student import StudentCreate, StudentUpdate

class StudentCRUD:
    """学生模型的 CRUD 操作"""

    @staticmethod
    async def create(db: AsyncSession, student_data: StudentCreate) -> Student:
        """创建一个新学生"""
        student = Student(
            name=student_data.name,
            email=student_data.email,
            phone=student_data.phone,
            password_hash=student_data.password_hash,
            role=student_data.role,
            class_id=student_data.class_id,
        )
        db.add(student)
        await db.commit()
        await db.refresh(student)
        return student

    @staticmethod
    async def get_by_id(db: AsyncSession, student_id: str) -> Optional[Student]:
        """通过 ID 获取学生"""
        result = await db.execute(
            select(Student).where(Student.id == student_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_by_email(db: AsyncSession, email: str) -> Optional[Student]:
        """通过电子邮件获取学生"""
        result = await db.execute(
            select(Student).where(Student.email == email)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_all(
        db: AsyncSession,
        skip: int = 0,
        limit: int = 100,
        class_id: Optional[str] = None,
        role: Optional[str] = None,
    ) -> List[Student]:
        """获取所有学生,支持分页和过滤"""
        query = select(Student)

        if class_id:
            query = query.where(Student.class_id == class_id)
        if role:
            query = query.where(Student.role == role)

        query = query.offset(skip).limit(limit).order_by(Student.created_at.desc())

        result = await db.execute(query)
        return result.scalars().all()

    @staticmethod
    async def update(
        db: AsyncSession,
        student_id: str,
        student_data: StudentUpdate,
    ) -> Optional[Student]:
        """更新学生"""
        student = await StudentCRUD.get_by_id(db, student_id)
        if not student:
            return None

        update_data = student_data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(student, field, value)

        student.updated_at = datetime.utcnow()
        await db.commit()
        await db.refresh(student)
        return student

    @staticmethod
    async def delete(db: AsyncSession, student_id: str) -> bool:
        """删除学生"""
        student = await StudentCRUD.get_by_id(db, student_id)
        if not student:
            return False

        await db.delete(student)
        await db.commit()
        return True

    @staticmethod
    async def count(
        db: AsyncSession,
        class_id: Optional[str] = None,
        role: Optional[str] = None,
    ) -> int:
        """统计学生数量,支持过滤"""
        query = select(Student)

        if class_id:
            query = query.where(Student.class_id == class_id)
        if role:
            query = query.where(Student.role == role)

        result = await db.execute(query)
        return len(result.scalars().all())

要求:

  • 使用 AsyncSession 进行异步数据库操作
  • 使用 select(), where(), offset(), limit(), order_by()
  • 返回 Optional[T] 用于单个项目,List[T] 用于列表
  • 使用 scalar_one_or_none() 用于单个结果
  • 使用 scalars().all() 用于列表结果
  • 处理事务,提交/回滚
  • 修改时更新 updated_at 时间戳

3. 查询构建:连接、过滤器、分页

# queries/advanced.py
from sqlmodel import select, and_, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional

from models.student import Student, Fee, Attendance, Class

class StudentQueries:
    """带有连接的学生高级查询"""

    @staticmethod
    async def get_student_with_fees(
        db: AsyncSession,
        student_id: str,
    ) -> Optional[Student]:
        """获取带有所有费用的学生"""
        result = await db.execute(
            select(Student)
            .where(Student.id == student_id)
            .options(
                # 急切加载费用关系
                selectinload(Student.fees)
            )
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_students_with_pending_fees(
        db: AsyncSession,
        skip: int = 0,
        limit: int = 100,
    ) -> List[Student]:
        """获取有待缴费用余额的学生"""
        # 子查询,查找有待缴费用的学生
        subquery = (
            select(Fee.student_id)
            .where(Fee.status == "pending")
            .distinct()
        )

        result = await db.execute(
            select(Student)
            .where(Student.id.in_(subquery))
            .offset(skip)
            .limit(limit)
            .order_by(Student.name)
        )
        return result.scalars().all()

    @staticmethod
    async def get_student_attendance_summary(
        db: AsyncSession,
        student_id: str,
        month: int,
        year: int,
    ) -> dict:
        """获取学生一个月的出勤摘要"""
        from datetime import datetime

        start_date = datetime(year, month, 1)
        end_date = datetime(year, month + 1, 1) if month < 12 else datetime(year + 1, 1, 1)

        result = await db.execute(
            select(
                func.count().label("total_days"),
                func.sum(
                    case((Attendance.status == "present", 1), else_=0)
                ).label("present_days"),
                func.sum(
                    case((Attendance.status == "absent", 1), else_=0)
                ).label("absent_days"),
                func.sum(
                    case((Attendance.status == "late", 1), else_=0)
                ).label("late_days"),
            )
            .where(
                and_(
                    Attendance.student_id == student_id,
                    Attendance.date >= start_date,
                    Attendance.date < end_date,
                )
            )
        )
        row = result.one()
        return {
            "total_days": row.total_days or 0,
            "present_days": row.present_days or 0,
            "absent_days": row.absent_days or 0,
            "late_days": row.late_days or 0,
        }

    @staticmethod
    async def search_students(
        db: AsyncSession,
        search_term: str,
        skip: int = 0,
        limit: int = 100,
    ) -> List[Student]:
        """通过姓名或电子邮件搜索学生"""
        search_pattern = f"%{search_term}%"

        result = await db.execute(
            select(Student)
            .where(
                or_(
                    Student.name.ilike(search_pattern),
                    Student.email.ilike(search_pattern),
                )
            )
            .offset(skip)
            .limit(limit)
            .order_by(Student.name)
        )
        return result.scalars().all()

    @staticmethod
    async def get_students_by_class(
        db: AsyncSession,
        class_id: str,
        include_inactive: bool = False,
    ) -> List[Student]:
        """获取班级中的所有学生"""
        query = select(Student).where(Student.class_id == class_id)

        if not include_inactive:
            # 假设有一个 is_active 字段
            query = query.where(Student.is_active == True)

        query = query.order_by(Student.name)

        result = await db.execute(query)
        return result.scalars().all()

    @staticmethod
    async def get_fee_statistics(
        db: AsyncSession,
        class_id: Optional[str] = None,
    ) -> dict:
        """获取费用统计数据,可按班级过滤"""
        base_query = select(Fee)

        if class_id:
            # 与学生连接以按班级过滤
            base_query = (
                select(Fee)
                .join(Student, Student.id == Fee.student_id)
                .where(Student.class_id == class_id)
            )

        result = await db.execute(
            select(
                func.count().label("total_fees"),
                func.sum(Fee.amount).label("total_amount"),
                func.sum(
                    case((Fee.status == "paid", Fee.amount), else_=0)
                ).label("total_collected"),
                func.sum(
                    case((Fee.status == "pending", Fee.amount), else_=0)
                ).label("total_pending"),
            )
        )
        row = result.one()
        return {
            "total_fees": row.total_fees or 0,
            "total_amount": float(row.total_amount or 0),
            "total_collected": float(row.total_collected or 0),
            "total_pending": float(row.total_pending or 0),
        }

要求:

  • 使用 selectinload() 急切加载关系
  • 使用 subquery 进行复杂过滤
  • 使用 and_()/or_() 进行复合条件
  • 使用 func.count(), func.sum() 进行聚合
  • 使用 case() 进行条件聚合
  • 使用 ilike() 进行不区分大小写的搜索
  • 使用 order_by() 进行排序
  • 使用 offset()/limit() 进行分页

4. 关系:ForeignKey 和 back_populates

# models/relationships.py
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import Optional, List

class Parent(SQLModel, table=True):
    """与学生相关联的家长模型"""
    id: Optional[str] = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)
    phone: Optional[str]

    # 一对多:家长可以有多个学生
    students: List["Student"] = Relationship(
        back_populates="parent",
        link_model="student_parent_link"  # 多对多通过表
    )

class StudentParentLink(SQLModel, table=True):
    """学生和家长的多对多链接表"""
    student_id: str = Field(foreign_key="student.id", primary_key=True)
    parent_id: str = Field(foreign_key="parent.id", primary_key=True)
    relationship_type: str = Field(default="father")  # father, mother, guardian

# 更新学生模型,添加多对多关系
class Student(SQLModel, table=True):
    id: Optional[str] = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)

    # 多对多:学生可以有多个家长
    parents: List[Parent] = Relationship(
        back_populates="students",
        link_model=StudentParentLink
    )

# 一对一关系示例
class StudentProfile(SQLModel, table=True):
    """学生一对一的个人资料"""
    id: Optional[str] = Field(default=None, primary_key=True)
    student_id: str = Field(foreign_key="student.id", unique=True)
    bio: Optional[str] = None
    avatar_url: Optional[str]
    preferences: Optional[str] = None  # JSON字符串用于灵活的数据

    student: Optional[Student] = Relationship()

# 自参照关系
class Employee(SQLModel, table=True):
    """带有经理关系的员工"""
    id: Optional[str] = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)
    manager_id: Optional[str] = Field(foreign_key="employee.id")

    # 自参照关系
    manager: Optional["Employee"] = Relationship(
        back_populates="subordinates",
        sa_relationship_kwargs={"remote_side": "[Employee.id]"}
    )
    subordinates: List["Employee"] = Relationship(back_populates="manager")

要求:

  • 使用 foreign_key 用于关系链接
  • 使用 back_populates 用于双向关系
  • 使用 unique=True 用于一对一关系
  • 使用链接表用于多对多关系
  • 使用 remote_side 用于自参照关系
  • 使用 sa_relationship_kwargs 用于高级 SQLAlchemy 选项

输出要求

代码文件

  1. 模型

    • models/__init__.py
    • models/student.py
    • models/fee.py
    • models/attendance.py
    • models/class.py
  2. CRUD 操作

    • crud/__init__.py
    • crud/student.py
    • crud/fee.py
    • crud/attendance.py
  3. 高级查询

    • queries/__init__.py
    • queries/advanced.py

集成要求

  • @fastapi-app:在 FastAPI 路由中使用模型
  • @fastapi-app/dependencies:数据库会话依赖项
  • @api-client:前端的类型安全响应

文档

  • PHR:为模式设计创建提示历史记录
  • ADR:记录关系选择、索引策略
  • 注释:记录复杂查询和关系

工作流程

  1. 设计模式

    • 确定实体(学生、费用等)
    • 定义关系(一对多、多对多)
    • 添加约束(唯一、外键、索引)
  2. 创建模型

    • 使用带有 table=True 的 SQLModel 类定义
    • 添加带有类型和验证器的字段
    • 使用 back_populates 配置关系
  3. 构建 CRUD 操作

    • 实现创建、读取、更新、删除
    • 添加分页和过滤
    • 处理异步会话
  4. 创建高级查询

    • 实现连接和子查询
    • 添加聚合和摘要
    • 搜索和过滤功能
  5. 测试和优化

    • 测试所有 CRUD 操作
    • 验证关系是否正确工作
    • 使用索引优化慢查询

质量检查表

在完成任何 SQLModel 实现之前:

  • [ ] 频繁查询的索引:为常用于过滤/排序的字段添加 index=True
  • [ ] 约束 FK/唯一:使用 foreign_key 用于关系,unique=True 用于电子邮件
  • [ ] 类型化结果:返回正确类型的 Optional[T] 或 List[T]
  • [ ] 异步支持:使用 AsyncSession 进行所有操作
  • [ ] 关系:使用 back_populates 用于双向链接
  • [ ] 验证:使用 Field 验证器(min_length, max_length, pattern)
  • [ ] 时间戳:包含 created_at 和 updated_at
  • [ ] 级联:为关系配置删除行为
  • [ ] 急切加载:使用 selectinload 用于关系访问
  • [ ] 错误处理:处理 IntegrityError, 外键违规

常见模式

学生模型与 CRUD

# models/student.py
class Student(SQLModel, table=True):
    id: Optional[str] = Field(default=None, primary_key=True)
    name: str = Field(min_length=2, max_length=100, index=True)
    email: str = Field(unique=True, index=True)
    is_active: bool = Field(default=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)

    fees: List["Fee"] = Relationship(back_populates="student")

CRUD 费用

# crud/fee.py
class FeeCRUD:
    @staticmethod
    async def create(db: AsyncSession, fee_data: FeeCreate) -> Fee:
        fee = Fee(**fee_data.model_dump())
        db.add(fee)
        await db.commit()
        await db.refresh(fee)
        return fee

    @staticmethod
    async def get_by_id(db: AsyncSession, fee_id: str) -> Optional[Fee]:
        result = await db.execute(
            select(Fee).where(Fee.id == fee_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_pending_by_student(db: AsyncSession, student_id: str) -> List[Fee]:
        result = await db.execute(
            select(Fee)
            .where(
                and_(
                    Fee.student_id == student_id,
                    Fee.status == "pending"
                )
            )
            .order_by(Fee.due_date)
        )
        return result.scalars().all()

带连接的查询

# 使用连接获取带有班级名称的学生
async def get_students_with_class(db: AsyncSession) -> List[dict]:
    result = await db.execute(
        select(
            Student.id,
            Student.name,
            Student.email,
            Class.name.label("class_name"),
        )
        .outerjoin(Class, Student.class_id == Class.id)
        .order_by(Student.name)
    )
    return [dict(row._mapping) for row in result]

数据库设置

# database.py
from sqlmodel import create_engine, SQLModel
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import os

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://user:password@localhost:5432/erp_db"
)

# 异步引擎
async_engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

async def init_db():
    """初始化数据库表"""
    async with async_engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

async def get_db() -> AsyncSession:
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

Alembic 迁移

# alembic/env.py
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_url
from alembic import context

# 导入模型以自动生成
from models.student import Student
from models.fee import Fee
from models.attendance import Attendance

target_metadata = SQLModel.metadata

# alembic revision --autogenerate -m "Initial migration"
# alembic upgrade head

参考