Model Deployment (ModelDeployment)

This skill covers deploying trained machine learning models to production using Flask, FastAPI, Docker, and related technologies, including REST APIs, batch processing, real-time streaming, serverless, edge deployment, and model serving.

Machine Learning · 0 installs · 0 views · Updated 3/4/2026

name: Model Deployment description: Deploy machine learning models to production using Flask, FastAPI, Docker, cloud platforms (AWS, GCP, Azure), and model serving frameworks

Model Deployment

Overview

Model deployment is the process of making a trained machine learning model available to production systems through APIs, web services, or batch processing pipelines.

When to Use

  • Productionizing a trained model for real-world inference and prediction
  • Building REST APIs or web services to serve a model
  • Scaling predictions to serve multiple users or applications
  • Deploying models to cloud platforms, edge devices, or containers
  • Implementing CI/CD pipelines for updating ML models
  • Creating batch processing systems for large-scale predictions

Deployment Approaches

  • REST API: Flask and FastAPI for synchronous inference
  • Batch processing: scheduled jobs for large-scale predictions
  • Real-time streaming: Kafka and Spark Streaming for continuous data
  • Serverless: AWS Lambda, Google Cloud Functions
  • Edge deployment: TensorFlow Lite and ONNX for edge devices
  • Model serving: TensorFlow Serving, Seldon Core, BentoML
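Of these approaches, batch processing is the easiest to sketch without any infrastructure. A minimal illustration of chunked batch scoring, assuming a scheduled job feeds rows to a model in fixed-size chunks (the chunk size and the `toy_model` scoring function are hypothetical stand-ins for a real scheduler and a trained model):

```python
from typing import Callable, Iterator, List

def chunked(rows: List[List[float]], size: int) -> Iterator[List[List[float]]]:
    """Yield fixed-size chunks of rows for batch scoring."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]

def batch_score(rows: List[List[float]],
                model_fn: Callable[[List[List[float]]], List[int]],
                chunk_size: int = 2) -> List[int]:
    """Score all rows chunk by chunk: one model call per chunk."""
    results: List[int] = []
    for chunk in chunked(rows, chunk_size):
        results.extend(model_fn(chunk))
    return results

# Hypothetical stand-in for a trained model: predict 1 if the row sum is positive
def toy_model(chunk: List[List[float]]) -> List[int]:
    return [1 if sum(row) > 0 else 0 for row in chunk]

rows = [[1.0, 2.0], [-3.0, 1.0], [0.5, 0.5], [-1.0, -1.0], [2.0, 2.0]]
print(batch_score(rows, toy_model))  # → [1, 0, 1, 0, 1]
```

In a real batch job the chunking keeps memory bounded and lets each model call amortize fixed overhead (model load, vectorized inference) across many rows.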

Considerations

  • Model format: Pickle, SavedModel, ONNX, PMML
  • Scalability: load balancing, autoscaling
  • Latency: response-time requirements
  • Monitoring: model drift, performance metrics
  • Versioning: multiple model versions in production
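The model-format consideration boils down to one requirement: a serialized model must reproduce its predictions exactly after a round trip. A minimal sketch with a toy stand-in model (`ThresholdModel` is hypothetical; cross-runtime deployments would favor a portable format such as ONNX over pickle):

```python
import pickle

class ThresholdModel:
    """Toy stand-in for a trained model: predicts 1 above a learned threshold."""
    def __init__(self, threshold: float):
        self.threshold = threshold

    def predict(self, xs):
        return [1 if x > self.threshold else 0 for x in xs]

model = ThresholdModel(threshold=0.5)
blob = pickle.dumps(model)      # serialize to bytes (file-free round trip)
restored = pickle.loads(blob)   # deserialize

# The restored model must reproduce the original predictions exactly
assert restored.predict([0.2, 0.9]) == model.predict([0.2, 0.9])
print(restored.predict([0.2, 0.9]))  # → [0, 1]
```

Note that pickle ties the artifact to the Python class definition and library versions that produced it, which is why the versioning bullet above matters for serialized models too.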

Python Implementation

import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib

# FastAPI for the REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn

# Model serving
import mlflow.pyfunc
import mlflow.sklearn

# Docker and deployment utilities
import logging
import time
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save the Model ===")

# Create a dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train the model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save the model and the preprocessor
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'

joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)

print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")

# 2. Model serving class
print("\n=== 2. Model Serving Class ===")

class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)

            self.predictions_count += len(X)

            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }

# Initialize the predictor
predictor = ModelPredictor(model_path, scaler_path)

# 3. FastAPI application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    features: List[List[float]] = Field(..., examples=[[1.0, 2.0, 3.0]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health-check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Run a prediction"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Process a batch prediction and log it in the background"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)

    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Return model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }

# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")

dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

print("Dockerfile contents:")
print(dockerfile_content)

# 5. Requirements file
print("\n=== 5. requirements.txt ===")

requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""

print("Requirements:")
print(requirements)

# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")

docker_compose = '''version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"

  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''

print("Docker Compose contents:")
print(docker_compose)

# 7. Test the API
print("\n=== 7. Testing the API ===")

def test_predictor():
    # Test a single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                     1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]

    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch prediction
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")

test_predictor()

# 8. Model versioning and registry
print("\n=== 8. MLflow Model Registry ===")

# Log the model to MLflow
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.95)

    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"Model logged to MLflow: {model_uri}")

# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, features, prediction, latency):
        self.latencies.append(latency)  # track latency for the percentile stats below
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        # Need both a recent window and a non-empty historical baseline
        if len(self.predictions) <= 100:
            return {'drift_detected': False}

        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)

        drift = abs(recent_mean - historical_mean) > 0.1

        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}

        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }

monitor = ModelMonitor()

print("\nDeployment setup complete!")
print("To run the FastAPI server: uvicorn app:app --reload")

Deployment Checklist

  • Model format and serialization
  • Input/output validation
  • Error handling and logging
  • Authentication and security
  • Rate limiting and throttling
  • Health-check endpoint
  • Monitoring and alerting
  • Version management
  • Rollback procedures
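The rate-limiting item on the checklist can be sketched with a classic token bucket; in a FastAPI deployment this logic would typically live in a middleware or an API gateway rather than inline (this is a minimal, dependency-free sketch, not a production limiter):

```python
import time

class TokenBucket:
    """Minimal token-bucket rate limiter: refills `rate` tokens/sec, bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=5.0, capacity=2)
print([bucket.allow() for _ in range(3)])  # burst of 2 allowed, third request rejected
```

A request handler would call `bucket.allow()` per client (e.g. keyed by API token) and return HTTP 429 on `False`.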

Cloud Deployment Options

  • AWS: SageMaker, Lambda, EC2
  • GCP: Vertex AI, Cloud Run, App Engine
  • Azure: Machine Learning, App Service
  • Kubernetes: self-managed, on-premises or in any cloud

Performance Optimization

  • Model quantization to reduce size
  • Caching predictions
  • Batch processing
  • GPU acceleration
  • Request pooling
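Prediction caching from the list above can be sketched with `functools.lru_cache`, under the assumption that identical feature vectors recur across requests (features must be hashable, hence the tuple key; the scoring rule is an illustrative stand-in for a real `model.predict` call):

```python
from functools import lru_cache

CALLS = {"n": 0}  # counts real (non-cached) model invocations

@lru_cache(maxsize=1024)
def cached_predict(features: tuple) -> int:
    """Cache predictions keyed by the hashable feature tuple."""
    CALLS["n"] += 1                       # a cache miss reaches the model
    return 1 if sum(features) > 0 else 0  # stand-in for model.predict

cached_predict((1.0, 2.0))
cached_predict((1.0, 2.0))    # served from cache, no second model call
cached_predict((-1.0, 0.0))
print(CALLS["n"])  # → 2
```

This only helps when request payloads repeat exactly; for continuous-valued features, an external cache keyed on rounded or bucketed features (or no cache at all) is often the better design.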

Deliverables

  • Deployed model endpoint
  • API documentation
  • Docker configuration
  • Monitoring dashboards
  • Deployment guide
  • Performance benchmarks
  • Scaling recommendations