Supabase+Python技能 supabase-python

利用 Supabase 进行认证和存储,结合 FastAPI、SQLAlchemy/SQLModel 实现高效、安全的数据库访问和API开发。

后端开发 0 次安装 0 次浏览 更新于 3/5/2026

name: supabase-python description: FastAPI 结合 Supabase 和 SQLAlchemy/SQLModel

Supabase + Python 技能

加载方式:base.md + supabase.md + python.md

结合 Supabase 认证和 SQLAlchemy/SQLModel 进行数据库访问的 FastAPI 模式。

参考资源: Supabase Python 客户端 | SQLModel


核心原则

SQLAlchemy/SQLModel 用于查询,Supabase 用于认证/存储。

使用 SQLAlchemy 或 SQLModel 进行类型安全的数据库访问。使用 supabase-py 进行认证、存储和实时功能。FastAPI 作为 API 层。


项目结构

project/
├── src/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── auth.py
│   │   │   ├── posts.py
│   │   │   └── users.py
│   │   └── deps.py              # 依赖项(认证、数据库)
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py            # 设置
│   │   └── security.py          # 认证帮助工具
│   ├── db/
│   │   ├── __init__.py
│   │   ├── session.py           # 数据库会话
│   │   └── models.py            # SQLModel 模型
│   ├── services/
│   │   ├── __init__.py
│   │   └── supabase.py          # Supabase 客户端
│   └── main.py                  # FastAPI 应用
├── supabase/
│   ├── migrations/
│   └── config.toml
├── alembic/                     # Alembic 迁移(替代方案)
├── alembic.ini
├── pyproject.toml
└── .env

设置

安装依赖项

pip install fastapi uvicorn supabase python-dotenv sqlmodel asyncpg alembic

pyproject.toml

[project]
name = "my-app"
version = "0.1.0"
dependencies = [
    "fastapi>=0.109.0",
    "uvicorn[standard]>=0.27.0",
    "supabase>=2.0.0",
    "python-dotenv>=1.0.0",
    "sqlmodel>=0.0.14",
    "asyncpg>=0.29.0",
    "alembic>=1.13.0",
    "pydantic-settings>=2.0.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.26.0",
]

环境变量

# .env
SUPABASE_URL=http://localhost:54321
SUPABASE_ANON_KEY=<从 supabase 开始>
SUPABASE_SERVICE_ROLE_KEY=<从 supabase 开始>
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:54322/postgres

配置

src/core/config.py

from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    # Supabase
    supabase_url: str
    supabase_anon_key: str
    supabase_service_role_key: str

    # 数据库
    database_url: str

    # 应用
    debug: bool = False

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache
def get_settings() -> Settings:
    return Settings()

数据库设置

src/db/session.py

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from src.core.config import get_settings

settings = get_settings()

engine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    pool_pre_ping=True,
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

src/db/models.py

from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Field


class ProfileBase(SQLModel):
    email: str
    name: Optional[str] = None
    avatar_url: Optional[str] = None


class Profile(ProfileBase, table=True):
    __tablename__ = "profiles"

    id: UUID = Field(primary_key=True)  # 引用 auth.users
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)


class ProfileCreate(ProfileBase):
    id: UUID


class ProfileRead(ProfileBase):
    id: UUID
    created_at: datetime

class PostBase(SQLModel):
    title: str
    content: Optional[str] = None
    published: bool = False


