实施适当的关闭程序,确保在进程终止前完成所有请求、关闭连接和释放资源。
何时使用
- Kubernetes/Docker 部署
- 滚动更新和部署
- 服务器重启
- 负载均衡器连接排空(draining)期间
- 零停机时间部署
- 进程管理器(PM2,systemd)
- 长时间运行的后台作业
- 数据库连接清理
关闭阶段
1. 接收 SIGTERM 信号
2. 停止接受新请求
3. 排空活动连接
4. 完成处理中(in-flight)的请求
5. 关闭数据库连接
6. 刷新日志和指标
7. 退出进程
实现示例
1. Express.js 优雅关闭
import express from 'express';
import http from 'http';
/**
 * Express HTTP server that drains in-flight requests before the process exits.
 *
 * Shutdown sequence (triggered by SIGTERM/SIGINT or a fatal error):
 *   1. stop listening for new connections (without waiting for sockets to end),
 *   2. wait for tracked in-flight responses to finish, destroying stragglers,
 *   3. close idle keep-alive sockets and wait for the server to report closed,
 *   4. release other resources (database, logs, ...),
 *   5. exit — with a non-zero code when shutdown was caused by an error.
 *
 * A hard timer forces `process.exit(1)` if the sequence exceeds `shutdownTimeout`.
 */
class GracefulShutdownServer {
  private readonly app: express.Application;
  private readonly server: http.Server;
  private isShuttingDown = false;
  /** Responses handed to a route handler that have not finished yet. */
  private readonly activeConnections = new Set<express.Response>();
  private readonly shutdownTimeout = 30000; // 30 seconds
  /**
   * Resolves once `server.close()` has completed. It resolves with the error
   * (instead of rejecting) so that it can be awaited later without producing
   * an unhandled rejection in the meantime.
   */
  private serverClosed: Promise<Error | undefined> | null = null;

  constructor() {
    this.app = express();
    this.server = http.createServer(this.app);
    this.setupMiddleware();
    this.setupRoutes();
    this.setupShutdownHandlers();
  }

  /** Rejects requests during shutdown and tracks every in-flight response. */
  private setupMiddleware(): void {
    this.app.use((req, res, next) => {
      if (this.isShuttingDown) {
        // Ask the client to drop this keep-alive connection after the reply.
        res.set('Connection', 'close');
        res.status(503).json({
          error: '服务器正在关闭'
        });
        return;
      }

      this.activeConnections.add(res);
      // 'finish' fires on a completed response, 'close' also on an aborted one.
      const untrack = (): void => {
        this.activeConnections.delete(res);
      };
      res.on('finish', untrack);
      res.on('close', untrack);
      next();
    });
  }

  /** Registers the health endpoint and a sample slow endpoint. */
  private setupRoutes(): void {
    this.app.get('/health', (req, res) => {
      if (this.isShuttingDown) {
        res.status(503).json({ status: 'shutting_down' });
        return;
      }
      res.json({ status: 'ok' });
    });

    this.app.get('/api/data', async (req, res) => {
      // Simulate a long-running request.
      await new Promise<void>((resolve) => {
        setTimeout(resolve, 5000);
      });
      res.json({ data: 'response' });
    });
  }

  /** Hooks termination signals and fatal-error events to the shutdown sequence. */
  private setupShutdownHandlers(): void {
    const signals: NodeJS.Signals[] = ['SIGTERM', 'SIGINT'];
    for (const signal of signals) {
      process.on(signal, () => {
        console.log(`Received ${signal}, starting graceful shutdown...`);
        void this.gracefulShutdown(signal);
      });
    }

    // Fatal errors still get a graceful drain, but must exit non-zero so a
    // supervisor can tell a crash from a requested stop.
    process.on('uncaughtException', (error) => {
      console.error('Uncaught exception:', error);
      void this.gracefulShutdown('UNCAUGHT_EXCEPTION', 1);
    });
    process.on('unhandledRejection', (reason) => {
      console.error('Unhandled rejection:', reason);
      void this.gracefulShutdown('UNHANDLED_REJECTION', 1);
    });
  }

