name: supabase-python description: FastAPI 结合 Supabase 和 SQLAlchemy/SQLModel
Supabase + Python 技能
加载方式:base.md + supabase.md + python.md
结合 Supabase 认证和 SQLAlchemy/SQLModel 进行数据库访问的 FastAPI 模式。
参考资源: Supabase Python 客户端 | SQLModel
核心原则
SQLAlchemy/SQLModel 用于查询,Supabase 用于认证/存储。
使用 SQLAlchemy 或 SQLModel 进行类型安全的数据库访问。使用 supabase-py 进行认证、存储和实时功能。FastAPI 作为 API 层。
项目结构
project/
├── src/
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── auth.py
│ │ │ ├── posts.py
│ │ │ └── users.py
│ │ └── deps.py # 依赖项(认证、数据库)
│ ├── core/
│ │ ├── __init__.py
│ │ ├── config.py # 设置
│ │ └── security.py # 认证帮助工具
│ ├── db/
│ │ ├── __init__.py
│ │ ├── session.py # 数据库会话
│ │ └── models.py # SQLModel 模型
│ ├── services/
│ │ ├── __init__.py
│ │ └── supabase.py # Supabase 客户端
│ └── main.py # FastAPI 应用
├── supabase/
│ ├── migrations/
│ └── config.toml
├── alembic/ # Alembic 迁移(替代方案)
├── alembic.ini
├── pyproject.toml
└── .env
设置
安装依赖项
# pydantic-settings is imported by src/core/config.py; email-validator is required by pydantic's EmailStr.
pip install fastapi uvicorn supabase python-dotenv sqlmodel asyncpg alembic pydantic-settings email-validator
pyproject.toml
[project]
name = "my-app"
version = "0.1.0"
dependencies = [
    "fastapi>=0.109.0",
    "uvicorn[standard]>=0.27.0",
    "supabase>=2.0.0",
    "python-dotenv>=1.0.0",
    "sqlmodel>=0.0.14",
    "asyncpg>=0.29.0",
    "alembic>=1.13.0",
    "pydantic-settings>=2.0.0",
    # Required by pydantic's EmailStr, which the auth request models use.
    "email-validator>=2.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.26.0",
]

[tool.pytest.ini_options]
# The async fixtures in tests/conftest.py use plain @pytest.fixture; auto mode
# lets pytest-asyncio drive them (strict mode, the default, would not).
asyncio_mode = "auto"
环境变量
# .env
SUPABASE_URL=http://localhost:54321
SUPABASE_ANON_KEY=<来自 `supabase start` 的输出>
SUPABASE_SERVICE_ROLE_KEY=<来自 `supabase start` 的输出>
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:54322/postgres
配置
src/core/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings, populated from the environment and ``.env``.

    The Supabase values and ``database_url`` have no default and are therefore
    required; ``debug`` defaults to ``False``.
    """

    # pydantic-settings 2.x reads ``model_config``; the nested ``class Config``
    # is the deprecated pydantic v1 spelling. A plain dict is accepted here,
    # so no additional import is needed.
    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
    }

    # Supabase
    supabase_url: str
    supabase_anon_key: str
    supabase_service_role_key: str

    # Database
    database_url: str

    # Application
    debug: bool = False
@lru_cache
def get_settings() -> Settings:
    """Return the application settings.

    The result is memoised by ``lru_cache``: the first call constructs
    ``Settings`` and later calls get that same instance back.
    """
    app_settings = Settings()
    return app_settings
数据库设置
src/db/session.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import sessionmaker

from src.core.config import get_settings
settings = get_settings()

# Async engine shared by the whole process. `echo` logs SQL when debug is on;
# `pool_pre_ping` tests a pooled connection before handing it out.
engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_pre_ping=True,
)

# async_sessionmaker is SQLAlchemy 2.0's factory for AsyncSession; the sync
# sessionmaker(class_=AsyncSession) form is the legacy spelling.
# expire_on_commit=False keeps loaded attributes readable after commit().
AsyncSessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield one database session per request (FastAPI dependency).

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionLocal``.
    """
    # Leaving the ``async with`` block closes the session, including when the
    # request handler raises, so no extra try/finally close is needed.
    async with AsyncSessionLocal() as session:
        yield session
