GracefulShutdown graceful-shutdown

优雅关闭程序的实现,用于确保服务器在终止前完成所有请求、关闭连接和释放资源,适用于 Kubernetes/Docker 部署、滚动更新、服务器重启等场景。

DevOps 0 次安装 0 次浏览 更新于 3/4/2026

实施适当的关闭程序,确保在进程终止前完成所有请求、关闭连接和释放资源。

何时使用

  • Kubernetes/Docker 部署
  • 滚动更新和部署
  • 服务器重启
  • 负载均衡器连接排空(draining)期间
  • 零停机时间部署
  • 进程管理器(PM2,systemd)
  • 长运行后台作业
  • 数据库连接清理

关闭阶段

1. 接收 SIGTERM 信号
2. 停止接受新请求
3. 排空活动连接
4. 完成处理中(in-flight)的请求
5. 关闭数据库连接
6. 刷新日志和指标
7. 退出进程

实现示例

1. Express.js 优雅关闭

import express from 'express';
import http from 'http';

/**
 * Express HTTP server that drains in-flight requests before the process exits.
 *
 * Shutdown sequence (triggered by SIGTERM/SIGINT or a fatal error):
 *   1. stop listening for new connections,
 *   2. wait for tracked requests to finish (force-destroying stragglers),
 *   3. close idle keep-alive sockets and wait for the listener to close,
 *   4. release external resources,
 *   5. exit.
 * A watchdog timer forces `process.exit(1)` if the sequence exceeds
 * `shutdownTimeout` milliseconds.
 */
class GracefulShutdownServer {
  private readonly app: express.Application;
  private readonly server: http.Server;
  private isShuttingDown = false;
  /** Responses whose request is still being handled. */
  private readonly activeConnections = new Set<http.ServerResponse>();
  /** Hard deadline for the whole shutdown sequence, in milliseconds. */
  private readonly shutdownTimeout: number;
  /** Settles when the `server.close()` callback has fired; already resolved if close was never started. */
  private serverClosed: Promise<void> = Promise.resolve();

  /**
   * @param shutdownTimeout Maximum time in milliseconds the shutdown sequence
   *   may take before the process is force-exited. Defaults to 30 seconds.
   */
  constructor(shutdownTimeout = 30000) {
    this.shutdownTimeout = shutdownTimeout;
    this.app = express();
    this.server = http.createServer(this.app);
    this.setupMiddleware();
    this.setupRoutes();
    this.setupShutdownHandlers();
  }

  /** Installs the middleware that rejects requests during shutdown and tracks active ones. */
  private setupMiddleware(): void {
    this.app.use((req, res, next) => {
      if (this.isShuttingDown) {
        // Ask the client to drop the keep-alive connection so the socket can close.
        res.set('Connection', 'close');
        res.status(503).json({
          error: '服务器正在关闭'
        });
        return;
      }

      this.activeConnections.add(res);

      // 'finish' fires when the response was fully sent, 'close' when the
      // underlying connection ended; either one means the request is done.
      const untrack = (): void => {
        this.activeConnections.delete(res);
      };
      res.on('finish', untrack);
      res.on('close', untrack);

      next();
    });
  }

  /** Registers the health endpoint and a sample slow endpoint. */
  private setupRoutes(): void {
    this.app.get('/health', (req, res) => {
      if (this.isShuttingDown) {
        res.status(503).json({ status: 'shutting_down' });
        return;
      }
      res.json({ status: 'ok' });
    });

    this.app.get('/api/data', async (req, res) => {
      // Simulate a long-running request.
      await new Promise(resolve => setTimeout(resolve, 5000));
      res.json({ data: 'response' });
    });
  }

