# PostgreSQL Performance Engineering

## Problem Statement

Performance problems compound. A query that takes 50ms on 1K rows can take 5s on 100K rows. This skill covers patterns for building performant database interactions from the start, and for fixing performance problems when they appear.
## Pattern: Query Optimization Workflow

### Step 1: Identify Slow Queries

```sql
-- Enable pg_stat_statements (if not already enabled)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Find the slowest queries
SELECT
    query,
    calls,
    round(mean_exec_time::numeric, 2) AS avg_ms,
    round(total_exec_time::numeric, 2) AS total_ms,
    rows
FROM pg_stat_statements
WHERE calls > 10
ORDER BY mean_exec_time DESC
LIMIT 20;
```
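Once a fix ships, it helps to reset the statistics so the next measurement window reflects only post-fix behavior; a minimal sketch using the extension's built-in reset function:

```sql
-- Clear accumulated statistics so before/after comparisons stay clean
SELECT pg_stat_statements_reset();
```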
### Step 2: Analyze the Query Plan

```sql
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
```
What to look for:

| Warning sign | Problem | Fix |
|---|---|---|
| Sequential scan on a large table | Missing index | Add an index |
| High loops count | N+1 problem in a join | Rewrite the query, add an index |
| Expensive Sort node | No index matching the ORDER BY | Covering index |
| Hash/merge join over many rows | Large intermediate result | Filter earlier, better indexes |
| Buffers: shared read is high | Data not cached | More RAM, or query less data |
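EXPLAIN covers one query at a time; the statistics views show where sequential scans dominate across the whole database. A hedged sketch against pg_stat_user_tables (the columns are standard; the size threshold is illustrative):

```sql
-- Tables scanned sequentially more often than by index: index candidates
SELECT
    relname,
    seq_scan,
    idx_scan,
    n_live_tup
FROM pg_stat_user_tables
WHERE seq_scan > COALESCE(idx_scan, 0)
  AND n_live_tup > 100000  -- small tables are cheap to seq-scan; ignore them
ORDER BY seq_scan DESC;
```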
### Step 3: Fix and Verify

```sql
-- Add the index
-- (CONCURRENTLY avoids blocking writes, but cannot run inside a transaction block)
CREATE INDEX CONCURRENTLY ix_assessments_user_created
ON assessments (user_id, created_at DESC);

-- Verify the improvement
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM assessments
WHERE user_id = 'abc-123'
ORDER BY created_at DESC
LIMIT 10;
-- Should now show "Index Scan" instead of "Seq Scan"
```
## Pattern: Covering Indexes (Index-Only Scans)

**Problem:** The query reads the index, then still fetches rows from the table (heap fetches).

```sql
-- The query
SELECT id, title, status FROM assessments WHERE user_id = ?;

-- Regular index: heap fetches required
CREATE INDEX ix_assessments_user ON assessments (user_id);
-- Plan: Index Scan + heap fetches

-- ✅ Covering index: every selected column is in the index
CREATE INDEX ix_assessments_user_covering
ON assessments (user_id)
INCLUDE (id, title, status);
-- Plan: Index Only Scan (no heap fetches, faster)
```
When to use:

- Queries that run frequently
- Queries that select only a few columns
- Wide tables with many columns (heap fetches are expensive)
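Index-only scans also depend on the visibility map being current: EXPLAIN ANALYZE reports a "Heap Fetches" count, and a freshly vacuumed table keeps it near zero. A minimal verification sketch (names from the example above):

```sql
-- Keep the visibility map fresh so index-only scans can skip the heap
VACUUM (ANALYZE) assessments;

-- Confirm the plan: expect "Index Only Scan" with a low "Heap Fetches" count
EXPLAIN (ANALYZE)
SELECT id, title, status FROM assessments WHERE user_id = 'abc-123';
```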
## Pattern: Pagination at Scale

```sql
-- ❌ Slow: OFFSET-based pagination
SELECT * FROM events ORDER BY created_at DESC LIMIT 20 OFFSET 10000;
-- Must scan and discard 10,000 rows!

-- ✅ Fast: cursor-based (keyset) pagination
SELECT * FROM events
WHERE created_at < '2024-01-15T10:30:00Z'  -- last timestamp seen
ORDER BY created_at DESC
LIMIT 20;
-- Jumps straight to the right position via the index

-- Composite cursor (when timestamps may collide):
SELECT * FROM events
WHERE (created_at, id) < ('2024-01-15T10:30:00Z', 'last-id')
ORDER BY created_at DESC, id DESC
LIMIT 20;
```
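Keyset pagination only jumps via the index if an index actually matches the cursor ordering. A sketch of a supporting index for the composite cursor above (the index name is illustrative; PostgreSQL can use it for both the ORDER BY and the row-value comparison):

```sql
CREATE INDEX ix_events_created_id ON events (created_at DESC, id DESC);
```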
In SQLAlchemy:

```python
from datetime import datetime
from uuid import UUID

from sqlalchemy import select, tuple_
from sqlalchemy.ext.asyncio import AsyncSession


# Cursor-based pagination
async def get_events_page(
    session: AsyncSession,
    cursor_time: datetime | None,
    cursor_id: UUID | None,
    limit: int = 20,
) -> list[Event]:
    query = select(Event).order_by(Event.created_at.desc(), Event.id.desc())
    if cursor_time and cursor_id:
        # Row-value comparison matches the (created_at, id) cursor
        query = query.where(
            tuple_(Event.created_at, Event.id) < (cursor_time, cursor_id)
        )
    result = await session.execute(query.limit(limit))
    return list(result.scalars().all())
```
## Pattern: Batch Processing

```sql
-- ❌ Slow: one giant UPDATE
UPDATE events SET processed = true WHERE processed = false;
-- Locks millions of rows, times out

-- ✅ Fast: process in batches
-- (COMMIT inside DO requires PostgreSQL 11+, run outside an explicit transaction)
DO $$
DECLARE
    batch_size INT := 10000;
    rows_affected INT;
BEGIN
    LOOP
        UPDATE events
        SET processed = true
        WHERE id IN (
            SELECT id FROM events
            WHERE processed = false
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED
        );
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        IF rows_affected = 0 THEN
            EXIT;
        END IF;
        COMMIT;
        PERFORM pg_sleep(0.1);  -- brief pause to let other queries through
    END LOOP;
END $$;
```
In Python:

```python
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def process_in_batches(session: AsyncSession, batch_size: int = 10000):
    while True:
        result = await session.execute(
            text("""
                UPDATE events SET processed = true
                WHERE id IN (
                    SELECT id FROM events
                    WHERE processed = false
                    LIMIT :batch_size
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """),
            {"batch_size": batch_size},
        )
        updated = result.fetchall()
        await session.commit()
        if len(updated) == 0:
            break
        await asyncio.sleep(0.1)  # brief pause between batches
```
## Pattern: Efficient Aggregation

```sql
-- ❌ Slow: COUNT with a complex WHERE
SELECT COUNT(*) FROM events WHERE user_id = ? AND status = 'active';
-- Scans every matching row

-- ✅ Fast: approximate count (whole-table only, for large tables)
SELECT reltuples::bigint AS estimate
FROM pg_class
WHERE relname = 'events';
```

✅ Fast: maintain a counter cache. Add a column such as assessments.answer_count and update it on every INSERT/DELETE against answers (see the trigger sketch below).
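A minimal sketch of that counter cache kept in sync by a trigger, assuming answers carries an assessment_id foreign key (if it only references questions, join through that table instead; an application-level update in the same transaction works just as well):

```sql
-- Keep assessments.answer_count consistent with the answers table
CREATE OR REPLACE FUNCTION sync_answer_count() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE assessments SET answer_count = answer_count + 1
        WHERE id = NEW.assessment_id;  -- assumes answers.assessment_id exists
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE assessments SET answer_count = answer_count - 1
        WHERE id = OLD.assessment_id;
    END IF;
    RETURN NULL;  -- AFTER trigger: return value is ignored
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_answer_count
AFTER INSERT OR DELETE ON answers
FOR EACH ROW EXECUTE FUNCTION sync_answer_count();
```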
```sql
-- ✅ Fast: materialized view for complex aggregates
CREATE MATERIALIZED VIEW user_stats AS
SELECT
    user_id,
    COUNT(*) AS total_assessments,
    AVG(rating) AS avg_rating
FROM assessments
GROUP BY user_id;

-- Refresh periodically
-- (CONCURRENTLY requires a unique index, e.g. on user_stats (user_id))
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;
```
## Pattern: Connection Pool Tuning

```python
# Async SQLAlchemy with appropriate pool settings
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# For serverless/Lambda (no persistent connections)
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # new connection per request
)

# For long-running servers
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,        # baseline connections
    max_overflow=20,     # extra connections under load
    pool_timeout=30,     # seconds to wait for a free connection
    pool_recycle=1800,   # recycle connections every 30 minutes
    pool_pre_ping=True,  # test connections before use
)
```
On the PostgreSQL side:

```sql
-- Check the connection limit
SHOW max_connections;  -- default 100

-- See current connections
SELECT count(*) FROM pg_stat_activity;

-- Connections per application
SELECT application_name, count(*)
FROM pg_stat_activity
GROUP BY application_name;
```

Keep pool_size + max_overflow, summed across every application instance, safely below max_connections.
## Pattern: Read Replicas

```python
# Route reads to the replica, writes to the primary
from sqlalchemy import Delete, Insert, Update
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session

primary_engine = create_async_engine(PRIMARY_URL)
replica_engine = create_async_engine(REPLICA_URL)

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # Flushes and explicit DML go to the primary; everything else reads the replica
        if self._flushing or isinstance(clause, (Insert, Update, Delete)):
            return primary_engine.sync_engine
        return replica_engine.sync_engine
```

For async sessions, this class can be wired in via AsyncSession's sync_session_class parameter (e.g. async_sessionmaker(..., sync_session_class=RoutingSession)).
## Pattern: Denormalization for Read Performance

```sql
-- ❌ Slow: joining 4 tables for a common query
SELECT
    a.id, a.title, u.name AS user_name,
    COUNT(q.id) AS question_count,
    AVG(ans.value) AS avg_score
FROM assessments a
JOIN users u ON a.user_id = u.id
JOIN questions q ON q.assessment_id = a.id
LEFT JOIN answers ans ON ans.question_id = q.id
GROUP BY a.id, a.title, u.name;

-- ✅ Fast: denormalized columns
ALTER TABLE assessments ADD COLUMN user_name VARCHAR(100);
ALTER TABLE assessments ADD COLUMN question_count INT DEFAULT 0;
ALTER TABLE assessments ADD COLUMN avg_score NUMERIC(3,2);
-- Kept up to date via trigger or application code (recompute sketch below)

-- The query becomes trivial:
SELECT id, title, user_name, question_count, avg_score FROM assessments;
```
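The counter-cache trigger shown earlier covers the trigger path; for the application-code path, a periodic recompute is a common compromise that trades freshness for simpler writes. A sketch for question_count (names from the example schema):

```sql
-- Recompute question_count in bulk, touching only rows that drifted
UPDATE assessments a
SET question_count = q.cnt
FROM (
    SELECT assessment_id, COUNT(*) AS cnt
    FROM questions
    GROUP BY assessment_id
) q
WHERE a.id = q.assessment_id
  AND a.question_count IS DISTINCT FROM q.cnt;
-- Note: assessments with zero questions keep their old value; reset those separately
```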
Tradeoffs:

- ✅ Faster reads
- ❌ More complex writes (the denormalized data must be kept in sync)
- ❌ Potentially stale data
## Pattern: Partitioning Large Tables

```sql
-- Partition events by month
-- (a primary key on a partitioned table must include the partition key)
CREATE TABLE events (
    id UUID,
    user_id UUID NOT NULL,
    event_type VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create the partitions
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Queries that hit a single partition are fast (partition pruning)
SELECT * FROM events WHERE created_at >= '2024-01-15' AND created_at < '2024-02-01';

-- Drop old data instantly
DROP TABLE events_2023_01;  -- far faster than DELETE
```
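Inserts fail if no partition covers the incoming row, so new partitions must exist before data arrives; a DEFAULT partition (PostgreSQL 11+) is a common safety net. A sketch:

```sql
-- Create next month's partition ahead of time
CREATE TABLE events_2024_03 PARTITION OF events
    FOR VALUES FROM ('2024-03-01') TO ('2024-04-01');

-- Safety net: rows outside every defined range land here instead of erroring
CREATE TABLE events_default PARTITION OF events DEFAULT;
```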
## Pattern: Caching Strategy

```python
# Cache data that is read often and changes rarely
from uuid import UUID

import redis.asyncio as redis

cache = redis.from_url(REDIS_URL)

async def get_user_stats(user_id: UUID) -> UserStats:
    cache_key = f"user_stats:{user_id}"

    # Try the cache first
    cached = await cache.get(cache_key)
    if cached:
        return UserStats.model_validate_json(cached)

    # Query the database
    async with get_session() as session:
        stats = await calculate_user_stats(session, user_id)

    # Cache for 5 minutes
    await cache.setex(cache_key, 300, stats.model_dump_json())
    return stats

# Invalidate the cache on writes
async def update_user_assessment(user_id: UUID, ...):
    # ... update the database ...
    await cache.delete(f"user_stats:{user_id}")
```
## Performance Monitoring Queries

```sql
-- Table bloat (needs VACUUM)
SELECT
    schemaname, relname,
    n_dead_tup AS dead_tuples,
    n_live_tup AS live_tuples,
    round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 2) AS dead_pct
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;

-- Unused indexes (removal candidates; they still cost space and write overhead)
SELECT
    indexrelname AS index,
    pg_size_pretty(pg_relation_size(indexrelid)) AS size,
    idx_scan AS scans
FROM pg_stat_user_indexes
WHERE idx_scan = 0  -- never used since statistics were last reset
ORDER BY pg_relation_size(indexrelid) DESC;

-- Cache hit ratio (should be > 99%)
SELECT
    sum(blks_hit) * 100.0 / sum(blks_hit + blks_read) AS cache_hit_ratio
FROM pg_stat_database;

-- Long-running queries
SELECT
    pid,
    now() - query_start AS duration,
    query
FROM pg_stat_activity
WHERE state = 'active'
  AND query NOT LIKE '%pg_stat%'
  AND now() - query_start > interval '30 seconds';
```
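When the long-running-query check surfaces a runaway statement, it can be stopped by pid using PostgreSQL's built-in signal functions (the pid below is illustrative):

```sql
-- Cancel the current statement (gentle)
SELECT pg_cancel_backend(12345);

-- Terminate the whole backend connection (last resort)
SELECT pg_terminate_backend(12345);
```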
## Performance Checklist

Before deploying:

- [ ] Slow queries identified and optimized
- [ ] Indexes match the query patterns
- [ ] Covering indexes for frequent queries
- [ ] Pagination is cursor-based (not OFFSET)
- [ ] Large tables (> 10M rows) partitioned
- [ ] Connection pool sized appropriately
- [ ] Caching layer for hot data
- [ ] Monitoring in place for slow queries
- [ ] VACUUM and ANALYZE scheduled