PostgreSQL性能工程Skill postgres-performance

本技能涵盖了从一开始就构建高性能数据库交互的模式,以及修复性能问题,包括查询优化、索引策略、分页、批量处理、聚合优化、连接池调整、读取副本、非规范化、表分区和缓存策略等。

数据工程 0 次安装 0 次浏览 更新于 3/3/2026

PostgreSQL性能工程

问题陈述

性能问题会复合增长。一个在1K行数据中需要50ms的查询,在100K行数据中可能需要5s。这项技能涵盖了从一开始就构建高性能数据库交互的模式,以及修复性能问题。


模式:查询优化工作流程

第1步:识别慢查询

-- Enable pg_stat_statements (if not already enabled)
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- Find the slowest queries by mean execution time.
-- NOTE: mean_exec_time / total_exec_time are the PG13+ column names;
-- on PG12 and earlier they are called mean_time / total_time.
SELECT 
    query,
    calls,
    round(mean_exec_time::numeric, 2) as avg_ms,
    round(total_exec_time::numeric, 2) as total_ms,
    rows
FROM pg_stat_statements
WHERE calls > 10  -- ignore one-off queries
ORDER BY mean_exec_time DESC
LIMIT 20;

第2步:分析查询计划

-- Inspect the real execution plan. ANALYZE actually runs the query;
-- BUFFERS reports shared-buffer hits vs. disk reads.
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM assessments 
WHERE user_id = 'abc-123' 
ORDER BY created_at DESC 
LIMIT 10;

需要查找的内容:

警告信号 问题 解决方案
大表的顺序扫描 缺少索引 添加索引
Nested Loop节点的loops计数很高 连接中的N+1问题 重写查询,添加索引
高成本的排序 缺少与ORDER BY匹配的索引 覆盖索引
高行数的哈希/合并连接 大的中间结果 提早过滤,更好的索引
缓冲区:共享读高 数据未缓存 更多RAM,或查询更少的数据

第3步:修复并验证

-- Add the index. CONCURRENTLY avoids blocking writes, but it
-- cannot be executed inside a transaction block.
CREATE INDEX CONCURRENTLY ix_assessments_user_created 
ON assessments (user_id, created_at DESC);

-- Verify the improvement
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM assessments 
WHERE user_id = 'abc-123' 
ORDER BY created_at DESC 
LIMIT 10;
-- The plan should now show "Index Scan" instead of "Seq Scan"

模式:覆盖索引(索引仅扫描)

**问题:**查询读取索引,然后从表中获取行(堆获取)。

-- The query
SELECT id, title, status FROM assessments WHERE user_id = ?;

-- Regular index: still needs heap fetches for title/status
CREATE INDEX ix_assessments_user ON assessments (user_id);
-- Plan: Index Scan + heap fetch

-- ✅ Covering index: every referenced column lives in the index
CREATE INDEX ix_assessments_user_covering 
ON assessments (user_id) 
INCLUDE (id, title, status);
-- Plan: Index Only Scan (no heap fetch, faster).
-- NOTE: the INCLUDE clause requires PostgreSQL 11+.

何时使用:

  • 频繁运行的查询
  • 选择少数列的查询
  • 有许多列的表(堆获取成本高)

模式:大规模分页

-- ❌ Slow: OFFSET-based pagination
SELECT * FROM events ORDER BY created_at DESC LIMIT 20 OFFSET 10000;
-- Must scan and throw away 10,000 rows!

-- ✅ Fast: cursor-based (keyset) pagination
SELECT * FROM events 
WHERE created_at < '2024-01-15T10:30:00Z'  -- last timestamp seen on the previous page
ORDER BY created_at DESC 
LIMIT 20;
-- The index lets us seek straight to the right position

-- Composite cursor (when created_at values may be duplicated):
SELECT * FROM events 
WHERE (created_at, id) < ('2024-01-15T10:30:00Z', 'last-id')
ORDER BY created_at DESC, id DESC 
LIMIT 20;

在SQLAlchemy中:

# 基于游标的分页
async def get_events_page(
    session: AsyncSession,
    cursor_time: datetime | None,
    cursor_id: UUID | None,
    limit: int = 20,
) -> list[Event]:
    """Fetch one page of events via keyset (cursor) pagination.

    Rows come back newest-first ordered by (created_at, id); pass the
    last row's (created_at, id) from the previous page to continue.
    """
    # Newest first; id breaks ties between rows with equal timestamps.
    stmt = select(Event).order_by(Event.created_at.desc(), Event.id.desc())

    # Apply the seek predicate only when a full composite cursor is given.
    if cursor_time and cursor_id:
        seek = tuple_(Event.created_at, Event.id) < (cursor_time, cursor_id)
        stmt = stmt.where(seek)

    rows = await session.execute(stmt.limit(limit))
    return rows.scalars().all()

