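Connection Pooling
Overview
Connection pooling is a technique for maintaining a cache of database connections that can be reused instead of opening a new connection for every request. Avoiding the overhead of establishing new connections significantly improves application performance.
Prerequisites
- Understanding of database connections and TCP/IP networking
- Knowledge of how database queries are executed
- Familiarity with the async/await pattern
- Basic understanding of resource management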
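Core Concepts
What Connection Pooling Is and Why It Matters
The Problem Without Pooling
Without a connection pool, every database operation requires:
- TCP connection setup - the network handshake
- Authentication - verifying credentials
- Session initialization - setting session parameters
- Query execution - the actual work
- Connection teardown - closing the connection
This process can take 50-500 ms, which adds up quickly across thousands of requests.
The Solution: Pooling
With a connection pool:
- Borrow a connection - take it from the pool (~1 ms)
- Query execution - the actual work
- Return the connection - put it back in the pool (~1 ms)
The pool maintains a set of already-established connections that are reused across requests.
Benefits
- Performance: acquiring a connection is 10-100x faster
- Resource efficiency: fewer database connections
- Scalability: more concurrent requests handled
- Stability: protection against connection storms (bursts of simultaneous connection attempts)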
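The borrow/return cycle can be sketched in a few lines. This is a minimal in-memory model, not node-postgres's implementation: `SimplePool` and its `createConnection` factory are hypothetical, and the "connections" are plain objects so the example runs without a database.

```javascript
// Minimal illustrative pool: reuses idle connections, creates new ones up to
// `max`, and queues callers when every connection is busy.
class SimplePool {
  constructor(createConnection, max = 2) {
    this.createConnection = createConnection; // hypothetical async factory
    this.max = max;
    this.idle = [];    // connections ready to be borrowed
    this.total = 0;    // connections created (or being created)
    this.waiters = []; // resolve callbacks of callers waiting for a connection
  }

  async acquire() {
    if (this.idle.length > 0) {
      return this.idle.pop(); // reuse: no connection setup cost
    }
    if (this.total < this.max) {
      this.total++;
      try {
        return await this.createConnection(this.total); // pay the setup cost once
      } catch (err) {
        this.total--; // free the slot if connecting failed
        throw err;
      }
    }
    // Pool is full: wait until someone releases a connection
    return new Promise(resolve => this.waiters.push(resolve));
  }

  release(conn) {
    const next = this.waiters.shift();
    if (next) {
      next(conn);           // hand the connection straight to the oldest waiter
    } else {
      this.idle.push(conn); // otherwise keep it for the next borrower
    }
  }
}

// Usage: five sequential requests share a single connection
async function demo() {
  let created = 0;
  const pool = new SimplePool(async id => {
    created++;
    return { id };
  }, 2);
  const usedIds = [];
  for (let i = 0; i < 5; i++) {
    const conn = await pool.acquire();
    usedIds.push(conn.id);
    pool.release(conn);
  }
  return { created, usedIds };
}
```

Because `release()` hands a connection directly to the oldest waiter, queued callers are served in arrival order, and the setup cost is paid once per connection rather than once per request.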
Connection Lifecycle
Pool States
┌─────────────────────────────────────────────────────────────┐
│                       Connection Pool                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│   │     Idle     │    │    Active    │    │   Creating   │  │
│   │ connections  │◄──►│ connections  │◄──►│ connections  │  │
│   └──────────────┘    └──────────────┘    └──────────────┘  │
│          ▲                                       ▲          │
│          │                                       │          │
│          └───────────────────────────────────────┘          │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Lifecycle Stages
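const { Pool } = require('pg');
// 1. Creation
const pool = new Pool({
  host: 'localhost',
  database: 'mydb',
  max: 20, // maximum pool size
});
// 2. Acquire (borrow)
const connection = await pool.connect();
// the connection is now marked active
// 3. Use
const result = await connection.query('SELECT * FROM users');
// 4. Release (return)
connection.release();
// the connection is now marked idle
// 5. Destroy (when needed)
// The pool may destroy connections that are:
// - too old (max lifetime)
// - idle for too long (idle timeout)
// - failing health checks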
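The same lifecycle can be read as a small state machine. The model below is illustrative, not taken from any driver: the states mirror the pool-states diagram (creating, idle, active) plus a terminal `destroyed` state, and `PooledConnection` is a hypothetical class that rejects transitions the lifecycle does not allow.

```javascript
// Allowed transitions for a pooled connection (illustrative model).
const TRANSITIONS = {
  creating: ['idle', 'active', 'destroyed'], // parked, handed to the waiting caller, or connect failed
  idle: ['active', 'destroyed'],             // borrowed, or evicted (too old, idle too long, failed health check)
  active: ['idle', 'destroyed'],             // released, or broken while in use
  destroyed: [],                             // terminal state
};

class PooledConnection {
  constructor() {
    this.state = 'creating';
  }

  transition(next) {
    const allowed = TRANSITIONS[this.state];
    if (!allowed || !allowed.includes(next)) {
      throw new Error(`Illegal transition ${this.state} -> ${next}`);
    }
    this.state = next;
    return this; // allow chaining
  }
}

// Usage: create -> idle -> borrow -> release -> evict
const conn = new PooledConnection();
conn.transition('idle').transition('active').transition('idle').transition('destroyed');
console.log(conn.state); // destroyed
```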
Implementation Guide
Basic Connection Pool (PostgreSQL)
const { Pool } = require('pg');
const pool = new Pool({
host: 'localhost',
database: 'mydb',
user: 'user',
password: 'pass',
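// Pool settings
max: 20, // maximum pool size
min: 2, // minimum pool size
idleTimeoutMillis: 30000, // close clients idle for more than 30 s
connectionTimeoutMillis: 5000, // wait at most 5 s for a connection
// Connection settings
application_name: 'myapp',
statement_timeout: 30000, // server-side limit per statement, in ms
});
// Simple query: the pool checks out a client and releases it automatically
const result = await pool.query('SELECT * FROM users');
// Explicit checkout (required for transactions, which must run on one client)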
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query('UPDATE users SET name = $1 WHERE id = $2', ['John', 1]);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
// Event listeners
pool.on('connect', (client) => {
console.log('New client connected');
});
pool.on('error', (error) => {
console.error('Pool error:', error);
});
// Graceful shutdown
await pool.end();
MySQL Connection Pool
const mysql = require('mysql2/promise');
const pool = mysql.createPool({
host: 'localhost',
user: 'user',
password: 'pass',
database: 'mydb',
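// Pool settings
waitForConnections: true, // queue requests when every connection is busy
connectionLimit: 20,
queueLimit: 0, // 0 = no limit on queued requests
// Connection settings
connectTimeout: 10000, // ms allowed for establishing a connection
// acquireTimeout and timeout come from the older `mysql` package and are not
// recognized by mysql2; for a per-query limit use pool.query({ sql, timeout })
});
// Simple query
const [rows] = await pool.query('SELECT * FROM users');
// Explicit checkout (required for transactions)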
const conn = await pool.getConnection();
try {
await conn.beginTransaction();
await conn.execute('UPDATE users SET name = ? WHERE id = ?', ['John', 1]);
await conn.commit();
} catch (error) {
await conn.rollback();
throw error;
} finally {
conn.release();
}
// Event listeners
pool.on('acquire', (connection) => {
console.log('Connection %d acquired', connection.threadId);
});
pool.on('release', (connection) => {
console.log('Connection %d released', connection.threadId);
});
// Graceful shutdown
await pool.end();
Python SQLAlchemy Pool
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
# Create an engine with a connection pool (QueuePool is already the default for PostgreSQL)
engine = create_engine(
    'postgresql://user:pass@localhost/mydb',
    poolclass=QueuePool,
    pool_size=20,        # number of connections kept in the pool
    max_overflow=10,     # extra connections allowed beyond pool_size
    pool_timeout=30,     # seconds to wait for a connection
    pool_recycle=3600,   # recycle connections after 1 hour
    pool_pre_ping=True,  # test connections before use
)
# Create a session factory
Session = sessionmaker(bind=engine)
# Usage (assumes a mapped ORM class named User)
def get_users():
    session = Session()
    try:
        users = session.query(User).all()
        return users
    finally:
        session.close()  # returns the connection to the pool
# Context manager
@contextmanager
def session_scope():
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Usage
with session_scope() as session:
    user = session.query(User).first()
    user.name = 'John'
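Pool Sizing Strategy
Basic Sizing Formula
A common starting point for pool size, where core_count is the number of CPU cores on the database server and effective_spindle_count is the number of disks it can use in parallel:
pool_size = (core_count * 2) + effective_spindle_count
For databases on modern SSDs:
pool_size = core_count * 2
Connection Pools vs. Database Limits
// Database server configuration
max_connections = 100 // PostgreSQL default
// Application instances (4 instances)
connections_per_instance = 20 // 4 * 20 = 80 in total
// Leave headroom for superuser connections, replication, etc.
Dynamic Pool Sizing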
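The arithmetic above generalizes into a quick budget check. This is a sketch: `checkConnectionBudget` is a hypothetical helper, and the default `reserved = 10` is an assumed allowance covering PostgreSQL's `superuser_reserved_connections` (3 by default) plus replication and admin sessions.

```javascript
// Checks whether all application instances together fit under max_connections,
// keeping some connections in reserve for administration and replication.
function checkConnectionBudget({ maxConnections, instances, poolMaxPerInstance, reserved = 10 }) {
  const demand = instances * poolMaxPerInstance; // worst case: every pool is full
  const budget = maxConnections - reserved;      // what applications may use
  return {
    demand,
    budget,
    fits: demand <= budget,
    // Largest per-instance max that still fits the budget
    maxPerInstance: Math.floor(budget / instances),
  };
}

console.log(checkConnectionBudget({ maxConnections: 100, instances: 4, poolMaxPerInstance: 20 }));
// { demand: 80, budget: 90, fits: true, maxPerInstance: 22 }
```

With autoscaling, run the check against the maximum instance count rather than the typical one.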
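// Sketch: createConnection() and waitForAvailableConnection() are left to a
// concrete implementation; each connection is assumed to expose `state`,
// acquire(), release() and destroy().
class DynamicPool {
  constructor(options = {}) {
    this.min = options.min ?? 2;
    this.max = options.max ?? 20;
    this.connections = []; // live connections, oldest first
    this.pending = 0;      // connections currently being created
  }
  async getConnection() {
    // Prefer an existing idle connection
    const idle = this.connections.find(c => c.state === 'idle');
    if (idle) {
      return idle.acquire();
    }
    // Create a new connection if below the maximum; counting in-flight
    // creations keeps concurrent callers from overshooting max
    if (this.connections.length + this.pending < this.max) {
      this.pending++;
      try {
        const conn = await this.createConnection();
        this.connections.push(conn);
        return conn.acquire();
      } finally {
        this.pending--;
      }
    }
    // Otherwise wait for a connection to be released
    return this.waitForAvailableConnection();
  }
  releaseConnection(conn) {
    conn.release();
    // Destroy surplus idle connections (a production pool would usually wait
    // for an idle timeout first, so bursts of traffic do not cause churn)
    this.pruneIdleConnections();
  }
  pruneIdleConnections() {
    const idle = this.connections.filter(c => c.state === 'idle');
    const excess = idle.length - this.min;
    if (excess > 0) {
      // Destroy the oldest idle connections and remove them from the list
      const doomed = new Set(idle.slice(0, excess));
      doomed.forEach(c => c.destroy());
      this.connections = this.connections.filter(c => !doomed.has(c));
    }
  }
}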
Pool Size Calculator
function calculatePoolSize(options = {}) {
  const {
    cpuCores = 4,
    dbMaxConnections = 100,
    appInstances = 1,
    targetUtilization = 0.75, // use at most 75% of max_connections
  } = options;
  // Connections each instance may use
  const totalAvailable = dbMaxConnections * targetUtilization;
  const connectionsPerInstance = Math.floor(totalAvailable / appInstances);
  // Formula: cores * 2, capped at each instance's share
  const formulaSize = cpuCores * 2;
  const poolSize = Math.min(formulaSize, connectionsPerInstance);
  return {
    poolSize,
    formulaSize,
    connectionsPerInstance,
    totalAvailable,
    maxConnections: dbMaxConnections,
  };
}
// Example
console.log(calculatePoolSize({
  cpuCores: 8,
  dbMaxConnections: 100,
  appInstances: 4,
}));
// Result: { poolSize: 16, formulaSize: 16, connectionsPerInstance: 18, totalAvailable: 75, maxConnections: 100 }
Connection Validation
Test on Borrow
Validate a connection before handing it to the application.
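class ValidatingPool {
  constructor(options = {}) {
    this.testOnBorrow = options.testOnBorrow !== false; // defaults to true
    this.validationQuery = options.validationQuery || 'SELECT 1';
    // Hypothetical option: bound retries so an unreachable database cannot
    // cause endless recursion
    this.maxValidationAttempts = options.maxValidationAttempts || 3;
  }
  // acquireConnection() is assumed to be provided by the underlying pool
  async getConnection(attempt = 1) {
    const conn = await this.acquireConnection();
    if (this.testOnBorrow) {
      try {
        await conn.query(this.validationQuery);
      } catch (error) {
        // Broken connection: destroy it and try another, up to the limit
        await conn.destroy();
        if (attempt >= this.maxValidationAttempts) {
          throw error;
        }
        return this.getConnection(attempt + 1);
      }
    }
    return conn;
  }
}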
Test on Return
Validate a connection before returning it to the pool.
class ValidatingPool {
constructor(options) {
this.testOnReturn = options.testOnReturn || false;
}
async releaseConnection(conn) {
if (this.testOnReturn) {
try {
await conn.query('SELECT 1');
} catch (error) {
// Broken connection: destroy it instead of returning it to the pool
await conn.destroy();
return;
}
}
conn.release();
}
}
Test While Idle
Periodically validate idle connections.
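class IdleValidatingPool {
  constructor(options = {}) {
    this.connections = [];
    this.idleValidationInterval = options.idleValidationInterval || 60000; // 1 minute
    this.startIdleValidation();
  }
  startIdleValidation() {
    this.validationTimer = setInterval(() => {
      this.validateIdleConnections().catch(err => console.error('Idle validation failed:', err));
    }, this.idleValidationInterval);
    // Don't keep the Node.js process alive just for this timer
    this.validationTimer.unref();
  }
  stopIdleValidation() {
    clearInterval(this.validationTimer);
  }
  async validateIdleConnections() {
    // A production pool would also mark each connection busy while validating it
    const idleConnections = this.connections.filter(c =>
      c.state === 'idle' &&
      // Never-validated connections (lastValidated undefined) are due immediately
      Date.now() - (c.lastValidated ?? 0) > this.idleValidationInterval
    );
    for (const conn of idleConnections) {
      try {
        await conn.query('SELECT 1');
        conn.lastValidated = Date.now();
      } catch (error) {
        await conn.destroy();
        // Drop the dead connection so it is never handed out again
        this.connections = this.connections.filter(c => c !== conn);
      }
    }
  }
}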
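Timeout Configuration
Connection Timeout
How long to wait for a connection from the pool. In node-postgres, connectionTimeoutMillis also limits how long opening a new connection may take.
const pool = new Pool({
host: 'localhost',
connectionTimeoutMillis: 5000, // 5 seconds
});
try {
const conn = await pool.connect();
// ... use conn, then call conn.release()
} catch (error) {
// pg-pool does not set an error code here; it rejects with a plain Error
// whose message mentions the timeout
if (/timeout/i.test(error.message)) {
console.error('Timeout waiting for connection');
}
}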
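Conceptually, a connection timeout is a race between "a connection became available" and a timer. The sketch below shows that idea for any promise-returning `acquire` function; `acquireWithTimeout` and the `ACQUIRE_TIMEOUT` code are hypothetical and not part of pg.

```javascript
// Races acquire() against a timer. If the connection arrives after the caller
// has already timed out, it is released instead of being silently leaked.
function acquireWithTimeout(acquire, release, timeoutMs) {
  return new Promise((resolve, reject) => {
    let timedOut = false;
    const timer = setTimeout(() => {
      timedOut = true;
      const err = new Error(`Timed out after ${timeoutMs} ms waiting for a connection`);
      err.code = 'ACQUIRE_TIMEOUT'; // hypothetical code for callers to check
      reject(err);
    }, timeoutMs);
    acquire().then(
      conn => {
        clearTimeout(timer);
        if (timedOut) {
          release(conn); // late arrival: give it back to the pool
        } else {
          resolve(conn);
        }
      },
      err => {
        clearTimeout(timer);
        reject(err);
      },
    );
  });
}
```

The late-arrival branch matters: without it, a connection that shows up after the caller has given up would never go back to the pool and would effectively leak.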
Idle Timeout
How long a connection may stay idle before it is closed.
const pool = new Pool({
host: 'localhost',
idleTimeoutMillis: 30000, // 30 seconds
// connections idle for more than 30 seconds are closed
});
Maximum Lifetime
The maximum time a connection may exist before it is closed, regardless of how busy it is.
const pool = new Pool({
host: 'localhost',
maxLifetimeSeconds: 3600, // 1 hour (node-postgres takes this option in seconds)
// connections older than 1 hour are closed
});
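Idle timeout and maximum lifetime are typically evaluated together, during a periodic sweep or when a connection is released. The function below sketches that decision; `evictionReason` and the `createdAt`/`lastUsedAt` fields (epoch milliseconds) are assumptions for illustration. Some pools also add a little random jitter to the lifetime so that connections opened together are not all recycled at the same moment.

```javascript
// Decide whether a pooled connection should be closed, and why (null = keep).
function evictionReason(conn, now, { idleTimeoutMs, maxLifetimeMs }) {
  // Never close a connection that is in use; it is re-checked when released
  if (conn.state !== 'idle') return null;
  if (maxLifetimeMs > 0 && now - conn.createdAt >= maxLifetimeMs) return 'max-lifetime';
  if (idleTimeoutMs > 0 && now - conn.lastUsedAt >= idleTimeoutMs) return 'idle-timeout';
  return null;
}

const limits = { idleTimeoutMs: 30_000, maxLifetimeMs: 3_600_000 };
const now = 10_000_000;
console.log(evictionReason({ state: 'idle', createdAt: now - 4_000_000, lastUsedAt: now - 1_000 }, now, limits)); // max-lifetime
console.log(evictionReason({ state: 'idle', createdAt: now - 60_000, lastUsedAt: now - 45_000 }, now, limits)); // idle-timeout
console.log(evictionReason({ state: 'active', createdAt: now - 4_000_000, lastUsedAt: now }, now, limits)); // null
```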
Query Timeout
A time limit for each individual query.
const pool = new Pool({
host: 'localhost',
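query_timeout: 30000, // 30 seconds, enforced client-side by node-postgres
});
try {
await pool.query('SELECT * FROM large_table');
} catch (error) {
// query_timeout rejects with "Query read timeout" (no error code); a server-side
// statement_timeout instead fails with SQLSTATE 57014 (query_canceled)
if (error.message === 'Query read timeout' || error.code === '57014') {
console.error('Query timed out');
}
}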
Complete Timeout Configuration
const pool = new Pool({
host: 'localhost',
database: 'mydb',
user: 'user',
password: 'pass',
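// Pool timeouts
connectionTimeoutMillis: 5000, // wait for a connection
idleTimeoutMillis: 30000, // close idle connections
maxLifetimeSeconds: 3600, // close old connections (value in seconds)
// Client-side query timeout (ms)
query_timeout: 30000,
// Server-side statement timeout (PostgreSQL), in milliseconds
statement_timeout: 30000,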
});
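Connection Leak Detection and Prevention
What Is a Connection Leak?
A connection leak occurs when a connection is acquired from the pool but never returned, so the pool eventually runs out of available connections.
Detection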
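A small simulation makes this failure mode visible. It uses a hypothetical two-slot `TinyPool` (a counting semaphore with an acquire timeout, standing in for a real driver): a handler that exits early without releasing keeps its slot forever, and once both slots have leaked, even a healthy request times out although the database itself is idle.

```javascript
// Stand-in for a pool: a counting semaphore with an acquire timeout.
class TinyPool {
  constructor(size, timeoutMs) {
    this.free = size;
    this.timeoutMs = timeoutMs;
    this.waiters = [];
  }

  acquire() {
    if (this.free > 0) {
      this.free--;
      return Promise.resolve({ release: () => this.release() });
    }
    return new Promise((resolve, reject) => {
      const waiter = { resolve };
      waiter.timer = setTimeout(() => {
        this.waiters = this.waiters.filter(w => w !== waiter);
        reject(new Error('timeout waiting for connection'));
      }, this.timeoutMs);
      this.waiters.push(waiter);
    });
  }

  release() {
    const next = this.waiters.shift();
    if (next) {
      clearTimeout(next.timer);
      next.resolve({ release: () => this.release() }); // slot passes to the waiter
    } else {
      this.free++;
    }
  }
}

// A handler that "forgets" to release on its error path
async function leakyHandler(pool, fail) {
  const conn = await pool.acquire();
  if (fail) throw new Error('boom'); // early exit: conn.release() never runs
  conn.release();
}

async function simulate() {
  const pool = new TinyPool(2, 20);
  for (const fail of [true, true]) {
    await leakyHandler(pool, fail).catch(() => {});
  }
  // Both slots are now leaked; a healthy request cannot get a connection
  return leakyHandler(pool, false).then(() => 'ok', err => err.message);
}
```

Moving `conn.release()` into a `finally` block closes the early-exit path, and the same sequence then succeeds.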
const { randomUUID } = require('crypto');
class LeakDetectingPool {
  constructor(options = {}) {
    this.leakDetectionThreshold = options.leakDetectionThreshold || 30000; // 30 seconds
    this.borrowedConnections = new Map();
  }
  // acquireConnection() is assumed to be provided by the underlying pool
  async getConnection() {
    const conn = await this.acquireConnection();
    const borrowId = randomUUID();
    // Warn if the connection is still checked out after the threshold
    const timer = setTimeout(() => {
      const borrowed = this.borrowedConnections.get(borrowId);
      if (borrowed) {
        console.error('Potential connection leak detected!');
        console.error('Connection borrowed at:', new Date(borrowed.borrowedAt).toISOString());
        console.error('Stack trace:', borrowed.stackTrace);
      }
    }, this.leakDetectionThreshold);
    timer.unref(); // don't keep the process alive just for leak detection
    this.borrowedConnections.set(borrowId, {
      connection: conn,
      borrowedAt: Date.now(),
      stackTrace: new Error().stack, // where the connection was borrowed
      timer,
    });
    return {
      connection: conn,
      release: () => this.releaseConnection(borrowId),
    };
  }
  releaseConnection(borrowId) {
    const borrowed = this.borrowedConnections.get(borrowId);
    if (!borrowed) {
      console.warn('Connection already released or never borrowed');
      return;
    }
    clearTimeout(borrowed.timer);
    borrowed.connection.release();
    this.borrowedConnections.delete(borrowId);
  }
}
// Usage
const { connection, release } = await pool.getConnection();
try {
  await connection.query('SELECT * FROM users');
} finally {
  release(); // always release!
}
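Prevention: Automatic Cleanup
class AutoCleaningPool {
  constructor(options = {}) {
    this.autoCleanupInterval = options.autoCleanupInterval || 60000;
    // Must be defined, otherwise `age > undefined` is always false and nothing
    // is ever reclaimed (5 minutes is an illustrative default)
    this.leakDetectionThreshold = options.leakDetectionThreshold || 300000;
    this.borrowedConnections = new Map();
    this.startAutoCleanup();
  }
  startAutoCleanup() {
    this.cleanupTimer = setInterval(() => {
      this.cleanupStaleConnections();
    }, this.autoCleanupInterval);
    this.cleanupTimer.unref(); // don't keep the process alive just for cleanup
  }
  cleanupStaleConnections() {
    const now = Date.now();
    for (const [borrowId, borrowed] of this.borrowedConnections) {
      const age = now - borrowed.borrowedAt;
      if (age > this.leakDetectionThreshold) {
        // Last resort: a borrower that is merely slow may still be using this
        // connection, so keep the threshold generous
        console.warn(`Force returning leaked connection (age: ${age}ms)`);
        borrowed.connection.release();
        this.borrowedConnections.delete(borrowId);
      }
    }
  }
}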
Use the try-finally Pattern
// Always use try-finally to guarantee the connection is released
async function getUsers() {
const { connection, release } = await pool.getConnection();
try {
return await connection.query('SELECT * FROM users');
} finally {
release();
}
}
// Or wrap the pattern in a helper so callers cannot forget to release
async function withConnection(fn) {
const { connection, release } = await pool.getConnection();
try {
return await fn(connection);
} finally {
release();
}
}
// Usage
const users = await withConnection(async (conn) => {
return await conn.query('SELECT * FROM users');
});
Pool Monitoring and Metrics
Basic Metrics Collection
class MonitoredPool {
  constructor(options = {}) {
    this.metrics = {
      totalRequests: 0, // connection checkouts attempted
      totalQueries: 0,  // queries run through the wrapper
      totalWaitTime: 0,
      totalQueryTime: 0,
      errors: 0,
      timeouts: 0,
    };
  }
  // acquireConnection() is assumed to be provided by the underlying pool
  async getConnection() {
    const startTime = Date.now();
    this.metrics.totalRequests++;
    try {
      const conn = await this.acquireConnection();
      this.metrics.totalWaitTime += Date.now() - startTime;
      return {
        connection: conn,
        query: async (sql, params) => {
          const queryStart = Date.now();
          this.metrics.totalQueries++;
          try {
            return await conn.query(sql, params);
          } catch (error) {
            this.metrics.errors++;
            throw error;
          } finally {
            this.metrics.totalQueryTime += Date.now() - queryStart;
          }
        },
        release: () => conn.release(),
      };
    } catch (error) {
      // Assumes acquireConnection() tags timeouts with this code
      if (error.code === 'CONNECTION_TIMEOUT') {
        this.metrics.timeouts++;
      }
      this.metrics.errors++;
      throw error;
    }
  }
  getMetrics() {
    const { totalRequests, totalQueries } = this.metrics;
    const operations = totalRequests + totalQueries;
    return {
      ...this.metrics,
      avgWaitTime: totalRequests > 0 ? this.metrics.totalWaitTime / totalRequests : 0,
      // Averaged per query, not per checkout: one checkout may run many queries
      avgQueryTime: totalQueries > 0 ? this.metrics.totalQueryTime / totalQueries : 0,
      // Acquisition and query errors relative to all operations
      errorRate: operations > 0 ? this.metrics.errors / operations : 0,
    };
  }
}
Real-Time Pool Status
// Assumes a node-postgres Pool, which exposes totalCount, idleCount and waitingCount
function getPoolStatus(pool) {
  return {
    totalCount: pool.totalCount,
    idleCount: pool.idleCount,
    waitingCount: pool.waitingCount,
    maxCount: pool.options.max,
    minCount: pool.options.min,
    utilization: pool.totalCount / pool.options.max,
  };
}
// Poll periodically
setInterval(() => {
  const status = getPoolStatus(pool);
  console.log('Pool Status:', status);
  // Warn when the pool is close to exhausted
  if (status.utilization > 0.9) {
    console.warn('Pool utilization high:', status.utilization);
  }
}, 5000);
Prometheus Metrics
const promClient = require('prom-client');
// Define the metrics
const poolSizeGauge = new promClient.Gauge({
name: 'db_pool_size',
help: 'Current pool size',
labelNames: ['database'],
});
const poolIdleGauge = new promClient.Gauge({
name: 'db_pool_idle',
help: 'Number of idle connections',
labelNames: ['database'],
});
const poolWaitingGauge = new promClient.Gauge({
name: 'db_pool_waiting',
help: 'Number of clients waiting for connection',
labelNames: ['database'],
});
const poolQueryDuration = new promClient.Histogram({
name: 'db_query_duration_seconds',
help: 'Query execution time',
labelNames: ['database', 'operation'],
buckets: [0.001, 0.01, 0.1, 1, 10],
});
// Refresh the gauges every 5 seconds
setInterval(() => {
const status = getPoolStatus(pool);
poolSizeGauge.set({ database: 'mydb' }, status.totalCount);
poolIdleGauge.set({ database: 'mydb' }, status.idleCount);
poolWaitingGauge.set({ database: 'mydb' }, status.waitingCount);
}, 5000);
// Time each query
async function queryWithMetrics(sql) {
const end = poolQueryDuration.startTimer({ database: 'mydb', operation: 'select' });
try {
return await pool.query(sql);
} finally {
end();
}
}
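PostgreSQL Connection Poolers
PgBouncer
PgBouncer is a lightweight connection pooler for PostgreSQL that runs as a separate proxy between the application and the database.
Installation: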
# Ubuntu/Debian
sudo apt-get install pgbouncer
# macOS
brew install pgbouncer
# From source
wget https://pgbouncer.github.io/downloads/files/1.18.0/pgbouncer-1.18.0.tar.gz
tar xzf pgbouncer-1.18.0.tar.gz
cd pgbouncer-1.18.0
./configure && make && sudo make install
Configuration (pgbouncer.ini):
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
pool_mode = transaction
listen_addr = 127.0.0.1
listen_port = 6432
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
max_client_conn = 1000
default_pool_size = 25
reserve_pool_size = 5
reserve_pool_timeout = 3
server_lifetime = 3600
server_idle_timeout = 600
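User list (userlist.txt):
"username" "md5<hash>"
Generate the hash. PgBouncer uses PostgreSQL's MD5 format: the literal prefix md5 followed by the MD5 digest of the password concatenated with the username (password first). Prefix the hex digest printed by this command with md5:
echo -n "passwordusername" | md5sum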
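The entry can also be generated with Node's built-in `crypto` module. This sketch assumes PostgreSQL's MD5 password format, where the stored value is the literal prefix `md5` followed by the hex MD5 digest of the password concatenated with the username; `pgbouncerUserlistLine` is a hypothetical helper, and it does not handle double quotes inside names.

```javascript
const crypto = require('crypto');

// Builds a userlist.txt line in PostgreSQL's MD5 format: "md5" + md5(password + username).
function pgbouncerUserlistLine(username, password) {
  const digest = crypto.createHash('md5').update(password + username).digest('hex');
  return `"${username}" "md5${digest}"`;
}

console.log(pgbouncerUserlistLine('app_user', 's3cret'));
```

Where both PgBouncer and the server support it, SCRAM-SHA-256 (`auth_type = scram-sha-256`) is generally preferred over MD5.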
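Pool modes:
- Session pooling: a server connection is assigned to a client for as long as that client stays connected
- Transaction pooling: the server connection returns to the pool after each transaction (recommended for most applications; session state such as SET or session-level advisory locks does not carry over between transactions)
- Statement pooling: the server connection returns to the pool after each statement (multi-statement transactions are not allowed)
Start PgBouncer: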
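The practical difference between the modes is how many server connections are needed. Under a simplified model (an assumption, not PgBouncer's actual scheduling), session mode ties up one server connection per connected client, while transaction mode needs only as many as there are transactions running at the same instant. The sketch computes both from hypothetical transaction intervals in milliseconds.

```javascript
// Peak number of overlapping [start, end) intervals, via a sweep over sorted events.
function peakOverlap(intervals) {
  const events = [];
  for (const [start, end] of intervals) {
    events.push([start, +1], [end, -1]);
  }
  // At equal times, process ends (-1) before starts (+1)
  events.sort((a, b) => a[0] - b[0] || a[1] - b[1]);
  let current = 0;
  let peak = 0;
  for (const [, delta] of events) {
    current += delta;
    peak = Math.max(peak, current);
  }
  return peak;
}

// Each client runs short transactions [start, end) during its session
const clients = {
  a: [[0, 10], [50, 60]],
  b: [[5, 15], [70, 80]],
  c: [[20, 30], [55, 65]],
};

const sessionModeServerConns = Object.keys(clients).length;
const transactionModeServerConns = peakOverlap(Object.values(clients).flat());
console.log({ sessionModeServerConns, transactionModeServerConns });
// { sessionModeServerConns: 3, transactionModeServerConns: 2 }
```

With many mostly idle clients this gap widens, which is how a configuration like `max_client_conn = 1000` can be served by `default_pool_size = 25`.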
pgbouncer -d /etc/pgbouncer/pgbouncer.ini
Pgpool-II
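Pgpool-II is a more feature-rich middleware that adds capabilities such as load balancing and automatic failover on top of connection pooling.
Installation: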
# Ubuntu/Debian
sudo apt-get install pgpool2
# macOS
brew install pgpool-ii
Configuration (pgpool.conf):
# Connection settings
listen_addresses = '*'
port = 9999
# Pooling
connection_cache = on
num_init_children = 32
max_pool = 4
child_life_time = 300
connection_life_time = 0
# Load balancing
load_balance_mode = on
backend_hostname0 = 'db1.example.com'
backend_port0 = 5432
backend_weight0 = 1
backend_hostname1 = 'db2.example.com'
backend_port1 = 5432
backend_weight1 = 1
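Serverless Considerations
Cold-Start Impact
A serverless function instance that starts cold has no connections yet, so it must establish new ones; creating the pool inside the handler makes every invocation pay that cost.
// Bad: a new pool (and a new connection) on every invocation, and it is never closed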
exports.handler = async (event) => {
const pool = new Pool({ /* ... */ });
const result = await pool.query('SELECT * FROM users');
return result;
};
Connection Reuse
// Better: create the pool once per execution environment and reuse it across invocations
let pool;
async function getPool() {
if (!pool) {
pool = new Pool({
host: process.env.DB_HOST,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 5, // lower maximum for serverless
idleTimeoutMillis: 10000, // shorter idle timeout
});
}
return pool;
}
exports.handler = async (event) => {
const pool = await getPool();
const result = await pool.query('SELECT * FROM users');
return result;
};
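In Lambda, module-scope variables survive between invocations handled by the same execution environment, so the reuse pattern amounts to memoizing the pool. The sketch isolates that idea with an injected factory so it runs without a database; `makeGetPool` and the stand-in pool object are hypothetical. It caches the promise, so two concurrent calls within one invocation (for example under `Promise.all`) still create only one pool.

```javascript
// Returns a getter that creates the pool on first use and reuses it afterwards.
function makeGetPool(createPool) {
  let poolPromise = null;
  return function getPool() {
    if (!poolPromise) {
      // Cache the promise so concurrent first calls share one initialization
      poolPromise = Promise.resolve().then(createPool).catch(err => {
        poolPromise = null; // let a later invocation retry after a failure
        throw err;
      });
    }
    return poolPromise;
  };
}

// Usage with a stand-in factory; in Lambda, createPool would build a pg Pool
let poolsCreated = 0;
const getPool = makeGetPool(async () => {
  poolsCreated++;
  return { query: async sql => ({ rows: [sql] }) };
});

const handler = async () => {
  const pool = await getPool();
  return (await pool.query('SELECT 1')).rows;
};
```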
AWS Lambda RDS Proxy
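Use AWS RDS Proxy for Lambda functions: the function connects to the proxy endpoint, and the proxy maintains the pool of database connections.
// Connect through RDS Proxy
const pool = new Pool({
host: process.env.RDS_PROXY_ENDPOINT, // proxy endpoint instead of the database host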
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 5,
});
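Connection Limits
Serverless platforms scale out by running many execution environments at once, and each one holds its own pool:
// Worst-case number of connections the database may see
const maxConcurrentExecutions = 1000; // default regional Lambda concurrency quota
const poolMaxPerEnvironment = 5; // max: 5, as in the examples above
const worstCaseConnections = maxConcurrentExecutions * poolMaxPerEnvironment; // 5000
// If this exceeds max_connections (100 by default in PostgreSQL), lower max,
// cap the function's reserved concurrency, or put a proxy in front of the database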
Per-Tenant Pools in Multi-Tenant Applications
Tenant-Specific Pools
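// Assumes node-postgres: const { Pool } = require('pg');
class TenantPoolManager {
  constructor() {
    this.pools = new Map(); // tenantId -> Promise<Pool>
  }
  // getTenantConfig(tenantId) is assumed to be implemented elsewhere
  getPool(tenantId) {
    if (!this.pools.has(tenantId)) {
      // Cache the promise rather than the pool so concurrent first requests for
      // the same tenant share one pool instead of racing to create two
      const poolPromise = this.getTenantConfig(tenantId).then(config => new Pool({
        host: config.host,
        database: config.database,
        user: config.user,
        password: config.password,
        max: 10, // smaller pool per tenant
      }));
      // Forget a failed attempt so the next request can retry
      poolPromise.catch(() => {
        if (this.pools.get(tenantId) === poolPromise) this.pools.delete(tenantId);
      });
      this.pools.set(tenantId, poolPromise);
    }
    return this.pools.get(tenantId);
  }
  async closePool(tenantId) {
    const poolPromise = this.pools.get(tenantId);
    if (poolPromise) {
      this.pools.delete(tenantId);
      const pool = await poolPromise;
      await pool.end();
    }
  }
  async closeAll() {
    const pending = [...this.pools.values()];
    this.pools.clear();
    const results = await Promise.allSettled(pending);
    await Promise.all(results
      .filter(r => r.status === 'fulfilled')
      .map(r => r.value.end()));
  }
}
// Usage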
const poolManager = new TenantPoolManager();
async function tenantQuery(tenantId, query) {
const pool = await poolManager.getPool(tenantId);
return await pool.query(query);
}
Schema-Based Multi-Tenancy
// One pool, one schema per tenant
const pool = new Pool({
host: 'localhost',
database: 'mydb',
user: 'user',
password: 'pass',
});
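async function tenantQuery(tenantId, query) {
const client = await pool.connect();
try {
// Quote the schema name to prevent SQL injection through tenantId
const schema = client.escapeIdentifier(`tenant_${tenantId}`);
// SET LOCAL scopes search_path to this transaction, so it does not leak to the next borrower of this pooled connection
await client.query('BEGIN');
await client.query(`SET LOCAL search_path TO ${schema}`);
const result = await client.query(query);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}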
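Because the schema name is built from `tenantId`, validating the ID before it reaches SQL is a worthwhile second safeguard alongside quoting. A sketch assuming tenant IDs are lowercase alphanumeric strings (adapt the pattern to your own ID format); `tenantSchema` is a hypothetical helper. The length cap keeps names well under PostgreSQL's 63-byte identifier limit.

```javascript
// Assumed ID format: 1-40 lowercase letters, digits or underscores.
const TENANT_ID_PATTERN = /^[a-z0-9_]{1,40}$/;

// Maps a tenant ID to its schema name, rejecting anything outside the pattern.
function tenantSchema(tenantId) {
  const id = String(tenantId);
  if (!TENANT_ID_PATTERN.test(id)) {
    throw new Error(`Invalid tenant id: ${JSON.stringify(tenantId)}`);
  }
  return `tenant_${id}`;
}
```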
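Troubleshooting Pool Exhaustion
Symptoms
- The application hangs waiting for connections
- "Connection timeout" errors
- Slow response times
Diagnosis
// Inspect the pool state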
function diagnosePool(pool) {
const status = {
totalCount: pool.totalCount,
idleCount: pool.idleCount,
waitingCount: pool.waitingCount,
maxCount: pool.options.max,
utilization: pool.totalCount / pool.options.max,
};
console.log('Pool Status:', status);
if (status.waitingCount > 10) {
console.warn('Many clients waiting for connections');
}
if (status.utilization > 0.9) {
console.warn('Pool nearly exhausted');
}
return status;
}
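Common Causes
- Connection leaks
// Bad: the connection is never released
const conn = await pool.connect();
await conn.query('SELECT * FROM users');
// forgot: conn.release()
// Good: always release
const conn = await pool.connect();
try {
  await conn.query('SELECT * FROM users');
} finally {
  conn.release();
}
- Long-running queries
// Bad: a long query holds the connection
const conn = await pool.connect();
await conn.query('SELECT * FROM huge_table'); // takes minutes
// Better: read in batches with a cursor (bounded memory; the connection is still
// held until the cursor is closed), or paginate with separate pool.query() calls
const Cursor = require('pg-cursor');
const conn = await pool.connect();
const cursor = conn.query(new Cursor('SELECT * FROM huge_table'));
try {
  while (true) {
    const rows = await cursor.read(1000);
    if (rows.length === 0) break;
    // process rows
  }
} finally {
  await cursor.close();
  conn.release();
}
- Pool too small
// Increase the pool size
const pool = new Pool({
  max: 50, // up from 20
});
- Database connection limit reached
-- Count current connections
SELECT count(*) FROM pg_stat_activity;
-- Show the configured maximum
SHOW max_connections;
-- Raise it if needed (takes effect only after a server restart)
ALTER SYSTEM SET max_connections = 200;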
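To tell a genuinely undersized pool from a leak, a back-of-the-envelope estimate based on Little's law helps: the average number of connections in use equals the request rate multiplied by the average time each request holds a connection. `requiredConnections` is a hypothetical helper, and the 1.5x headroom for bursts is an assumed default.

```javascript
// Little's law: connections in use ≈ arrival rate (req/s) × hold time (s).
function requiredConnections({ requestsPerSecond, avgHoldMs, headroom = 1.5 }) {
  const inUse = (requestsPerSecond * avgHoldMs) / 1000; // average busy connections
  return {
    inUse,
    suggestedMax: Math.ceil(inUse * headroom), // leave room for bursts
  };
}

console.log(requiredConnections({ requestsPerSecond: 200, avgHoldMs: 50 }));
// { inUse: 10, suggestedMax: 15 }
```

If the estimate is far below the configured `max` while `waitingCount` stays high, a leak or a long-running query is a more likely culprit than the pool size.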
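Best Practices
- Pool sizing
  - Start with cpu_cores * 2
  - Monitor metrics and adjust
  - Account for the database connection limit
  - Account for multiple application instances
- Timeout configuration
  - Set a connection timeout (5-10 seconds)
  - Set a query timeout (30-60 seconds)
  - Set an idle timeout (30-60 seconds)
  - Set a maximum lifetime (1-8 hours)
- Connection validation
  - Enable test-on-borrow in development
  - Use test-while-idle in production
  - Choose an appropriate validation interval
- Error handling
  - Always release connections in a finally block
  - Handle timeout errors gracefully
  - Log connection errors for debugging
- Monitoring
  - Track pool utilization
  - Monitor wait times
  - Alert when the pool is exhausted
  - Track connection errors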
Common Mistakes
- Not releasing connections
// Bad
const conn = await pool.connect();
await conn.query('SELECT * FROM users'); // connection leak!
// Good
const conn = await pool.connect();
try {
  await conn.query('SELECT * FROM users');
} finally {
  conn.release();
}
- Oversized pools
// Bad: far more connections than the database can use efficiently
const pool = new Pool({ max: 1000 });
// Good: sized to the workload
const pool = new Pool({ max: 20 });
- No timeouts or validation
// Bad: defaults only (node-postgres waits indefinitely for a connection by default)
const pool = new Pool({});
// Good: bound waiting and idle time so stale connections are recycled
const pool = new Pool({
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000,
});
- Long transactions
// Bad: a long transaction holds its connection the whole time
const conn = await pool.connect();
await conn.query('BEGIN');
// ...lots of processing...
await conn.query('COMMIT');
// Good: keep transactions short
const conn = await pool.connect();
try {
  await conn.query('BEGIN');
  await conn.query('UPDATE users SET name = $1', ['John']);
  await conn.query('COMMIT');
} catch (error) {
  await conn.query('ROLLBACK');
  throw error;
} finally {
  conn.release();
}