FastAPI-Dapr后端脚手架Skill scaffolding-fastapi-dapr

这是一个用于快速构建现代化、生产就绪的 Python 后端服务的技能模板。它集成了 FastAPI 高性能 Web 框架、SQLModel 异步数据库 ORM、Dapr 微服务运行时以及 JWT/JWKS 安全认证。核心功能包括:RESTful API 开发、事件驱动架构、异步数据库操作、定时任务调度、结构化日志和分层架构设计。适用于开发微服务、云原生应用、事件处理系统和需要高并发、可扩展性的企业级后端。关键词:FastAPI, Dapr, 微服务, 后端开发, Python, 异步编程, 云原生, JWT 认证, SQLModel, 事件驱动。

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

名称: scaffolding-fastapi-dapr 描述: | 使用 SQLModel、Dapr 集成和 JWT 身份验证构建生产级 FastAPI 后端。 适用于:使用 Neon PostgreSQL 构建 REST API、使用 Dapr 发布/订阅实现事件驱动微服务、调度作业或创建带有 JWT/JWKS 验证的 CRUD 端点。 不适用于:构建简单脚本或非微服务架构。

FastAPI + Dapr 后端

使用 SQLModel、Dapr 集成和 JWT 身份验证构建生产级 FastAPI 后端。

快速开始

# 项目设置
uv init backend && cd backend
uv add fastapi sqlmodel pydantic httpx python-jose uvicorn

# 开发模式
uv run uvicorn main:app --reload --port 8000

# 使用 Dapr 边车
dapr run --app-id myapp --app-port 8000 -- uvicorn main:app

FastAPI 核心模式

1. SQLModel 模式(数据库 + API)

from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional, Literal

class TaskBase(SQLModel):
    title: str = Field(max_length=200, index=True)
    status: Literal["pending", "in_progress", "completed"] = "pending"

class Task(TaskBase, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    created_at: datetime = Field(default_factory=datetime.now)

class TaskCreate(TaskBase):
    pass

class TaskRead(TaskBase):
    id: int
    created_at: datetime

2. 异步数据库设置

from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
import os

DATABASE_URL = os.getenv("DATABASE_URL").replace("postgresql://", "postgresql+asyncpg://")
engine = create_async_engine(DATABASE_URL)

async def get_session() -> AsyncSession:
    async with AsyncSession(engine) as session:
        yield session

3. CRUD 端点

from fastapi import FastAPI, Depends, HTTPException
from sqlmodel import select

app = FastAPI()

@app.post("/tasks", response_model=TaskRead, status_code=201)
async def create_task(task: TaskCreate, session: AsyncSession = Depends(get_session)):
    db_task = Task.model_validate(task)
    session.add(db_task)
    await session.commit()
    await session.refresh(db_task)
    return db_task

@app.get("/tasks/{task_id}", response_model=TaskRead)
async def get_task(task_id: int, session: AsyncSession = Depends(get_session)):
    task = await session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="未找到")
    return task

@app.patch("/tasks/{task_id}", response_model=TaskRead)
async def update_task(task_id: int, update: TaskUpdate, session: AsyncSession = Depends(get_session)):
    task = await session.get(Task, task_id)
    if not task:
        raise HTTPException(status_code=404, detail="未找到")
    update_data = update.model_dump(exclude_unset=True)
    task.sqlmodel_update(update_data)
    session.add(task)
    await session.commit()
    await session.refresh(task)
    return task

4. JWT/JWKS 身份验证

from jose import jwt
import httpx

JWKS_URL = f"{SSO_URL}/.well-known/jwks.json"

async def get_current_user(authorization: str = Header()):
    token = authorization.replace("Bearer ", "")
    async with httpx.AsyncClient() as client:
        jwks = (await client.get(JWKS_URL)).json()
    payload = jwt.decode(token, jwks, algorithms=["RS256"])
    return payload