  /** Hooks termination signals and fatal error events to the shutdown sequence. */
  private setupShutdownHandlers(): void {
    const signals: NodeJS.Signals[] = ['SIGTERM', 'SIGINT'];

    signals.forEach(signal => {
      process.on(signal, () => {
        console.log(`Received ${signal}, starting graceful shutdown...`);
        void this.gracefulShutdown(signal);
      });
    });

    // Fatal errors still drain in-flight work, but must not report success:
    // they exit with a non-zero code.
    process.on('uncaughtException', (error) => {
      console.error('Uncaught exception:', error);
      void this.gracefulShutdown('UNCAUGHT_EXCEPTION', 1);
    });

    process.on('unhandledRejection', (reason) => {
      console.error('Unhandled rejection:', reason);
      void this.gracefulShutdown('UNHANDLED_REJECTION', 1);
    });
  }

  /**
   * Runs the shutdown sequence once and then exits the process.
   *
   * @param signal Label of what triggered the shutdown (used for logging only).
   * @param exitCode Exit code to use when the sequence completes without error.
   */
  private async gracefulShutdown(signal: string, exitCode = 0): Promise<void> {
    if (this.isShuttingDown) {
      console.log('Shutdown already in progress');
      return;
    }

    this.isShuttingDown = true;
    console.log(`Starting graceful shutdown (${signal})`);

    // Watchdog: never let a stuck step block termination indefinitely.
    const shutdownTimer = setTimeout(() => {
      console.error('Shutdown timeout reached, forcing exit');
      process.exit(1);
    }, this.shutdownTimeout);

    let code = exitCode;
    try {
      // 1. Stop accepting new connections
      await this.stopAcceptingConnections();

      // 2. Wait for active requests to finish
      await this.waitForActiveConnections();

      // 3. Close remaining idle sockets and wait for the listener to close
      await this.closeServer();

      // 4. Release external resources
      await this.cleanupResources();

      console.log('Graceful shutdown completed');
    } catch (error: unknown) {
      console.error('Error during shutdown:', error);
      code = 1;
    } finally {
      clearTimeout(shutdownTimer);
    }

    process.exit(code);
  }

  /**
   * Starts closing the listener without waiting for it to finish.
   *
   * The `server.close()` callback only fires after every open socket has
   * ended, so awaiting it here would block the drain step that is supposed to
   * end those sockets. The completion promise is kept in `serverClosed` and
   * awaited by `closeServer()` instead.
   */
  private async stopAcceptingConnections(): Promise<void> {
    console.log('Stopping new connections...');

    if (!this.server.listening) {
      // start() was never called (or the server is already closed).
      return;
    }

    this.serverClosed = new Promise<void>((resolve) => {
      this.server.close(() => resolve());
    });
    console.log('Server stopped accepting new connections');
  }

  /**
   * Polls until all tracked responses have completed or the wait budget
   * (`shutdownTimeout` minus 5 seconds reserved for cleanup) is spent, then
   * destroys whatever is still open.
   */
  private async waitForActiveConnections(): Promise<void> {
    console.log(`Waiting for ${this.activeConnections.size} active connections...`);

    const checkInterval = 100;
    const maxWait = Math.max(0, this.shutdownTimeout - 5000);
    let waited = 0;

    while (this.activeConnections.size > 0 && waited < maxWait) {
      await new Promise(resolve => setTimeout(resolve, checkInterval));
      waited += checkInterval;

      if (waited % 1000 === 0) {
        console.log(`Still waiting for ${this.activeConnections.size} connections...`);
      }
    }

    if (this.activeConnections.size > 0) {
      console.warn(`Force closing ${this.activeConnections.size} remaining connections`);
      this.activeConnections.forEach((res) => {
        res.destroy();
      });
    }

    console.log('All connections closed');
  }

  /** Closes idle keep-alive sockets and waits for the listener started closing earlier to finish. */
  private async closeServer(): Promise<void> {
    // closeIdleConnections() only exists on newer Node.js releases, so it is
    // feature-detected instead of being called unconditionally.
    const server: http.Server & { closeIdleConnections?: () => void } = this.server;
    server.closeIdleConnections?.();

    await this.serverClosed;
    console.log('Server closed');
  }

  /** Releases external resources in order: database, logs, everything else. */
  private async cleanupResources(): Promise<void> {
    console.log('Cleaning up resources...');

    await this.closeDatabaseConnections();
    await this.flushLogs();
    await this.closeOtherResources();

    console.log('Resources cleaned up');
  }