模式:批量处理

-- ❌ Slow: one giant update
UPDATE events SET processed = true WHERE processed = false;
-- Locks millions of rows, times out

-- ✅ Fast: process in batches.
-- NOTE: COMMIT inside a DO block requires PostgreSQL 11+ and the
-- block must NOT be run inside an explicit transaction.
DO $$
DECLARE
    batch_size INT := 10000;
    rows_affected INT;
BEGIN
    LOOP
        UPDATE events 
        SET processed = true 
        WHERE id IN (
            SELECT id FROM events 
            WHERE processed = false 
            LIMIT batch_size
            FOR UPDATE SKIP LOCKED  -- skip rows locked by concurrent workers
        );
        
        GET DIAGNOSTICS rows_affected = ROW_COUNT;
        
        IF rows_affected = 0 THEN
            EXIT;
        END IF;
        
        COMMIT;
        PERFORM pg_sleep(0.1);  -- brief pause to let other queries through
    END LOOP;
END $$;

在Python中:

async def process_in_batches(session: AsyncSession, batch_size: int = 10000):
    """Mark unprocessed events in fixed-size batches.

    FOR UPDATE SKIP LOCKED lets several workers run concurrently;
    each batch is committed, with a short pause between batches.
    """
    while True:
        outcome = await session.execute(
            text("""
                UPDATE events SET processed = true
                WHERE id IN (
                    SELECT id FROM events 
                    WHERE processed = false 
                    LIMIT :batch_size
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id
            """),
            {"batch_size": batch_size},
        )

        batch = outcome.fetchall()
        await session.commit()

        # RETURNING produced nothing -> no unprocessed rows remain.
        if not batch:
            break

        await asyncio.sleep(0.1)  # yield to other database clients

模式:高效聚合

-- ❌ Slow: COUNT with a complex WHERE
SELECT COUNT(*) FROM events WHERE user_id = ? AND status = 'active';
-- Scans every matching row

-- ✅ Fast: approximate count (for large tables).
-- NOTE: reltuples estimates the WHOLE table's row count (maintained
-- by ANALYZE/autovacuum); it does NOT apply the WHERE filter above.
SELECT reltuples::bigint AS estimate
FROM pg_class
WHERE relname = 'events';

-- ✅ Fast: maintain a counter cache
-- Add a column: assessments.answer_count
-- Update it on INSERT/DELETE of answers rows

-- ✅ Fast: materialized view for complex aggregates
CREATE MATERIALIZED VIEW user_stats AS
SELECT 
    user_id,
    COUNT(*) as total_assessments,
    AVG(rating) as avg_rating
FROM assessments
GROUP BY user_id;

-- REFRESH ... CONCURRENTLY requires a unique index on the view;
-- without one the refresh below fails.
CREATE UNIQUE INDEX ux_user_stats_user_id ON user_stats (user_id);

-- Refresh periodically (CONCURRENTLY keeps the view readable meanwhile)
REFRESH MATERIALIZED VIEW CONCURRENTLY user_stats;

模式:连接池调整

# Async SQLAlchemy with appropriate pool settings
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool, AsyncAdaptedQueuePool

# For serverless/Lambda (no persistent connections).
# NOTE: pick ONE of the two configurations below — the second
# assignment would overwrite the first if both were kept.
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # fresh connection per request
)

# For long-running servers
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,           # baseline connections kept open
    max_overflow=20,        # extra connections allowed under load
    pool_timeout=30,        # seconds to wait for a free connection
    pool_recycle=1800,      # recycle connections every 30 minutes
    pool_pre_ping=True,     # test connections before use
)

PostgreSQL方面:

-- Check the connection limit
SHOW max_connections;  -- default 100

-- Current connection count
SELECT count(*) FROM pg_stat_activity;

-- Connections per application
SELECT application_name, count(*) 
FROM pg_stat_activity 
GROUP BY application_name;

模式:读取副本

# Route reads to a replica and writes to the primary.
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session

primary_engine = create_async_engine(PRIMARY_URL)
replica_engine = create_async_engine(REPLICA_URL)

class RoutingSession(Session):
    """Session that binds flushes / pending changes to the primary
    engine and plain reads to the replica engine."""

    def get_bind(self, mapper=None, clause=None, **kw):
        # During a flush, or whenever the session holds pending changes,
        # use the primary so writes (and reads of just-written data)
        # stay consistent. Fixes vs. original: `create_async_engine`
        # was called but `create_engine` was imported, and
        # `self.is_modified()` was called without the required
        # instance argument (it would raise TypeError).
        if self._flushing or self.dirty or self.new or self.deleted:
            return primary_engine.sync_engine
        return replica_engine.sync_engine

模式:读取性能的非规范化