@app.get("/protected")
async def protected_route(user = Depends(get_current_user)):
    return {"user": user["sub"]}

有关审计日志、分页和 OpenAPI 配置,请参阅 references/fastapi-patterns.md


Dapr 集成模式

1. 发布/订阅订阅

from fastapi import APIRouter, Request

router = APIRouter(prefix="/dapr", tags=["Dapr"])

@router.get("/subscribe")
async def subscribe():
    """Dapr 调用此方法来发现订阅。"""
    return [{
        "pubsubname": "pubsub",
        "topic": "task-created",
        "route": "/dapr/task-created"
    }]

@router.post("/task-created")
async def handle_task_created(request: Request, session: AsyncSession = Depends(get_session)):
    # CloudEvent 包装器 - 数据是嵌套的
    event = await request.json()
    task_data = event.get("data", event)  # 处理包装和未包装的数据

    # 处理事件
    task = Task.model_validate(task_data)
    session.add(task)
    await session.commit()
    return {"status": "已处理"}

2. 发布事件

import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )

3. 定时任务

# 通过 Dapr Jobs API(Alpha)调度任务
async def schedule_job(name: str, schedule: str, callback_url: str, data: dict):
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0-alpha1/jobs/{name}",
            json={
                "schedule": schedule,  # "@every 5m" 或 "0 */5 * * * *"
                "data": data,
            },
            headers={"dapr-app-callback-url": callback_url}
        )

# 任务回调端点
@app.post("/jobs/process")
async def process_job(request: Request):
    job_data = await request.json()
    # 处理任务执行
    return {"status": "已完成"}

有关状态管理和高级模式,请参阅 references/dapr-patterns.md


生产模式

结构化日志

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
log = structlog.get_logger()
log.info("task_created", task_id=task.id, user_id=user["sub"])

仓储 + 服务模式

# 仓储层:仅数据访问
class TaskRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create(self, task: TaskCreate) -> Task:
        db_task = Task.model_validate(task)
        self.session.add(db_task)
        await self.session.commit()
        return db_task

# 服务层:业务逻辑
class TaskService:
    def __init__(self, repo: TaskRepository):
        self.repo = repo

    async def create_task(self, task: TaskCreate, user_id: str) -> Task:
        # 业务逻辑在此处
        return await self.repo.create(task)

# 依赖注入
def get_task_service(session: AsyncSession = Depends(get_session)):
    return TaskService(TaskRepository(session))

异步测试

@pytest.fixture
async def client(session):
    app.dependency_overrides[get_session] = lambda: session
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test"
    ) as ac:
        yield ac

@pytest.mark.anyio
async def test_create_task(client: AsyncClient):
    response = await client.post("/tasks", json={"title": "Test"})
    assert response.status_code == 201

完整模式请参阅 references/production-testing.md


项目结构

backend/
├── app/
│   ├── __init__.py
│   ├── main.py           # FastAPI 应用
│   ├── database.py       # 异步引擎 + 会话
│   ├── models/           # SQLModel 模式
│   ├── routers/          # API 路由
│   ├── repositories/     # 数据访问层
│   ├── services/         # 业务逻辑层
│   └── dapr/             # Dapr 处理器
├── tests/
│   ├── conftest.py       # 测试夹具
│   └── test_*.py         # 测试文件
├── components/           # Dapr 组件(k8s)
│   ├── pubsub.yaml
│   └── statestore.yaml
└── pyproject.toml

验证

运行:python3 scripts/verify.py

预期输出:✓ scaffolding-fastapi-dapr skill ready

如果验证失败

  1. 检查:references/ 文件夹是否包含两个模式文件
  2. 如果仍然失败,请停止并报告

相关技能

  • configuring-better-auth - 用于 API 端点的 JWT/JWKS 身份验证
  • fetching-library-docs - FastAPI 文档:--library-id /fastapi/fastapi --topic dependencies

参考资料