class Post(PostBase, table=True):
    __tablename__ = "posts"

    id: UUID = Field(default_factory=uuid4, primary_key=True)
    author_id: UUID = Field(foreign_key="profiles.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)


class PostCreate(PostBase):
    pass


class PostRead(PostBase):
    id: UUID
    author_id: UUID
    created_at: datetime

Supabase 客户端

src/services/supabase.py

from supabase import create_client, Client
from src.core.config import get_settings

settings = get_settings()


def get_supabase_client() -> Client:
    """使用匿名密钥获取 Supabase 客户端(尊重 RLS)。"""
    return create_client(
        settings.supabase_url,
        settings.supabase_anon_key
    )


def get_supabase_admin() -> Client:
    """使用服务角色获取 Supabase 客户端(绕过 RLS)。"""
    return create_client(
        settings.supabase_url,
        settings.supabase_service_role_key
    )

认证依赖项

src/api/deps.py

from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from supabase import Client

from src.db.session import get_db
from src.services.supabase import get_supabase_client

security = HTTPBearer()


async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
) -> dict:
    """验证 JWT 并返回用户。"""
    supabase = get_supabase_client()

    try:
        # 使用 Supabase 验证令牌
        user = supabase.auth.get_user(credentials.credentials)
        if not user or not user.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的令牌",
            )
        return user.user
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的令牌",
        )


# 类型别名用于依赖注入
CurrentUser = Annotated[dict, Depends(get_current_user)]
DbSession = Annotated[AsyncSession, Depends(get_db)]

API 路由

src/api/routes/auth.py

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, EmailStr

from src.services.supabase import get_supabase_client

router = APIRouter(prefix="/auth", tags=["auth"])


class SignUpRequest(BaseModel):
    email: EmailStr
    password: str


class SignInRequest(BaseModel):
    email: EmailStr
    password: str


class AuthResponse(BaseModel):
    access_token: str
    refresh_token: str
    user_id: str


@router.post("/signup", response_model=AuthResponse)
async def sign_up(request: SignUpRequest):
    supabase = get_supabase_client()

    try:
        response = supabase.auth.sign_up({
            "email": request.email,
            "password": request.password,
        })

        if response.user is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="注册失败",
            )

        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=str(response.user.id),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )


@router.post("/signin", response_model=AuthResponse)
async def sign_in(request: SignInRequest):
    supabase = get_supabase_client()

    try:
        response = supabase.auth.sign_in_with_password({
            "email": request.email,
            "password": request.password,
        })

        return AuthResponse(
            access_token=response.session.access_token,
            refresh_token=response.session.refresh_token,
            user_id=str(response.user.id),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的凭证",
        )


@router.post("/signout")
async def sign_out():
    supabase = get_supabase_client()
    supabase.auth.sign_out()
    return {"message": "已注销"}

src/api/routes/posts.py

from uuid import UUID
from fastapi import APIRouter, HTTPException, status
from sqlmodel import select

from src.api.deps import CurrentUser, DbSession
from src.db.models import Post, PostCreate, PostRead

router = APIRouter(prefix="/posts", tags=["posts"])


@router.get("/", response_model=list[PostRead])
async def list_posts(
    db: DbSession,
    published_only: bool = True,
):
    query = select(Post)
    if published_only:
        query = query.where(Post.published == True)
    query = query.order_by(Post.created_at.desc())

    result = await db.execute(query)
    return result.scalars().all()


@router.get("/me", response_model=list[PostRead])
async def list_my_posts(
    db: DbSession,
    user: CurrentUser,
):
    query = select(Post).where(Post.author_id == UUID(user.id))
    result = await db.execute(query)
    return result.scalars().all()


@router.post("/", response_model=PostRead, status_code=status.HTTP_201_CREATED)
async def create_post(
    db: DbSession,
    user: CurrentUser,
    post_in: PostCreate,
):
    post = Post(
        **post_in.model_dump(),
        author_id=UUID(user.id),
    )
    db.add(post)
    await db.commit()
    await db.refresh(post)
    return post


@router.get("/{post_id}", response_model=PostRead)
async def get_post(
    db: DbSession,
    post_id: UUID,
):
    result = await db.execute(select(Post).where(Post.id == post_id))
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="找不到帖子",
        )

    return post


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    db: DbSession,
    user: CurrentUser,
    post_id: UUID,
):
    result = await db.execute(
        select(Post).where(Post.id == post_id, Post.author_id == UUID(user.id))
    )
    post = result.scalar_one_or_none()

    if not post:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="找不到帖子",
        )

    await db.delete(post)
    await db.commit()