src/db/models.py
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Field
class ProfileBase(SQLModel):
    # Fields shared by the profile table model and its create/read schemas.

    # Required.
    email: str

    # Optional; both default to None.
    name: Optional[str] = None
    avatar_url: Optional[str] = None
class Profile(ProfileBase, table=True):
    # Table model stored in "profiles".
    __tablename__ = "profiles"

    # Primary key with no default, so the caller must supply it.
    # References auth.users.
    id: UUID = Field(primary_key=True)

    # Both timestamps are filled from datetime.utcnow (naive UTC) when the
    # object is created.
    # NOTE(review): nothing in this model refreshes updated_at on later
    # writes - confirm whether a trigger or the service layer does.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
class ProfileCreate(ProfileBase):
    # Input schema for creating a profile: the base fields plus a
    # caller-supplied id.
    id: UUID
class ProfileRead(ProfileBase):
    # Output schema for a profile: the base fields plus id and created_at
    # (updated_at is not part of this schema).
    id: UUID
    created_at: datetime
class PostBase(SQLModel):
    # Fields shared by the post table model and its create/read schemas.

    # Required.
    title: str

    # Optional body text; defaults to None.
    content: Optional[str] = None

    # Posts start out unpublished.
    published: bool = False
class Post(PostBase, table=True):
    # Table model stored in "posts".
    __tablename__ = "posts"

    # Primary key; a random UUID is generated when none is supplied.
    id: UUID = Field(default_factory=uuid4, primary_key=True)

    # Foreign key to profiles.id.
    author_id: UUID = Field(foreign_key="profiles.id")

    # Filled from datetime.utcnow (naive UTC) when the object is created.
    created_at: datetime = Field(default_factory=datetime.utcnow)
class PostCreate(PostBase):
    # Input schema for creating a post; adds nothing to the base fields.
    pass
class PostRead(PostBase):
    # Output schema for a post: the base fields plus id, author_id and
    # created_at.
    id: UUID
    author_id: UUID
    created_at: datetime
Supabase 客户端
src/services/supabase.py
from supabase import create_client, Client
from src.core.config import get_settings
# Settings are loaded once at import time; the client factories below read from them.
settings = get_settings()
def get_supabase_client() -> Client:
    """Get a Supabase client using the anon key (respects RLS).

    A new client is created on every call.
    """
    url = settings.supabase_url
    anon_key = settings.supabase_anon_key
    return create_client(url, anon_key)
def get_supabase_admin() -> Client:
    """Get a Supabase client using the service-role key (bypasses RLS).

    A new client is created on every call.
    """
    url = settings.supabase_url
    service_role_key = settings.supabase_service_role_key
    return create_client(url, service_role_key)
认证依赖项
src/api/deps.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from supabase import Client
from src.db.session import get_db
from src.services.supabase import get_supabase_client
# Bearer-token scheme; get_current_user receives the request's credentials through Depends(security).
security = HTTPBearer()
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
) -> dict:
    """Validate the bearer JWT with Supabase and return the authenticated user.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The ``user`` attribute of Supabase's ``get_user`` response. The
        ``dict`` annotation is kept for compatibility with existing callers.

    Raises:
        HTTPException: 401 when the Supabase call fails or yields no user.
    """
    # Imported locally so the module's import block stays as it is.
    from fastapi.concurrency import run_in_threadpool

    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的令牌",
    )
    supabase = get_supabase_client()
    try:
        # The Supabase client call is synchronous; running it in a worker
        # thread keeps it from blocking the event loop.
        response = await run_in_threadpool(
            supabase.auth.get_user, credentials.credentials
        )
    except Exception as exc:
        # Every client failure is reported as an invalid token; the original
        # error stays chained for server-side debugging.
        raise invalid_token from exc

    if not response or not response.user:
        raise invalid_token
    return response.user
# Type aliases for dependency injection
# NOTE(review): get_current_user returns `user.user` from the Supabase response;
# confirm that `dict` is the right annotation for that object.
CurrentUser = Annotated[dict, Depends(get_current_user)]
DbSession = Annotated[AsyncSession, Depends(get_db)]
API 路由
src/api/routes/auth.py
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, EmailStr
from src.services.supabase import get_supabase_client
# Every route in this module is served under the /auth prefix and tagged "auth".
router = APIRouter(prefix="/auth", tags=["auth"])
class SignUpRequest(BaseModel):
    # Request body for the sign-up endpoint.
    email: EmailStr
    password: str