  /**
   * Runs the shutdown sequence once and terminates the process.
   *
   * @param signal   Label of what triggered the shutdown (used for logging).
   * @param exitCode Exit code used when the sequence completes successfully.
   */
  private async gracefulShutdown(signal: string, exitCode = 0): Promise<void> {
    if (this.isShuttingDown) {
      console.log('Shutdown already in progress');
      return;
    }
    this.isShuttingDown = true;
    console.log(`Starting graceful shutdown (${signal})`);

    // Safety net: never let a stuck step keep the process alive forever.
    const shutdownTimer = setTimeout(() => {
      console.error('Shutdown timeout reached, forcing exit');
      process.exit(1);
    }, this.shutdownTimeout);

    try {
      // 1. Stop accepting new connections (returns immediately).
      this.stopAcceptingConnections();
      // 2. Wait for in-flight requests to complete.
      await this.waitForActiveConnections();
      // 3. Wait for the server itself to finish closing.
      await this.closeServer();
      // 4. Release remaining resources.
      await this.cleanupResources();
      console.log('Graceful shutdown completed');
      clearTimeout(shutdownTimer);
      process.exit(exitCode);
    } catch (error: unknown) {
      console.error('Error during shutdown:', error);
      clearTimeout(shutdownTimer);
      process.exit(1);
    }
  }

  /**
   * Stops the listener. The close callback only fires after every socket has
   * ended, so it is deliberately NOT awaited here: awaiting it before draining
   * would block on idle keep-alive sockets. `closeServer()` awaits it later.
   */
  private stopAcceptingConnections(): void {
    console.log('Stopping new connections...');
    this.serverClosed = new Promise((resolve) => {
      this.server.close((err) => {
        resolve(err);
      });
    });
  }

  /** Polls until tracked responses finish, then destroys whatever is left. */
  private async waitForActiveConnections(): Promise<void> {
    console.log(`Waiting for ${this.activeConnections.size} active connections...`);

    // Make in-flight responses close their socket once they are sent, so the
    // connection does not linger as an idle keep-alive socket afterwards.
    for (const res of this.activeConnections) {
      if (!res.headersSent) {
        res.set('Connection', 'close');
      }
    }

    const checkInterval = 100;
    // Leave 5 seconds of the overall budget for closing and cleanup.
    const maxWait = this.shutdownTimeout - 5000;
    let waited = 0;
    while (this.activeConnections.size > 0 && waited < maxWait) {
      await new Promise<void>((resolve) => {
        setTimeout(resolve, checkInterval);
      });
      waited += checkInterval;
      if (waited % 1000 === 0) {
        console.log(`Still waiting for ${this.activeConnections.size} connections...`);
      }
    }

    if (this.activeConnections.size > 0) {
      console.warn(`Force closing ${this.activeConnections.size} remaining connections`);
      for (const res of this.activeConnections) {
        res.destroy();
      }
    }
    console.log('All connections closed');
  }

  /** Drops idle keep-alive sockets and waits for `server.close()` to complete. */
  private async closeServer(): Promise<void> {
    // Not available on older Node.js releases, hence the feature check.
    if (typeof this.server.closeIdleConnections === 'function') {
      this.server.closeIdleConnections();
    }
    const closeError = await this.serverClosed;
    if (closeError) {
      // e.g. the server was never started; cleanup should still proceed.
      console.warn('Server close reported an error:', closeError);
    }
    console.log('Server closed');
  }

  /** Releases everything that is not the HTTP server itself. */
  private async cleanupResources(): Promise<void> {
    console.log('Cleaning up resources...');
    // Close database connections.
    await this.closeDatabaseConnections();
    // Flush logs.
    await this.flushLogs();
    // Close any other resources.
    await this.closeOtherResources();
    console.log('Resources cleaned up');
  }

