数据库连接技能Skill db-connection

专家级的数据库连接管理,适用于Python/FastAPI与Neon PostgreSQL,包括连接池、SSL配置和SQLAlchemy引擎设置,关键词包括数据库连接、连接池优化、SSL配置、FastAPI集成。

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

数据库连接技能

专家级的数据库连接管理,适用于Python/FastAPI与Neon PostgreSQL,包括连接池、SSL配置和SQLAlchemy引擎设置。

快速参考

任务 文件/方法
获取引擎 get_engine()
获取会话 get_session()
连接字符串 DB_URL 来自设置
健康检查 check_connection()

项目结构

backend/
├── app/
│   ├── db/
│   │   ├── __init__.py
│   │   ├── connection.py    # 引擎和会话设置
│   │   └── session.py       # 依赖注入
│   └── config/
│       └── settings.py      # 环境配置
├── alembic/
│   └── env.py               # 使用这里的连接
└── .env.example

连接配置

设置与数据库URL

# backend/app/config/settings.py
from functools import lru_cache
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # 数据库配置
    DB_URL: SecretStr = Field(
        ...,
        description="PostgreSQL连接URL",
        examples=["postgresql://user:pass@ep-xxx.us-east-1.aws.neon.tech/dbname?sslmode=require"],
    )
    DB_POOL_SIZE: int = Field(default=5, ge=1, le=100)
    DB_MAX_OVERFLOW: int = Field(default=10, ge=0, le=100)
    DB_POOL_TIMEOUT: int = Field(default=30, ge=1, le=300)
    DB_POOL_RECYCLE: int = Field(default=1800, ge=300)
    DB_ECHO: bool = False


@lru_cache
def get_settings() -> Settings:
    return Settings()

环境变量

# .env.example

# 数据库(Neon PostgreSQL)
# 从Neon仪表板>连接详情获取
# 格式:postgresql://user:pass@host/dbname?sslmode=require
DB_URL="postgresql://username:password@ep-xxx.region.neon.tech/dbname?sslmode=require"

# 连接池设置
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=1800

# 调试(开发时设置为true)
DB_ECHO=false

SQLAlchemy引擎设置

连接模块

# backend/app/db/connection.py
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
import logging

from app.config.settings import get_settings

logger = logging.getLogger(__name__)


def get_db_url() -> str:
    """从设置中获取数据库URL。"""
    settings = get_settings()
    db_url = settings.DB_URL
    # SecretStr有get_secret_value()方法
    if hasattr(db_url, 'get_secret_value'):
        return db_url.get_secret_value()
    return str(db_url)


def create_sqlalchemy_engine() -> Engine:
    """创建SQLAlchemy引擎,针对Neon/PostgreSQL进行优化设置。"""
    settings = get_settings()
    db_url = get_db_url()

    engine = create_engine(
        db_url,
        pool_size=settings.DB_POOL_SIZE,
        max_overflow=settings.DB_MAX_OVERFLOW,
        pool_timeout=settings.DB_POOL_TIMEOUT,
        pool_recycle=settings.DB_POOL_RECYCLE,
        echo=settings.DB_ECHO,
        # PostgreSQL特定设置
        pool_pre_ping=True,  # 使用前验证连接
        isolation_level="AUTOCOMMIT",
    )

    # 启用连接健康检查
    @event.listens_for(engine, "connect")
    def set_session_vars(dbapi_connection, connection_record):
        cursor = dbapi_connection.cursor()
        # 设置会话特性
        cursor.execute("SET statement_timeout = '30s'")
        cursor.execute("SET idle_in_transaction_session_timeout = '60000'")
        cursor.close()

    logger.info(f"数据库引擎创建,pool_size={settings.DB_POOL_SIZE}")
    return engine


def get_engine() -> Engine:
    """获取或创建数据库引擎(单例模式)。"""
    return create_sqlalchemy_engine()

会话管理

# backend/app/db/session.py
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
from app.db.connection import get_engine


# 创建会话工厂
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=get_engine(),
)


def get_db() -> Generator[Session, None, None]:
    """
    FastAPI的数据库会话依赖。

    用法:
        @router.get("/users/")
        def get_users(db: Session = Depends(get_db)):
            ...
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


async def get_async_db() -> Generator[Session, None, None]:
    """
    异步数据库会话依赖(用于异步路由)。
    注意:与SQLModel异步会话或asyncpg一起使用。
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Neon PostgreSQL设置

Neon连接字符串格式

postgresql://[user]:[password]@[host]/[dbname]?sslmode=require

组件:

  • user: 数据库用户名(来自Neon)
  • password: 数据库密码(来自Neon)
  • host: 终端ID + 区域,例如,ep-xxx-12345.us-east-1.aws.neon.tech
  • dbname: 您的数据库名称
  • sslmode: 必须为require以使用Neon

从Neon获取连接详情

  1. 访问Neon Dashboard
  2. 选择您的项目
  3. 转到连接详情
  4. 复制连接字符串
  5. 添加到Vercel/Dashboard环境变量

Neon的连接池

# 对于无服务器/边缘函数,使用较小的池大小
# backend/app/db/connection.py

def create_serverless_engine() -> Engine:
    """为无服务器/Vercel函数创建优化的引擎。"""
    settings = get_settings()

    # 无服务器使用较小的池以避免连接限制
    return create_engine(
        get_db_url(),
        pool_size=2,  # 无服务器保持小
        max_overflow=0,  # 无服务器中无溢出
        pool_timeout=10,  # 更快的超时
        pool_recycle=300,  # 更频繁地回收
        pool_pre_ping=True,
        echo=settings.DB_ECHO,
    )

FastAPI集成