class SignInRequest(BaseModel):
    # Request body for the sign-in endpoint.
    email: EmailStr
    password: str
class AuthResponse(BaseModel):
    # Response body shared by sign-up and sign-in: the session's two tokens
    # plus the user's id as a string.
    access_token: str
    refresh_token: str
    user_id: str
@router.post("/signup", response_model=AuthResponse)
async def sign_up(request: SignUpRequest):
    """Register a user with Supabase Auth and return the session tokens.

    Raises:
        HTTPException: 400 when the Supabase call fails, no user is returned,
            or no session is returned.
    """
    # Imported locally so the module's import block stays as it is.
    from fastapi.concurrency import run_in_threadpool

    supabase = get_supabase_client()
    try:
        # The Supabase client call is synchronous; run it off the event loop.
        response = await run_in_threadpool(
            supabase.auth.sign_up,
            {
                "email": request.email,
                "password": request.password,
            },
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    # These checks sit outside the try block so their HTTPExceptions are not
    # caught by the broad handler and re-wrapped with a stringified detail.
    if response.user is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="注册失败",
        )
    if response.session is None:
        # A user without a session (presumably e-mail confirmation is still
        # pending) has no tokens to return; say so instead of failing on None.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="注册成功,但需要先验证邮箱才能登录",
        )

    return AuthResponse(
        access_token=response.session.access_token,
        refresh_token=response.session.refresh_token,
        user_id=str(response.user.id),
    )
@router.post("/signin", response_model=AuthResponse)
async def sign_in(request: SignInRequest):
    """Sign a user in with e-mail and password and return the session tokens.

    Raises:
        HTTPException: 401 when the Supabase call fails or the response has
            no session or no user.
    """
    # Imported locally so the module's import block stays as it is.
    from fastapi.concurrency import run_in_threadpool

    invalid_credentials = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的凭证",
    )
    supabase = get_supabase_client()
    try:
        # The Supabase client call is synchronous; run it off the event loop.
        response = await run_in_threadpool(
            supabase.auth.sign_in_with_password,
            {
                "email": request.email,
                "password": request.password,
            },
        )
    except Exception as exc:
        raise invalid_credentials from exc

    # Checked explicitly rather than relying on an AttributeError being
    # swallowed by the broad handler above.
    if response.session is None or response.user is None:
        raise invalid_credentials

    return AuthResponse(
        access_token=response.session.access_token,
        refresh_token=response.session.refresh_token,
        user_id=str(response.user.id),
    )
@router.post("/signout")
async def sign_out():
    # Calls sign_out() on a freshly created anon-key client and returns a
    # confirmation message.
    # NOTE(review): the client is created here and is never given the caller's
    # token, so this presumably does not revoke the caller's session - confirm.
    auth_client = get_supabase_client().auth
    auth_client.sign_out()
    return {"message": "已注销"}
src/api/routes/posts.py
from uuid import UUID
from fastapi import APIRouter, HTTPException, status
from sqlmodel import select
from src.api.deps import CurrentUser, DbSession
from src.db.models import Post, PostCreate, PostRead
# Every route in this module is served under the /posts prefix and tagged "posts".
router = APIRouter(prefix="/posts", tags=["posts"])
@router.get("/", response_model=list[PostRead])
async def list_posts(
    db: DbSession,
    published_only: bool = True,
):
    # Lists posts newest first. Unpublished posts are included only when the
    # caller passes published_only=false; no authentication is required.
    stmt = select(Post).order_by(Post.created_at.desc())
    if published_only:
        # `== True` is intentional: it builds a SQL expression, not a Python bool.
        stmt = stmt.where(Post.published == True)  # noqa: E712
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.get("/me", response_model=list[PostRead])
async def list_my_posts(
    db: DbSession,
    user: CurrentUser,
):
    # Lists every post whose author is the authenticated user.
    author_id = UUID(user.id)
    stmt = select(Post).where(Post.author_id == author_id)
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.post("/", response_model=PostRead, status_code=status.HTTP_201_CREATED)
async def create_post(
    db: DbSession,
    user: CurrentUser,
    post_in: PostCreate,
):
    # Creates a post authored by the authenticated user and returns it.
    fields = post_in.model_dump()
    new_post = Post(author_id=UUID(user.id), **fields)
    db.add(new_post)
    await db.commit()
    # Reload the row after the commit before returning it.
    await db.refresh(new_post)
    return new_post