  /** Placeholder: close database connections here. */
  private async closeDatabaseConnections(): Promise<void> {
    console.log('Closing database connections...');
    // await db.close();
  }

  /** Placeholder: flush any pending logs here. */
  private async flushLogs(): Promise<void> {
    console.log('Flushing logs...');
  }

  /** Placeholder: close Redis, message queues, etc. here. */
  private async closeOtherResources(): Promise<void> {
    console.log('Closing other resources...');
  }

  /**
   * Starts listening for HTTP connections.
   *
   * @param port TCP port to bind.
   */
  start(port: number): void {
    this.server.listen(port, () => {
      console.log(`Server listening on port ${port}`);
    });
  }
}

// Usage: constructing the server wires up middleware, routes and the
// process-level shutdown handlers; start() then begins listening on port 3000.
const server = new GracefulShutdownServer();
server.start(3000);

2. Kubernetes感知关闭

/**
 * Exposes readiness/liveness endpoints and implements the first phase of a
 * Kubernetes-aware shutdown: report "not ready", then pause so the pod can be
 * removed from Service endpoints before the regular shutdown continues.
 */
class KubernetesGracefulShutdown {
  private isReady = true;
  private isLive = true;
  private shutdownDelay = 5000; // allowance for K8s endpoint propagation

  /**
   * Registers `/health/ready` and `/health/live` on the given application.
   * Each endpoint answers 200 while its flag is set and 503 otherwise.
   */
  setupProbes(app: express.Application): void {
    // Readiness probe
    app.get('/health/ready', (req, res) => {
      const ready = this.isReady;
      res.status(ready ? 200 : 503).json({ status: ready ? 'ready' : 'not_ready' });
    });

    // Liveness probe
    app.get('/health/live', (req, res) => {
      const live = this.isLive;
      res.status(live ? 200 : 503).json({ status: live ? 'alive' : 'not_alive' });
    });
  }

  /** Fails the readiness probe and waits `shutdownDelay` ms before returning. */
  async shutdown(): Promise<void> {
    console.log('Kubernetes graceful shutdown initiated');

    // 1. Make the readiness probe fail from now on
    this.isReady = false;
    console.log('Marked as not ready');

    // 2. Give K8s time to remove the pod from the Service endpoints
    const delayMs = this.shutdownDelay;
    console.log(`Waiting ${delayMs}ms for endpoint propagation...`);
    await new Promise((done) => {
      setTimeout(done, delayMs);
    });

    // 3. Continue with the regular graceful shutdown
    // ... remaining shutdown logic
  }
}

3. 工作进程关闭

import Queue from 'bull';

/**
 * Bull worker that finishes its in-progress jobs before exiting on SIGTERM.
 *
 * On SIGTERM the queue is paused locally, the worker waits up to 30 seconds
 * for the jobs it is currently processing, closes the queue and exits.
 */
class WorkerShutdown {
  private readonly queue: Queue.Queue;
  /** Ids of the jobs this worker is currently processing. */
  private readonly isProcessing = new Map<string, boolean>();
  /** Guards against running the shutdown sequence twice on repeated signals. */
  private isShuttingDown = false;

  /**
   * @param queue Bull queue to consume; the 'task' processor and the SIGTERM
   *   handler are registered immediately.
   */
  constructor(queue: Queue.Queue) {
    this.queue = queue;
    this.setupWorker();
    this.setupShutdownHandlers();
  }

  /** Registers the 'task' processor (concurrency 5) and tracks each running job. */
  private setupWorker(): void {
    this.queue.process('task', 5, async (job) => {
      const jobId = String(job.id);
      this.isProcessing.set(jobId, true);

      try {
        console.log(`Processing job ${jobId}`);
        await this.processJob(job);
        console.log(`Completed job ${jobId}`);
      } finally {
        // Always untrack, even when the job throws, so shutdown is not blocked.
        this.isProcessing.delete(jobId);
      }
    });
  }

  /** Job handler placeholder; simulates five seconds of work. */
  private async processJob(job: Queue.Job): Promise<void> {
    await new Promise(resolve => setTimeout(resolve, 5000));
  }