  /** Placeholder: close database connections. */
  private async closeDatabaseConnections(): Promise<void> {
    console.log('Closing database connections...');
    // await db.close();
  }

  /** Placeholder: flush any pending logs. */
  private async flushLogs(): Promise<void> {
    console.log('Flushing logs...');
  }

  /** Placeholder: close Redis, message queues, etc. */
  private async closeOtherResources(): Promise<void> {
    console.log('Closing other resources...');
  }

  /**
   * Starts listening for HTTP connections.
   *
   * @param port TCP port to bind.
   */
  start(port: number): void {
    this.server.listen(port, () => {
      console.log(`Server listening on port ${port}`);
    });
  }
}
// Usage: construct the server (this also registers the signal handlers in its
// constructor) and start listening on port 3000.
const server = new GracefulShutdownServer();
server.start(3000);
2. 感知 Kubernetes 的优雅关闭
/**
 * Readiness/liveness probe endpoints plus the first phase of a
 * Kubernetes-aware shutdown: fail readiness, then pause so the endpoint
 * removal can propagate before the rest of the shutdown proceeds.
 */
class KubernetesGracefulShutdown {
  private isReady = true;
  private isLive = true;
  private shutdownDelay = 5000; // K8s propagation delay

  /** Writes 200 with `up` when healthy, otherwise 503 with `down`. */
  private static reply(
    res: express.Response,
    healthy: boolean,
    up: string,
    down: string
  ): void {
    res.status(healthy ? 200 : 503).json({ status: healthy ? up : down });
  }

  /**
   * Registers `/health/ready` and `/health/live` on the given app.
   *
   * @param app Express application to attach the probe routes to.
   */
  setupProbes(app: express.Application): void {
    // Readiness probe
    app.get('/health/ready', (_req, res) => {
      KubernetesGracefulShutdown.reply(res, this.isReady, 'ready', 'not_ready');
    });

    // Liveness probe
    app.get('/health/live', (_req, res) => {
      KubernetesGracefulShutdown.reply(res, this.isLive, 'alive', 'not_alive');
    });
  }

  /**
   * Marks the instance as not ready and waits `shutdownDelay` milliseconds.
   * The remaining shutdown logic is expected to follow this call.
   */
  async shutdown(): Promise<void> {
    console.log('Kubernetes graceful shutdown initiated');

    // 1. Fail the readiness probe from now on.
    this.isReady = false;
    console.log('Marked as not ready');

    // 2. Give K8s time to remove the pod from the service endpoints.
    console.log(`Waiting ${this.shutdownDelay}ms for endpoint propagation...`);
    await new Promise((done) => setTimeout(done, this.shutdownDelay));

    // 3. Continue with the normal graceful shutdown
    // ... remaining shutdown logic
  }
}
3. 工作进程关闭
import Queue from 'bull';
/**
 * Bull queue worker that finishes its current jobs before the process exits.
 *
 * On SIGTERM the queue is paused locally, in-flight jobs get up to 30 seconds
 * to complete, the queue is closed and the process exits. The exit code is 0
 * on a clean drain and 1 when jobs were abandoned or a shutdown step failed.
 */
class WorkerShutdown {
  private readonly queue: Queue.Queue;
  /** Ids of jobs currently being processed by this worker. */
  private readonly isProcessing = new Map<string, boolean>();
  private isShuttingDown = false;

  /**
   * @param queue Bull queue whose 'task' jobs this worker processes.
   */
  constructor(queue: Queue.Queue) {
    this.queue = queue;
    this.setupWorker();
    this.setupShutdownHandlers();
  }

  /** Registers the 'task' processor (concurrency 5) and tracks running jobs. */
  private setupWorker(): void {
    void this.queue.process('task', 5, async (job) => {
      const jobId = String(job.id);
      this.isProcessing.set(jobId, true);
      try {
        console.log(`Processing job ${jobId}`);
        await this.processJob(job);
        console.log(`Completed job ${jobId}`);
      } finally {
        // Untrack on success and on failure alike.
        this.isProcessing.delete(jobId);
      }
    });
  }