-- ❌ Slow: joining 4 tables for a common query
SELECT 
    a.id, a.title, u.name as user_name, 
    COUNT(q.id) as question_count,
    AVG(ans.value) as avg_score
FROM assessments a
JOIN users u ON a.user_id = u.id
JOIN questions q ON q.assessment_id = a.id
LEFT JOIN answers ans ON ans.question_id = q.id
GROUP BY a.id, a.title, u.name;

-- ✅ Fast: denormalized columns
ALTER TABLE assessments ADD COLUMN user_name VARCHAR(100);
ALTER TABLE assessments ADD COLUMN question_count INT DEFAULT 0;
ALTER TABLE assessments ADD COLUMN avg_score NUMERIC(3,2);

-- Keep them current via triggers or application code.
-- The query then becomes trivial:
SELECT id, title, user_name, question_count, avg_score FROM assessments;

权衡:

  • ✅ 更快的读取
  • ❌ 更复杂的写入(必须更新非规范化数据)
  • ❌ 潜在的陈旧数据

模式:大型表的分区

-- Partition events by month.
-- NOTE: on a partitioned table every unique constraint (including the
-- primary key) must contain all partition columns, so the PK is
-- (id, created_at) — `id UUID PRIMARY KEY` alone would be rejected.
CREATE TABLE events (
    id UUID NOT NULL,
    user_id UUID NOT NULL,
    event_type VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create the partitions (ranges are half-open: FROM inclusive, TO exclusive)
CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

-- Queries constrained on created_at prune to one partition (fast)
SELECT * FROM events WHERE created_at >= '2024-01-15' AND created_at < '2024-02-01';

-- Drop old data instantly
DROP TABLE events_2023_01;  -- far faster than DELETE

模式:缓存策略

# Cache data that is read frequently but changes rarely.
import redis.asyncio as redis
import json

cache = redis.from_url(REDIS_URL)

async def get_user_stats(user_id: UUID) -> UserStats:
    """Return the user's stats, served from Redis when a fresh entry exists.

    Entries expire after 5 minutes; writers must also invalidate
    explicitly (see update_user_assessment below).
    """
    cache_key = f"user_stats:{user_id}"
    
    # Try the cache first
    cached = await cache.get(cache_key)
    if cached:
        # NOTE(review): assumes UserStats is a pydantic v2 model
        # (model_validate_json / model_dump_json) — confirm.
        return UserStats.model_validate_json(cached)
    
    # Cache miss: compute from the database
    async with get_session() as session:
        stats = await calculate_user_stats(session, user_id)
    
    # Cache for 5 minutes (300 seconds)
    await cache.setex(cache_key, 300, stats.model_dump_json())
    
    return stats

# Invalidate the cache on writes (illustrative signature — `...` stands
# in for the real update parameters)
async def update_user_assessment(user_id: UUID, ...):
    # ... update the database ...
    await cache.delete(f"user_stats:{user_id}")

性能监控查询

-- Table bloat (many dead tuples -> table needs VACUUM)
SELECT 
    schemaname, relname,
    n_dead_tup as dead_tuples,
    n_live_tup as live_tuples,
    round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 2) as dead_pct
FROM pg_stat_user_tables
WHERE n_dead_tup > 10000
ORDER BY n_dead_tup DESC;

-- Unused indexes (idx_scan = 0): candidates for removal.
-- (The original header said "index bloat", but this query actually
-- finds indexes that have never been scanned since stats were reset.)
SELECT
    indexrelname as index,
    pg_size_pretty(pg_relation_size(indexrelid)) as size,
    idx_scan as scans
FROM pg_stat_user_indexes
WHERE idx_scan = 0  -- never used since the last stats reset
ORDER BY pg_relation_size(indexrelid) DESC;
-- Cache hit ratio (should be > 99%).
-- NULLIF guards against division by zero on a fresh cluster where
-- no blocks have been read yet (the ratio is then NULL).
SELECT 
    sum(blks_hit) * 100.0 / NULLIF(sum(blks_hit + blks_read), 0) as cache_hit_ratio
FROM pg_stat_database;

-- Queries that have been running for more than 30 seconds
SELECT 
    pid,
    now() - query_start as duration,
    query
FROM pg_stat_activity
WHERE state = 'active' 
    AND query NOT LIKE '%pg_stat%'  -- exclude this monitoring query itself
    AND now() - query_start > interval '30 seconds';

性能检查表

部署前:

  • [ ] 识别并优化慢查询
  • [ ] 索引与查询模式匹配
  • [ ] 频繁查询的覆盖索引
  • [ ] 分页使用基于游标的(而不是OFFSET)
  • [ ] 大型表如果> 10M行则分区
  • [ ] 适当大小的连接池
  • [ ] 热点数据的缓存层
  • [ ] 慢查询的监控到位
  • [ ] 计划VACUUM和ANALYZE