  /** Starts the worker shutdown when SIGTERM is received. */
  private setupShutdownHandlers(): void {
    process.on('SIGTERM', () => {
      console.log('SIGTERM received, shutting down worker...');
      void this.shutdownWorker();
    });
  }

  /**
   * Pauses the queue, waits for running jobs, closes the queue and exits.
   *
   * Exits with code 0 when the sequence completes and 1 when pausing or
   * closing the queue fails.
   */
  private async shutdownWorker(): Promise<void> {
    if (this.isShuttingDown) {
      console.log('Shutdown already in progress');
      return;
    }
    this.isShuttingDown = true;

    let exitCode = 0;
    try {
      console.log('Pausing queue...');
      // Local pause that does not wait for active jobs; the loop below does
      // the waiting with its own time limit.
      await this.queue.pause(true, true);

      console.log(`Waiting for ${this.isProcessing.size} jobs to complete...`);

      const checkInterval = 500;
      const maxWait = 30000;
      let waited = 0;

      while (this.isProcessing.size > 0 && waited < maxWait) {
        await new Promise(resolve => setTimeout(resolve, checkInterval));
        waited += checkInterval;

        if (waited % 5000 === 0) {
          console.log(`Still processing ${this.isProcessing.size} jobs...`);
        }
      }

      const forced = this.isProcessing.size > 0;
      if (forced) {
        console.warn(`Forcing shutdown with ${this.isProcessing.size} jobs remaining`);
      }

      console.log('Closing queue...');
      // When the wait budget is exhausted, close without waiting for the
      // remaining jobs; otherwise close() would keep waiting for them and the
      // "forced" path would never actually force anything.
      await this.queue.close(forced);

      console.log('Worker shutdown complete');
    } catch (error: unknown) {
      console.error('Error during worker shutdown:', error);
      exitCode = 1;
    }

    process.exit(exitCode);
  }
}

4. 数据库连接池关闭

import { Pool } from 'pg';

/**
 * Wraps a pg `Pool` so that in-flight promise-style queries can be awaited
 * (with a time limit) before the pool is ended.
 */
class DatabaseShutdown {
  private readonly pool: Pool;
  /**
   * One entry per in-flight query. Each entry is a derived promise that
   * fulfils when the query settles and never rejects.
   */
  private readonly activeQueries = new Set<Promise<unknown>>();
  /** Maximum time in milliseconds `shutdown()` waits for in-flight queries. */
  private readonly queryDrainTimeout: number;

  /**
   * @param pool Pool whose `query` method is replaced by a tracking wrapper.
   * @param queryDrainTimeout How long `shutdown()` waits for active queries,
   *   in milliseconds. Defaults to 5 seconds.
   */
  constructor(pool: Pool, queryDrainTimeout = 5000) {
    this.pool = pool;
    this.queryDrainTimeout = queryDrainTimeout;
    this.setupQueryTracking();
  }

  /** Replaces `pool.query` with a wrapper that records promise-returning calls. */
  private setupQueryTracking(): void {
    const originalQuery = this.pool.query.bind(this.pool) as (...args: any[]) => unknown;

    const trackedQuery = (...args: any[]): unknown => {
      const result = originalQuery(...args);

      // A call that does not return a thenable (e.g. when a callback was
      // passed) cannot be tracked; hand the result back untouched.
      const maybeThenable = result as { then?: unknown } | null | undefined;
      if (typeof maybeThenable?.then !== 'function') {
        return result;
      }

      // Track a derived promise that swallows the outcome. The caller still
      // receives the original promise and stays responsible for its errors;
      // a failing query must neither raise an unhandled rejection here nor
      // make shutdown() reject.
      const settled: Promise<void> = Promise.resolve(result).then(
        () => undefined,
        () => undefined
      );
      this.activeQueries.add(settled);
      void settled.then(() => {
        this.activeQueries.delete(settled);
      });

      return result;
    };

    this.pool.query = trackedQuery as Pool['query'];
  }