  /** Job handling logic (simulated with a 5 second delay). */
  private async processJob(job: Queue.Job): Promise<void> {
    await new Promise<void>((resolve) => {
      setTimeout(resolve, 5000);
    });
  }

  /** Starts the worker shutdown when SIGTERM is received. */
  private setupShutdownHandlers(): void {
    process.on('SIGTERM', () => {
      console.log('SIGTERM received, shutting down worker...');
      // shutdownWorker handles its own errors, so it never rejects.
      void this.shutdownWorker();
    });
  }

  /**
   * Pauses the queue, waits for running jobs, closes the queue and exits.
   * Safe to trigger more than once; only the first call does any work.
   */
  private async shutdownWorker(): Promise<void> {
    if (this.isShuttingDown) {
      console.log('Shutdown already in progress');
      return;
    }
    this.isShuttingDown = true;

    let exitCode = 0;
    try {
      console.log('Pausing queue...');
      // Local pause that does not wait for active jobs; they are awaited below
      // with an upper bound instead.
      await this.queue.pause(true, true);
      console.log(`Waiting for ${this.isProcessing.size} jobs to complete...`);

      const checkInterval = 500;
      const maxWait = 30000;
      let waited = 0;
      while (this.isProcessing.size > 0 && waited < maxWait) {
        await new Promise<void>((resolve) => {
          setTimeout(resolve, checkInterval);
        });
        waited += checkInterval;
        if (waited % 5000 === 0) {
          console.log(`Still processing ${this.isProcessing.size} jobs...`);
        }
      }

      if (this.isProcessing.size > 0) {
        console.warn(`Forcing shutdown with ${this.isProcessing.size} jobs remaining`);
        // Abandoning jobs is not a clean shutdown; report it to the supervisor.
        exitCode = 1;
      }

      console.log('Closing queue...');
      await this.queue.close();
      console.log('Worker shutdown complete');
    } catch (error: unknown) {
      console.error('Error during worker shutdown:', error);
      exitCode = 1;
    }
    process.exit(exitCode);
  }
}
4. 数据库连接池关闭
import { Pool } from 'pg';
/**
 * Wraps a pg Pool so that in-flight queries can be awaited (for at most five
 * seconds) before the pool is ended.
 */
class DatabaseShutdown {
  private readonly pool: Pool;
  /**
   * One tracker promise per in-flight query. Trackers never reject, so
   * awaiting them cannot abort the shutdown when a query fails.
   */
  private readonly activeQueries = new Set<Promise<unknown>>();

  /**
   * @param pool Pool whose `query` method is replaced with a tracking wrapper.
   */
  constructor(pool: Pool) {
    this.pool = pool;
    this.setupQueryTracking();
  }

  /** Replaces `pool.query` with a wrapper that records in-flight queries. */
  private setupQueryTracking(): void {
    // The overloads of `query` cannot be expressed for a generic pass-through
    // wrapper, hence the narrow casts around the monkey patch.
    const originalQuery = this.pool.query.bind(this.pool) as (
      ...args: unknown[]
    ) => unknown;

    const trackedQuery = (...args: unknown[]): unknown => {
      const result = originalQuery(...args);

      // Callback-style calls do not return a promise; nothing to track.
      const isThenable =
        typeof result === 'object' &&
        result !== null &&
        typeof (result as PromiseLike<unknown>).then === 'function';
      if (!isThenable) {
        return result;
      }

      // Swallow the outcome on the tracker only: the caller still receives the
      // original promise and remains responsible for handling its rejection.
      const tracker = Promise.resolve(result).then(
        () => undefined,
        () => undefined
      );
      this.activeQueries.add(tracker);
      void tracker.then(() => {
        this.activeQueries.delete(tracker);
      });
      return result;
    };

    this.pool.query = trackedQuery as Pool['query'];
  }