@router.get("/{post_id}", response_model=PostRead)
async def get_post(
    db: DbSession,
    post_id: UUID,
):
    # Returns the post with the given id, or 404 when there is none.
    # No authentication or published filter is applied here.
    stmt = select(Post).where(Post.id == post_id)
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="找不到帖子",
    )
@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    db: DbSession,
    user: CurrentUser,
    post_id: UUID,
):
    # Deletes the post only when it belongs to the authenticated user.
    # A post owned by someone else gives the same 404 as a missing post.
    owned_by_caller = Post.author_id == UUID(user.id)
    stmt = select(Post).where(Post.id == post_id, owned_by_caller)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="找不到帖子",
        )
    await db.delete(target)
    await db.commit()
主应用程序
src/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.api.routes import auth, posts
app = FastAPI(title="我的 API")

# CORS
# The wildcard origin is a development default; list the real origins for
# production. allow_credentials is False because credentialed CORS must not be
# combined with wildcard origins, and this API authenticates with a bearer
# token in the Authorization header rather than with cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers
app.include_router(auth.router, prefix="/api")
app.include_router(posts.router, prefix="/api")
@app.get("/health")
async def health_check():
    # Always returns a fixed payload; no dependencies are checked.
    payload = {"status": "healthy"}
    return payload
Alembic 迁移
初始化 Alembic
# Scaffold the Alembic environment in ./alembic (run once per project).
alembic init alembic
alembic/env.py(关键更改)
from src.db.models import SQLModel
from src.core.config import get_settings
settings = get_settings()

# Use the application's database URL (async driver). Alembic keeps its options
# in a ConfigParser, which treats "%" as interpolation syntax, so a URL that
# contains percent-encoded characters must have "%" doubled.
config.set_main_option("sqlalchemy.url", settings.database_url.replace("%", "%%"))

# Autogenerate compares the database against the SQLModel table metadata.
target_metadata = SQLModel.metadata
def run_migrations_online():
    """Run migrations against the live database through an async engine.

    The migration body itself is synchronous, so it is executed through
    ``connection.run_sync`` on a connection taken from the async engine.
    """
    import asyncio

    from sqlalchemy.ext.asyncio import create_async_engine

    def do_run_migrations_sync(connection):
        # Bind the Alembic context to this connection and run inside one
        # migration transaction.
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()

    async def do_run_migrations():
        connectable = create_async_engine(settings.database_url)
        try:
            async with connectable.connect() as connection:
                await connection.run_sync(do_run_migrations_sync)
        finally:
            # Dispose of the engine's connections before the event loop that
            # asyncio.run() created is closed.
            await connectable.dispose()

    asyncio.run(do_run_migrations())
迁移命令
# Create a migration
alembic revision --autogenerate -m "创建帖子表"
# Apply migrations
alembic upgrade head
# Roll back one revision
alembic downgrade -1
存储
上传文件
from fastapi import UploadFile
from src.services.supabase import get_supabase_client
async def upload_avatar(user_id: str, file: UploadFile) -> str:
    """Upload a user's avatar to the ``avatars`` bucket and return its public URL.

    Args:
        user_id: Used as the folder name inside the bucket.
        file: The uploaded file; its filename only contributes the extension.

    Returns:
        The public URL of the object stored at ``<user_id>/avatar.<ext>``.
        ``<ext>`` falls back to ``bin`` when the filename is missing, has no
        dot, or has an extension that is not plain ASCII letters/digits.
    """
    # Imported locally so the snippet's import block stays as it is.
    from fastapi.concurrency import run_in_threadpool

    supabase = get_supabase_client()
    file_content = await file.read()

    # The filename is client-supplied: it may be None, may lack a dot, and
    # must not be able to smuggle path separators into the storage key.
    _, dot, raw_ext = (file.filename or "").rpartition(".")
    if dot and raw_ext.isascii() and raw_ext.isalnum():
        extension = raw_ext
    else:
        extension = "bin"
    file_path = f"{user_id}/avatar.{extension}"

    bucket = supabase.storage.from_("avatars")
    # The storage client call is synchronous; run it off the event loop.
    await run_in_threadpool(
        bucket.upload,
        file_path,
        file_content,
        {
            "content-type": file.content_type or "application/octet-stream",
            "upsert": "true",
        },
    )

    # Get the public URL
    return bucket.get_public_url(file_path)