  /**
   * Waits for in-flight queries (bounded by `queryDrainTimeout`) and then ends
   * the pool.
   */
  async shutdown(): Promise<void> {
    console.log('Shutting down database connections...');

    if (this.activeQueries.size > 0) {
      console.log(`Waiting for ${this.activeQueries.size} active queries...`);

      const drained = Promise.all(Array.from(this.activeQueries));
      const timedOut = new Promise<void>((resolve) => {
        const timer = setTimeout(resolve, this.queryDrainTimeout);
        // Cancel the timer once the queries are done so it does not keep the
        // event loop alive after shutdown has finished.
        void drained.then(() => clearTimeout(timer));
      });

      await Promise.race([drained, timedOut]);
    }

    console.log('Ending pool...');
    await this.pool.end();

    console.log('Database connections closed');
  }
}

5. PM2 优雅关闭

// ecosystem.config.js
// PM2 process definition for the API server implemented in server.ts below.
module.exports = {
  apps: [{
    name: 'api-server',
    script: './dist/server.js',
    instances: 4,
    exec_mode: 'cluster',
    kill_timeout: 30000, // wait 30s for graceful shutdown
    // server.ts calls process.send('ready') from its listen callback
    wait_ready: true,
    listen_timeout: 10000,
    // server.ts starts its shutdown when it receives the 'shutdown' process message
    shutdown_with_message: true
  }]
};

// server.ts
import express from 'express';

const app = express();
const port = process.env.PORT || 3000;

// ... register routes ...

/** Listen callback: logs the bound port and tells PM2 the app is ready. */
const announceReady = (): void => {
  console.log(`Server started on port ${port}`);

  // process.send only exists when the process was started with an IPC channel.
  if (process.send) {
    process.send('ready');
  }
};

const server = app.listen(port, announceReady);

// React to the shutdown message sent by PM2; every other message is ignored.
process.on('message', (msg) => {
  if (msg !== 'shutdown') {
    return;
  }
  console.log('Received shutdown message from PM2');
  gracefulShutdown();
});

/** Set once shutdown has begun so repeated 'shutdown' messages are ignored. */
let shutdownStarted = false;

/**
 * Stops the HTTP server and exits the process.
 *
 * Exits with code 0 once the server has closed cleanly, and with code 1 when
 * closing fails or does not finish within 28 seconds (kept below PM2's
 * kill_timeout of 30 seconds).
 */
async function gracefulShutdown() {
  // A second server.close() would call back immediately with a
  // "not running" error and end the process while the first close is still
  // draining requests, so only the first call does any work.
  if (shutdownStarted) {
    return;
  }
  shutdownStarted = true;

  console.log('Starting graceful shutdown...');

  // Force the exit if draining takes too long.
  const forceExitTimer = setTimeout(() => {
    console.error('Forced shutdown after timeout');
    process.exit(1);
  }, 28000); // shorter than PM2's kill_timeout

  // Stop accepting new connections; the callback runs after all existing
  // connections have ended.
  server.close((err) => {
    clearTimeout(forceExitTimer);
    if (err) {
      console.error('Error while closing server:', err);
      process.exit(1);
    }
    console.log('Server closed');
    process.exit(0);
  });

  // Idle keep-alive sockets would otherwise hold the server open until the
  // timeout. closeIdleConnections() only exists on newer Node.js releases,
  // so it is feature-detected.
  const closable = server as typeof server & { closeIdleConnections?: () => void };
  closable.closeIdleConnections?.();
}

6. Python/Flask 优雅关闭

import signal
import sys
import time
from flask import Flask, request, g
from threading import Lock

app = Flask(__name__)