  /**
   * Waits up to five seconds for in-flight queries, then ends the pool.
   * The pool is ended even when some of those queries failed.
   */
  async shutdown(): Promise<void> {
    console.log('Shutting down database connections...');

    if (this.activeQueries.size > 0) {
      console.log(`Waiting for ${this.activeQueries.size} active queries...`);
      let timer: ReturnType<typeof setTimeout> | undefined;
      const timeout = new Promise<void>((resolve) => {
        timer = setTimeout(resolve, 5000);
      });
      try {
        await Promise.race([Promise.all(this.activeQueries), timeout]);
      } finally {
        // Otherwise the pending timer keeps the event loop alive for 5 seconds.
        clearTimeout(timer);
      }
    }

    console.log('Ending pool...');
    await this.pool.end();
    console.log('Database connections closed');
  }
}
5. PM2 优雅关闭
// ecosystem.config.js
// PM2 process definition for the API server shown in server.ts.
module.exports = {
  apps: [{
    name: 'api-server',
    script: './dist/server.js',
    instances: 4,
    exec_mode: 'cluster',
    kill_timeout: 30000, // wait 30s for graceful shutdown
    // Presumably paired with the process.send('ready') call in server.ts.
    wait_ready: true,
    listen_timeout: 10000,
    // Presumably paired with the 'shutdown' process message handled in server.ts.
    shutdown_with_message: true
  }]
};
// server.ts
import express from 'express';
const app = express();
// Port comes from the PORT environment variable, falling back to 3000.
const port = process.env.PORT || 3000;
// ... set up routes ...
const server = app.listen(port, () => {
  console.log(`Server started on port ${port}`);
  // Signal PM2 that the app is ready (guarded: process.send may be undefined).
  if (process.send) {
    process.send('ready');
  }
});
// Handle the shutdown message from PM2
process.on('message', (msg) => {
  if (msg === 'shutdown') {
    console.log('Received shutdown message from PM2');
    gracefulShutdown();
  }
});
/** Set once shutdown has begun, so repeated requests do not restart it. */
let shutdownStarted = false;

/**
 * Stops the HTTP server and exits the process.
 *
 * Exits 0 once the server has closed, or 1 if closing fails or takes longer
 * than 28 seconds (kept below PM2's kill_timeout). Calling it again while a
 * shutdown is in progress is a no-op; without that guard the second
 * `server.close()` would call back with an error immediately and cut off the
 * requests that are still being drained.
 */