下载文件
def get_avatar_url(user_id: str, extension: str = "png") -> str:
    """Return the public URL of a user's avatar in the ``avatars`` bucket.

    Args:
        user_id: Folder name of the avatar inside the bucket.
        extension: File extension of the stored avatar, without the dot.
            Defaults to ``"png"``; pass the real extension for avatars that
            were uploaded in another format.

    Returns:
        The public URL for ``<user_id>/avatar.<extension>``.
    """
    supabase = get_supabase_client()
    object_path = f"{user_id}/avatar.{extension}"
    return supabase.storage.from_("avatars").get_public_url(object_path)
实时(异步)
import asyncio
from supabase import create_client
async def listen_to_posts():
    """Subscribe to all changes on ``public.posts`` and print each payload.

    Never returns: after subscribing it sleeps in a loop so the coroutine
    stays alive.
    """
    # NOTE(review): `settings` is not defined in this snippet - presumably
    # `settings = get_settings()` from src.core.config; confirm.
    # NOTE(review): check that the synchronous client returned by
    # create_client supports realtime channels in the installed supabase
    # version - TODO confirm.
    client = create_client(settings.supabase_url, settings.supabase_anon_key)

    def on_change(payload):
        print(f"收到更改:{payload}")

    subscription = client.channel("posts").on_postgres_changes(
        event="*",
        schema="public",
        table="posts",
        callback=on_change,
    )
    subscription.subscribe()

    # Keep listening
    while True:
        await asyncio.sleep(1)
测试
tests/conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from src.main import app
from src.db.session import get_db
from src.db.models import SQLModel
# Tests use their own database (postgres_test) on the local port 54322.
TEST_DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:54322/postgres_test"
engine = create_async_engine(TEST_DATABASE_URL)
# Session factory bound to the test engine; expire_on_commit=False keeps loaded attributes readable after commit.
TestingSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@pytest.fixture(scope="function")
async def db_session():
    """Yield a session on a freshly created schema; drop the schema afterwards."""
    # NOTE(review): this async fixture uses the plain pytest.fixture decorator;
    # confirm pytest-asyncio is configured to drive it (e.g. auto mode).
    metadata = SQLModel.metadata

    async with engine.begin() as setup_conn:
        await setup_conn.run_sync(metadata.create_all)

    async with TestingSessionLocal() as session:
        yield session

    async with engine.begin() as teardown_conn:
        await teardown_conn.run_sync(metadata.drop_all)
@pytest.fixture
async def client(db_session):
    """Yield an HTTP client wired to the app, with get_db replaced by the test session."""

    async def _use_test_session():
        yield db_session

    app.dependency_overrides[get_db] = _use_test_session

    # Requests go straight to the ASGI app in-process.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http_client:
        yield http_client

    # Remove the override so it does not leak into other tests.
    app.dependency_overrides.clear()
tests/test_posts.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_list_posts(client: AsyncClient):
    """Listing posts responds with 200 and a JSON array."""
    resp = await client.get("/api/posts/")
    assert resp.status_code == 200
    body = resp.json()
    assert isinstance(body, list)
运行应用程序
# Development
uvicorn src.main:app --reload --port 8000
# Production
uvicorn src.main:app --host 0.0.0.0 --port 8000 --workers 4
反模式
- 使用 Supabase 客户端进行数据库查询 - 使用 SQLAlchemy/SQLModel
- 同步数据库调用 - 使用异步与 asyncpg
- 硬编码凭证 - 使用环境变量
- 缺少连接池 - 使用 SQLAlchemy 异步引擎内置的连接池(底层驱动为 asyncpg)
- 缺少认证依赖项 - 始终验证 JWT
- 不关闭会话 - 使用上下文管理器
- 在异步中阻塞 I/O - 使用异步库