class GracefulShutdown:
    """Tracks in-flight Flask requests and drains them when a signal arrives.

    ``before_request`` and ``after_request`` are meant to be registered as
    Flask request hooks; ``shutdown`` is meant to be installed as a signal
    handler.
    """

    def __init__(self):
        self.is_shutting_down = False
        self.active_requests = 0
        # Protects active_requests against concurrent request threads.
        self.lock = Lock()

    def before_request(self):
        """Count the request, or reject it with 503 once shutdown has begun.

        Returns:
            None to let the request proceed, or an ``(body, 503)`` tuple that
            Flask sends as the response instead of calling the view.
        """
        if self.is_shutting_down:
            return {'error': 'Server is shutting down'}, 503

        with self.lock:
            self.active_requests += 1
        # Remember on the request-scoped ``g`` object that this request was
        # counted, so after_request only decrements what was incremented.
        g._graceful_shutdown_tracked = True

    def after_request(self, response):
        """Uncount the request if ``before_request`` counted it.

        Flask also runs after_request hooks for requests that a
        before_request hook answered itself (the 503 rejections above). Those
        were never counted, so decrementing for them would drive the counter
        below the real number of in-flight requests and end the drain early.

        Args:
            response: The outgoing response.

        Returns:
            The same response, unchanged.
        """
        if getattr(g, '_graceful_shutdown_tracked', False):
            g._graceful_shutdown_tracked = False
            with self.lock:
                self.active_requests -= 1
        return response

    def shutdown(self, signum, frame):
        """Signal handler: stop taking requests, wait for active ones, exit.

        Waits up to 30 seconds (polling once per second) for the active
        request count to reach zero, then exits the process with status 0.

        Args:
            signum: Number of the received signal.
            frame: Current stack frame supplied by the signal module (unused).
        """
        print(f"Received signal {signum}, starting graceful shutdown...")
        self.is_shutting_down = True

        # Wait for active requests
        max_wait = 30
        waited = 0

        while self.active_requests > 0 and waited < max_wait:
            print(f"Waiting for {self.active_requests} active requests...")
            time.sleep(1)
            waited += 1

        if self.active_requests > 0:
            print(f"Force closing with {self.active_requests} requests remaining")

        print("Graceful shutdown complete")
        sys.exit(0)

# Wire graceful shutdown into the app: request hooks first, then signals.
shutdown_handler = GracefulShutdown()
app.before_request(shutdown_handler.before_request)
app.after_request(shutdown_handler.after_request)

# SIGTERM and SIGINT both trigger the same drain-and-exit handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, shutdown_handler.shutdown)

@app.route('/health')
def health():
    """Health endpoint: 200 ``ok`` normally, 503 ``shutting_down`` during shutdown."""
    if not shutdown_handler.is_shutting_down:
        return {'status': 'ok'}
    return {'status': 'shutting_down'}, 503

# When executed directly, serve the app on all interfaces, port 5000.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

最佳实践

✅ DO

  • 处理 SIGTERM 和 SIGINT 信号
  • 立即停止接受新请求
  • 等待处理中(in-flight)的请求完成
  • 设置合理的关闭超时
  • 正确关闭数据库连接
  • 刷新日志和指标
  • 在关闭期间使健康检查失败
  • 测试关闭程序
  • 通过日志记录关闭进度
  • 在容器中使用优雅关闭

❌ DON’T

  • 忽略关闭信号
  • 在没有清理的情况下强制杀死进程
  • 设置过长的、不合理的超时时间
  • 跳过资源清理
  • 忘记关闭连接
  • 无限期地阻塞关闭

Kubernetes 配置

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-server
spec:
  replicas: 3
  # apps/v1 Deployments require a selector, and it must match the labels on
  # the pod template below.
  selector:
    matchLabels:
      app: api-server
  template:
    metadata:
      labels:
        app: api-server
    spec:
      containers:
      - name: api
        image: api-server:latest
        lifecycle:
          preStop:
            exec:
              # Delay SIGTERM by 5s so the pod can be removed from Service
              # endpoints before the application starts shutting down.
              command: ["/bin/sh", "-c", "sleep 5"]
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
        livenessProbe:
          httpGet:
            path: /health/live
            port: 3000
          initialDelaySeconds: 15
          periodSeconds: 10
      # The grace period also covers the preStop hook, so it must exceed the
      # preStop sleep (5s) plus the application's own shutdown budget (30s in
      # the server examples); otherwise the container is killed mid-drain.
      terminationGracePeriodSeconds: 45

资源