async function gracefulShutdown(): Promise<void> {
  if (shutdownStarted) {
    console.log('Shutdown already in progress');
    return;
  }
  shutdownStarted = true;
  console.log('Starting graceful shutdown...');

  // Force shutdown after the timeout.
  const forceExit = setTimeout(() => {
    console.error('Forced shutdown after timeout');
    process.exit(1);
  }, 28000); // less than PM2's kill_timeout

  // Idle keep-alive sockets would otherwise keep the close callback from
  // firing. Sweep repeatedly so sockets that become idle after their in-flight
  // response are closed too. Not available on older Node.js releases.
  const idleSweep =
    typeof server.closeIdleConnections === 'function'
      ? setInterval(() => server.closeIdleConnections(), 1000)
      : undefined;

  // Stop accepting new connections.
  server.close((err) => {
    clearTimeout(forceExit);
    clearInterval(idleSweep);
    if (err) {
      console.error('Error while closing server:', err);
      process.exit(1);
    }
    console.log('Server closed');
    process.exit(0);
  });

  if (idleSweep !== undefined) {
    server.closeIdleConnections();
  }
}
6. Python/Flask 优雅关闭
import signal
import sys
import time
from flask import Flask, request, g
from threading import Lock
# Flask application that the shutdown hooks and the health route attach to.
app = Flask(__name__)
class GracefulShutdown:
    """Tracks in-flight requests and drains them when a signal arrives.

    Register ``before_request`` / ``after_request`` as the Flask hooks of the
    same name and ``shutdown`` as the signal handler.
    """

    def __init__(self):
        self.is_shutting_down = False
        self.active_requests = 0
        self.lock = Lock()

    def before_request(self):
        """Count the request, then reject it with a 503 while shutting down.

        The request is counted *before* the shutdown check on purpose: Flask
        still runs the ``after_request`` hook for a response returned from
        here, so counting only accepted requests would let every rejected
        request decrement the counter without a matching increment.

        Returns:
            ``None`` to let the request proceed, or a ``(body, 503)`` tuple
            when the server is shutting down.
        """
        with self.lock:
            self.active_requests += 1
        if self.is_shutting_down:
            return {'error': 'Server is shutting down'}, 503
        return None

    def after_request(self, response):
        """Uncount the request and pass the response through unchanged."""
        with self.lock:
            self.active_requests -= 1
        return response

    def shutdown(self, signum, frame):
        """Signal handler: stop accepting work, drain, then exit.

        Waits up to 30 seconds (polling once per second) for active requests
        to finish and then exits the process with status 0.

        Args:
            signum: Number of the received signal.
            frame: Current stack frame (unused).
        """
        print(f"Received signal {signum}, starting graceful shutdown...")
        self.is_shutting_down = True

        # Wait for active requests
        max_wait = 30
        waited = 0
        while self.active_requests > 0 and waited < max_wait:
            print(f"Waiting for {self.active_requests} active requests...")
            time.sleep(1)
            waited += 1

        if self.active_requests > 0:
            print(f"Force closing with {self.active_requests} requests remaining")

        print("Graceful shutdown complete")
        sys.exit(0)
# Set up graceful shutdown: register the request hooks on the app and install
# the same handler for both SIGTERM and SIGINT.
shutdown_handler = GracefulShutdown()
app.before_request(shutdown_handler.before_request)
app.after_request(shutdown_handler.after_request)
signal.signal(signal.SIGTERM, shutdown_handler.shutdown)
signal.signal(signal.SIGINT, shutdown_handler.shutdown)
@app.route('/health')
def health():
    """Health check: 503 'shutting_down' during shutdown, otherwise 'ok'."""
    if shutdown_handler.is_shutting_down:
        return {'status': 'shutting_down'}, 503
    return {'status': 'ok'}
if __name__ == '__main__':
    # Listen on all interfaces, port 5000.
    app.run(host='0.0.0.0', port=5000)
最佳实践
✅ DO
- 处理 SIGTERM 和 SIGINT 信号
- 立即停止接受新请求
- 等待处理中(in-flight)的请求完成
- 设置合理的关闭超时
- 正确关闭数据库连接
- 刷新日志和指标
- 在关闭期间使健康检查失败
- 测试关闭程序
- 记录关闭进度日志
- 在容器中使用优雅关闭
❌ DON’T
- 忽略关闭信号
- 在没有清理的情况下强制杀死进程
- 设置过长的、不合理的超时时间
- 跳过资源清理
- 忘记关闭连接
- 无限期地阻塞关闭
Kubernetes 配置
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-server
spec:
  replicas: 3
  # apps/v1 requires a selector, and it must match the pod template labels.
  selector:
    matchLabels:
      app: api-server
  template:
    metadata:
      labels:
        app: api-server
    spec:
      containers:
        - name: api
          image: api-server:latest
          lifecycle:
            # Delay SIGTERM so endpoint removal can propagate first.
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 5"]
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /health/live
              port: 3000
            initialDelaySeconds: 15
            periodSeconds: 10
      # The grace period starts before the preStop hook runs, so it has to
      # cover the 5s preStop sleep plus the application's own 30s shutdown
      # timeout, with some margin; otherwise the pod is SIGKILLed mid-drain.
      terminationGracePeriodSeconds: 45