数据库连接技能
专家级的数据库连接管理,适用于Python/FastAPI与Neon PostgreSQL,包括连接池、SSL配置和SQLAlchemy引擎设置。
快速参考
| 任务 | 文件/方法 |
|---|---|
| 获取引擎 | get_engine() |
| 获取会话 | get_session() |
| 连接字符串 | DB_URL 来自设置 |
| 健康检查 | check_connection() |
项目结构
backend/
├── app/
│ ├── db/
│ │ ├── __init__.py
│ │ ├── connection.py # 引擎和会话设置
│ │ └── session.py # 依赖注入
│ └── config/
│ └── settings.py # 环境配置
├── alembic/
│ └── env.py # 使用这里的连接
└── .env.example
连接配置
设置与数据库URL
# backend/app/config/settings.py
from functools import lru_cache
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
# 数据库配置
DB_URL: SecretStr = Field(
...,
description="PostgreSQL连接URL",
examples=["postgresql://user:pass@ep-xxx.us-east-1.aws.neon.tech/dbname?sslmode=require"],
)
DB_POOL_SIZE: int = Field(default=5, ge=1, le=100)
DB_MAX_OVERFLOW: int = Field(default=10, ge=0, le=100)
DB_POOL_TIMEOUT: int = Field(default=30, ge=1, le=300)
DB_POOL_RECYCLE: int = Field(default=1800, ge=300)
DB_ECHO: bool = False
@lru_cache
def get_settings() -> Settings:
return Settings()
环境变量
# .env.example
# 数据库(Neon PostgreSQL)
# 从Neon仪表板>连接详情获取
# 格式:postgresql://user:pass@host/dbname?sslmode=require
DB_URL="postgresql://username:password@ep-xxx.region.neon.tech/dbname?sslmode=require"
# 连接池设置
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=1800
# 调试(开发时设置为true)
DB_ECHO=false
SQLAlchemy引擎设置
连接模块
# backend/app/db/connection.py
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
import logging
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
def get_db_url() -> str:
"""从设置中获取数据库URL。"""
settings = get_settings()
db_url = settings.DB_URL
# SecretStr有get_secret_value()方法
if hasattr(db_url, 'get_secret_value'):
return db_url.get_secret_value()
return str(db_url)
def create_sqlalchemy_engine() -> Engine:
"""创建SQLAlchemy引擎,针对Neon/PostgreSQL进行优化设置。"""
settings = get_settings()
db_url = get_db_url()
engine = create_engine(
db_url,
pool_size=settings.DB_POOL_SIZE,
max_overflow=settings.DB_MAX_OVERFLOW,
pool_timeout=settings.DB_POOL_TIMEOUT,
pool_recycle=settings.DB_POOL_RECYCLE,
echo=settings.DB_ECHO,
# PostgreSQL特定设置
pool_pre_ping=True, # 使用前验证连接
isolation_level="AUTOCOMMIT",
)
# 启用连接健康检查
@event.listens_for(engine, "connect")
def set_session_vars(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
# 设置会话特性
cursor.execute("SET statement_timeout = '30s'")
cursor.execute("SET idle_in_transaction_session_timeout = '60000'")
cursor.close()
logger.info(f"数据库引擎创建,pool_size={settings.DB_POOL_SIZE}")
return engine
def get_engine() -> Engine:
"""获取或创建数据库引擎(单例模式)。"""
return create_sqlalchemy_engine()
会话管理
# backend/app/db/session.py
from sqlalchemy.orm import sessionmaker, Session
from typing import Generator
from app.db.connection import get_engine
# 创建会话工厂
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=get_engine(),
)
def get_db() -> Generator[Session, None, None]:
"""
FastAPI的数据库会话依赖。
用法:
@router.get("/users/")
def get_users(db: Session = Depends(get_db)):
...
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
async def get_async_db() -> Generator[Session, None, None]:
"""
异步数据库会话依赖(用于异步路由)。
注意:与SQLModel异步会话或asyncpg一起使用。
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
Neon PostgreSQL设置
Neon连接字符串格式
postgresql://[user]:[password]@[host]/[dbname]?sslmode=require
组件:
- user: 数据库用户名(来自Neon)
- password: 数据库密码(来自Neon)
- host: 终端ID + 区域,例如,
ep-xxx-12345.us-east-1.aws.neon.tech - dbname: 您的数据库名称
- sslmode: 必须为
require以使用Neon
从Neon获取连接详情
- 访问Neon Dashboard
- 选择您的项目
- 转到连接详情
- 复制连接字符串
- 添加到Vercel/Dashboard环境变量
Neon的连接池
# 对于无服务器/边缘函数,使用较小的池大小
# backend/app/db/connection.py
def create_serverless_engine() -> Engine:
"""为无服务器/Vercel函数创建优化的引擎。"""
settings = get_settings()
# 无服务器使用较小的池以避免连接限制
return create_engine(
get_db_url(),
pool_size=2, # 无服务器保持小
max_overflow=0, # 无服务器中无溢出
pool_timeout=10, # 更快的超时
pool_recycle=300, # 更频繁地回收
pool_pre_ping=True,
echo=settings.DB_ECHO,
)
FastAPI集成
应用设置
# backend/app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.db.connection import get_engine
from app.db.session import get_db
from app.config.settings import get_settings
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动:验证数据库连接
settings = get_settings()
engine = get_engine()
try:
with engine.connect() as conn:
conn.execute("SELECT 1")
logger.info("数据库连接验证成功")
except Exception as e:
logger.error(f"数据库连接失败:{e}")
raise
yield
# 关闭:关闭所有连接
engine.dispose()
logger.info("数据库连接已关闭")
app = FastAPI(lifespan=lifespan)
# 依赖注入适用于任何路由
@app.get("/users/")
def get_users(db=Depends(get_db)):
return db.query(User).all()
数据库健康检查
# backend/app/api/health.py
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.db.session import get_db
router = APIRouter()
@router.get("/health/db")
def database_health(db: Session = Depends(get_db)) -> dict:
"""
检查数据库连接性。
返回:
{
"status": "healthy",
"latency_ms": <响应时间>,
"database": <db_name>
}
"""
import time
start = time.time()
result = db.execute(text("SELECT 1"))
latency_ms = (time.time() - start) * 1000
return {
"status": "healthy",
"latency_ms": round(latency_ms, 2),
"database": "postgresql",
}
SSL配置
必需的SSL设置
# Neon要求SSL - 这是默认行为
# 使用?sslmode=require时不需要额外配置
# 生产环境中验证SSL证书
import ssl
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True
ssl_context.verify_mode = ssl.CERT_REQUIRED
测试SSL连接
# 使用SSL测试连接
psql "postgresql://user:pass@ep-xxx.us-east-1.aws.neon.tech/dbname?sslmode=require" -c "SELECT 1"
连接池监控
池统计
# backend/app/db/monitoring.py
from sqlalchemy.pool import QueuePool
from app.db.connection import get_engine
def get_pool_stats() -> dict:
"""获取连接池统计信息。"""
engine = get_engine()
pool = engine.pool
if isinstance(pool, QueuePool):
return {
"size": pool.size(),
"checked_in": pool.checkedin(),
"checked_out": pool.checkout(),
"overflow": pool.overflow(),
"status": "healthy" if pool.checkedin() >= 0 else "exhausted",
}
return {"status": "unknown", "pool_type": type(pool).__name__}
def check_connection_leaks() -> list:
"""检查连接泄漏。"""
stats = get_pool_stats()
warnings = []
if stats.get("checked_out", 0) > stats.get("size", 0) * 0.8:
warnings.append("高连接检出率 - 可能泄漏")
if stats.get("overflow", 0) > 10:
warnings.append("高溢出 - 考虑增加池大小")
return warnings
查询日志(调试)
# backend/app/db/connection.py
import logging
logger = logging.getLogger("sqlalchemy.engine")
logger.setLevel(logging.INFO)
# 添加此内容以创建引擎进行查询日志
# echo=True已经处理基本日志
# 更详细的日志:
# from sqlalchemy import event
# @event.listens_for(Engine, "before_cursor_execute")
# def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
# logger.info(f"执行:{statement[:100]}...")
Alembic集成
env.py配置
# alembic/env.py
import os
import sys
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from alembic.runtime.migration import MigrationContext
# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from app.config.settings import get_settings
from app.db.connection import get_engine
from app.models import Base # 导入所有SQLModels
def run_migrations_offline() -> None:
"""以'离线'模式运行迁移。"""
settings = get_settings()
db_url = settings.DB_URL
if hasattr(db_url, 'get_secret_value'):
db_url = db_url.get_secret_value()
context.configure(
url=db_url,
target_metadata=Base.metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""以'在线'模式运行迁移。"""
engine = get_engine()
with engine.connect() as connection:
context.configure(
connection=connection,
target_metadata=Base.metadata,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
质量清单
- [ ] 重用连接:使用pool_pre_ping,不要创建新连接
- [ ] 无连接泄漏:始终在finally块中关闭会话
- [ ] 生产SSL启用:连接字符串中的
?sslmode=require - [ ] 本地开发简单:可以从本地环境连接
- [ ] 超时配置:池超时、语句超时设置
- [ ] 池大小调整:适合预期并发
- [ ] 健康检查端点:
/health/db返回状态
集成点
| 技能 | 集成 |
|---|---|
@env-config |
从环境读取DB_URL和池设置 |
@sqlmodel-crud |
使用get_db()依赖的会话 |
@db-migration |
使用相同的引擎/连接逻辑 |
@fastapi-app |
数据库依赖注入 |
@error-handling |
优雅地处理连接错误 |
故障排除
连接被拒绝
解决方案:检查DB_URL格式,确保Neon允许您的IP
太多连接
解决方案:减少pool_size,检查连接泄漏
SSL证书错误
解决方案:确保连接字符串中的sslmode=require
连接超时
解决方案:增加pool_timeout,检查网络延迟
空闲连接
解决方案:设置DB_POOL_RECYCLE较低,检查应用程序关闭
环境特定设置
# backend/app/config/settings.py
class Settings(BaseSettings):
# ...基础设置
@property
def is_production(self) -> bool:
return not self.DEBUG
def get_pool_config(self) -> dict:
"""根据环境获取池配置。"""
if self.is_production:
return {
"pool_size": self.DB_POOL_SIZE,
"max_overflow": self.DB_MAX_OVERFLOW,
"pool_timeout": self.DB_POOL_TIMEOUT,
"pool_recycle": self.DB_POOL_RECYCLE,
}
else:
# 开发:较小的池,更宽松的设置
return {
"pool_size": 2,
"max_overflow": 5,
"pool_timeout": 10,
"pool_recycle": 300,
}