主应用程序

src/main.py

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from src.api.routes import auth, posts

app = FastAPI(title="我的 API")

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 生产环境配置
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 路由
app.include_router(auth.router, prefix="/api")
app.include_router(posts.router, prefix="/api")


@app.get("/health")
async def health_check():
    return {"status": "healthy"}

Alembic 迁移

初始化 Alembic

alembic init alembic

alembic/env.py(关键更改)

from src.db.models import SQLModel
from src.core.config import get_settings

settings = get_settings()

# 使用异步引擎
config.set_main_option("sqlalchemy.url", settings.database_url)

target_metadata = SQLModel.metadata


def run_migrations_online():
    # 用于异步
    import asyncio
    from sqlalchemy.ext.asyncio import create_async_engine

    connectable = create_async_engine(settings.database_url)

    async def do_run_migrations():
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations_sync)

    def do_run_migrations_sync(connection):
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )
        with context.begin_transaction():
            context.run_migrations()

    asyncio.run(do_run_migrations())

迁移命令

# 创建迁移
alembic revision --autogenerate -m "创建帖子表"

# 应用迁移
alembic upgrade head

# 回滚
alembic downgrade -1

存储

上传文件

from fastapi import UploadFile
from src.services.supabase import get_supabase_client


async def upload_avatar(user_id: str, file: UploadFile) -> str:
    supabase = get_supabase_client()

    file_content = await file.read()
    file_path = f"{user_id}/avatar.{file.filename.split('.')[-1]}"

    response = supabase.storage.from_("avatars").upload(
        file_path,
        file_content,
        {"content-type": file.content_type, "upsert": "true"},
    )

    # 获取公共 URL
    url = supabase.storage.from_("avatars").get_public_url(file_path)
    return url

下载文件

def get_avatar_url(user_id: str) -> str:
    supabase = get_supabase_client()
    return supabase.storage.from_("avatars").get_public_url(f"{user_id}/avatar.png")

实时(异步)

import asyncio
from supabase import create_client


async def listen_to_posts():
    supabase = create_client(
        settings.supabase_url,
        settings.supabase_anon_key
    )

    def handle_change(payload):
        print(f"收到更改:{payload}")

    channel = supabase.channel("posts")
    channel.on_postgres_changes(
        event="*",
        schema="public",
        table="posts",
        callback=handle_change,
    ).subscribe()

    # 持续监听
    while True:
        await asyncio.sleep(1)

测试

tests/conftest.py

import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from src.main import app
from src.db.session import get_db
from src.db.models import SQLModel

TEST_DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:54322/postgres_test"

engine = create_async_engine(TEST_DATABASE_URL)
TestingSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@pytest.fixture(scope="function")
async def db_session():
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)

    async with TestingSessionLocal() as session:
        yield session

    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.drop_all)


@pytest.fixture
async def client(db_session):
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as ac:
        yield ac

    app.dependency_overrides.clear()

tests/test_posts.py

import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_list_posts(client: AsyncClient):
    response = await client.get("/api/posts/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)

运行应用程序

# 开发
uvicorn src.main:app --reload --port 8000

# 生产
uvicorn src.main:app --host 0.0.0.0 --port 8000 --workers 4

反模式

  • 使用 Supabase 客户端进行数据库查询 - 使用 SQLAlchemy/SQLModel
  • 同步数据库调用 - 使用异步与 asyncpg
  • 硬编码凭证 - 使用环境变量
  • 缺少连接池 - asyncpg 处理此问题
  • 缺少认证依赖项 - 始终验证 JWT
  • 不关闭会话 - 使用上下文管理器
  • 在异步中阻塞 I/O - 使用异步库