应用设置

# backend/app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.db.connection import get_engine
from app.db.session import get_db
from app.config.settings import get_settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 启动:验证数据库连接
    settings = get_settings()
    engine = get_engine()

    try:
        with engine.connect() as conn:
            conn.execute("SELECT 1")
        logger.info("数据库连接验证成功")
    except Exception as e:
        logger.error(f"数据库连接失败:{e}")
        raise

    yield

    # 关闭:关闭所有连接
    engine.dispose()
    logger.info("数据库连接已关闭")


app = FastAPI(lifespan=lifespan)


# 依赖注入适用于任何路由
@app.get("/users/")
def get_users(db=Depends(get_db)):
    return db.query(User).all()

数据库健康检查

# backend/app/api/health.py
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.db.session import get_db

router = APIRouter()


@router.get("/health/db")
def database_health(db: Session = Depends(get_db)) -> dict:
    """
    检查数据库连接性。

    返回:
        {
            "status": "healthy",
            "latency_ms": <响应时间>,
            "database": <db_name>
        }
    """
    import time

    start = time.time()
    result = db.execute(text("SELECT 1"))
    latency_ms = (time.time() - start) * 1000

    return {
        "status": "healthy",
        "latency_ms": round(latency_ms, 2),
        "database": "postgresql",
    }

SSL配置

必需的SSL设置

# Neon要求SSL - 这是默认行为
# 使用?sslmode=require时不需要额外配置

# 生产环境中验证SSL证书
import ssl

ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED

测试SSL连接

# 使用SSL测试连接
psql "postgresql://user:pass@ep-xxx.us-east-1.aws.neon.tech/dbname?sslmode=require" -c "SELECT 1"

连接池监控

池统计

# backend/app/db/monitoring.py
from sqlalchemy.pool import QueuePool
from app.db.connection import get_engine


def get_pool_stats() -> dict:
    """获取连接池统计信息。"""
    engine = get_engine()
    pool = engine.pool

    if isinstance(pool, QueuePool):
        return {
            "size": pool.size(),
            "checked_in": pool.checkedin(),
            "checked_out": pool.checkout(),
            "overflow": pool.overflow(),
            "status": "healthy" if pool.checkedin() >= 0 else "exhausted",
        }
    return {"status": "unknown", "pool_type": type(pool).__name__}


def check_connection_leaks() -> list:
    """检查连接泄漏。"""
    stats = get_pool_stats()
    warnings = []

    if stats.get("checked_out", 0) > stats.get("size", 0) * 0.8:
        warnings.append("高连接检出率 - 可能泄漏")

    if stats.get("overflow", 0) > 10:
        warnings.append("高溢出 - 考虑增加池大小")

    return warnings

查询日志(调试)

# backend/app/db/connection.py
import logging

logger = logging.getLogger("sqlalchemy.engine")
logger.setLevel(logging.INFO)

# 添加此内容以创建引擎进行查询日志
# echo=True已经处理基本日志

# 更详细的日志:
# from sqlalchemy import event
# @event.listens_for(Engine, "before_cursor_execute")
# def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
#     logger.info(f"执行:{statement[:100]}...")

Alembic集成

env.py配置

# alembic/env.py
import os
import sys
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from alembic.runtime.migration import MigrationContext

# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app.config.settings import get_settings
from app.db.connection import get_engine
from app.models import Base  # 导入所有SQLModels


def run_migrations_offline() -> None:
    """以'离线'模式运行迁移。"""
    settings = get_settings()
    db_url = settings.DB_URL
    if hasattr(db_url, 'get_secret_value'):
        db_url = db_url.get_secret_value()

    context.configure(
        url=db_url,
        target_metadata=Base.metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """以'在线'模式运行迁移。"""
    engine = get_engine()

    with engine.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=Base.metadata,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

质量清单

  • [ ] 重用连接:使用pool_pre_ping,不要创建新连接
  • [ ] 无连接泄漏:始终在finally块中关闭会话
  • [ ] 生产SSL启用:连接字符串中的?sslmode=require
  • [ ] 本地开发简单:可以从本地环境连接
  • [ ] 超时配置:池超时、语句超时设置
  • [ ] 池大小调整:适合预期并发
  • [ ] 健康检查端点/health/db返回状态

集成点

技能 集成
@env-config 从环境读取DB_URL和池设置
@sqlmodel-crud 使用get_db()依赖的会话
@db-migration 使用相同的引擎/连接逻辑
@fastapi-app 数据库依赖注入
@error-handling 优雅地处理连接错误

故障排除

连接被拒绝

解决方案:检查DB_URL格式,确保Neon允许您的IP

太多连接

解决方案:减少pool_size,检查连接泄漏

SSL证书错误

解决方案:确保连接字符串中的sslmode=require

连接超时

解决方案:增加pool_timeout,检查网络延迟

空闲连接

解决方案:设置DB_POOL_RECYCLE较低,检查应用程序关闭

环境特定设置

# backend/app/config/settings.py

class Settings(BaseSettings):
    # ...基础设置

    @property
    def is_production(self) -> bool:
        return not self.DEBUG

    def get_pool_config(self) -> dict:
        """根据环境获取池配置。"""
        if self.is_production:
            return {
                "pool_size": self.DB_POOL_SIZE,
                "max_overflow": self.DB_MAX_OVERFLOW,
                "pool_timeout": self.DB_POOL_TIMEOUT,
                "pool_recycle": self.DB_POOL_RECYCLE,
            }
        else:
            # 开发:较小的池,更宽松的设置
            return {
                "pool_size": 2,
                "max_overflow": 5,
                "pool_timeout": 10,
                "pool_recycle": 300,
            }