---
name: 模型部署
description: 使用Flask、FastAPI、Docker、云平台(AWS、GCP、Azure)和模型服务框架将机器学习模型部署到生产环境
---
模型部署
概览
模型部署是将训练好的机器学习模型通过API、网络服务或批量处理系统提供给生产环境使用的过程。
使用场景
- 生产化训练模型以进行真实世界的推理和预测
- 构建REST API或网络服务以提供模型服务
- 扩展预测以服务多个用户或应用程序
- 将模型部署到云平台、边缘设备或容器中
- 实施CI/CD管道以更新ML模型
- 创建批量处理系统以进行大规模预测
部署方法
- REST API: Flask、FastAPI用于同步推理
- 批量处理: 计划任务用于大规模预测
- 实时流处理: Kafka、Spark Streaming用于连续数据
- 无服务器: AWS Lambda、Google Cloud Functions
- 边缘部署: TensorFlow Lite、ONNX用于边缘设备
- 模型服务: TensorFlow Serving、Seldon Core、BentoML
考虑因素
- 模型格式: Pickle、SavedModel、ONNX、PMML
- 可扩展性: 负载均衡、自动扩展
- 延迟: 响应时间要求
- 监控: 模型漂移、性能指标
- 版本控制: 生产环境中的多个模型版本
Python实现
import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib
# FastAPI用于REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn
# 用于模型服务
import mlflow.pyfunc
import mlflow.sklearn
# Docker和部署
import logging
import time
from typing import List, Dict
# Configure logging once at module level; every component below shares it.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. 训练并保存模型 ===")
# Build a synthetic binary-classification dataset (1000 samples, 20 features).
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train the model on the standardized features.
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Persist the model AND its preprocessing step — they must be reloaded
# together at serving time so inputs are scaled the same way as in training.
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
print(f"模型保存到 {model_path}")
print(f"缩放器保存到 {scaler_path}")

# 2. Model serving class
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
print("\n=== 2. 模型服务类 ===")
class ModelPredictor:
    """Load a persisted model + scaler pair and serve predictions.

    Tracks load time and a running prediction counter so the health
    endpoint can report uptime and usage.
    """

    def __init__(self, model_path, scaler_path):
        # Model and scaler were saved together at training time; reload
        # both so inputs are standardized exactly as during training.
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("模型加载成功")

    def predict(self, features: List[List[float]]) -> Dict:
        """Scale raw feature rows and return labels plus class probabilities.

        Any failure is logged and re-raised for the caller to translate
        into an HTTP error.
        """
        try:
            batch = np.array(features)
            scaled = self.scaler.transform(batch)
            labels = self.model.predict(scaled)
            scores = self.model.predict_proba(scaled)
            self.predictions_count += len(batch)
            return {
                'predictions': labels.tolist(),
                'probabilities': scores.tolist(),
                'count': len(batch),
                'timestamp': time.time(),
            }
        except Exception as e:
            logger.error(f"预测错误: {str(e)}")
            raise

    def health_check(self) -> Dict:
        """Report liveness, uptime in seconds, and total predictions served."""
        uptime = time.time() - self.load_time
        return {
            'status': 'healthy',
            'uptime': uptime,
            'predictions': self.predictions_count,
        }
# Initialize the shared predictor instance used by all API endpoints below.
predictor = ModelPredictor(model_path, scaler_path)
# 3. FastAPI application
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
print("\n=== 3. FastAPI应用 ===")
app = FastAPI(
    title="ML模型API",
    description="生产ML模型服务API",
    version="1.0.0"
)
class PredictionRequest(BaseModel):
    """Request schema: a batch of feature rows, one inner list per sample."""
    features: List[List[float]] = Field(..., example=[[1.0, 2.0, 3.0]])

class PredictionResponse(BaseModel):
    """Response schema mirroring the dict returned by ModelPredictor.predict()."""
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    """Response schema for the /health endpoint."""
    status: str
    uptime: float
    predictions: int
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""健康检查端点"""
return predictor.health_check()
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""进行预测"""
try:
result = predictor.predict(request.features)
return result
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
"""后台处理批量预测"""
all_features = []
for req in requests:
all_features.extend(req.features)
result = predictor.predict(all_features)
background_tasks.add_task(logger.info, f"批量预测处理: {result['count']} 样本")
return result
@app.get("/stats")
async def get_stats():
"""获取模型统计信息"""
return {
'model_type': type(predictor.model).__name__,
'n_estimators': predictor.model.n_estimators,
'max_depth': predictor.model.max_depth,
'feature_importance': predictor.model.feature_importances_.tolist(),
'total_predictions': predictor.predictions_count
}
# 4. Dockerfile template
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
print("\n=== 4. Dockerfile模板 ===")
# Requirements are copied and installed before the app code so Docker's
# layer cache skips the pip install when only the code changes.
dockerfile_content = '''FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.pkl .
COPY scaler.pkl .
COPY app.py .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print("Dockerfile内容:")
print(dockerfile_content)
# 5. Requirements file
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
print("\n=== 5. requirements.txt ===")
# Pinned versions keep container builds reproducible.
requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""
print("需求:")
print(requirements)
# 6. Docker Compose template for deployment
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
# Also restored the YAML indentation inside the template string (lost in
# extraction) — unindented compose YAML is invalid.
print("\n=== 6. Docker Compose模板 ===")
# Three services: the ML API, Prometheus for metrics, Grafana for dashboards.
docker_compose = '''version: '3.8'
services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''
print("Docker Compose内容:")
print(docker_compose)
# 7. Smoke-test the predictor directly (bypassing the HTTP layer)
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
print("\n=== 7. 测试API ===")

def test_predictor():
    """Exercise the single-sample, health-check and batch prediction paths."""
    # One row with the 20 features the model was trained on.
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                      1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]
    result = predictor.predict(test_features)
    print(f"预测结果: {result}")
    # Health check.
    health = predictor.health_check()
    print(f"健康状态: {health}")
    # Batch prediction with three rows.
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"批量预测: {batch_result['count']} 样本处理")

test_predictor()
# 8. Model versioning with the MLflow registry
# BUG FIX: the original `print("` spanned a raw newline, leaving an
# unterminated string literal (SyntaxError); use an explicit \n escape.
print("\n=== 8. MLflow模型注册表 ===")
# Log the trained model together with its hyperparameters and a metric;
# model_uri is built inside the run since active_run() is None outside it.
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.95)
    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"模型登录到MLflow: {model_uri}")
# 9. 部署监控代码
print("
=== 9. 监控设置 ===")
class ModelMonitor:
def __init__(self):
self.predictions = []
self.latencies = []
def log_prediction(self, features, prediction, latency):
self.predictions.append({
'timestamp': time.time(),
'features_mean': np.mean(features),
'prediction': prediction,
'latency_ms': latency * 1000
})
def check_model_drift(self):
if len(self.predictions) < 100:
return {'drift_detected': False}
recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
recent_mean = np.mean(recent_predictions)
drift = abs(recent_mean - historical_mean) > 0.1
return {
'drift_detected': drift,
'historical_mean': float(historical_mean),
'recent_mean': float(recent_mean),
'threshold': 0.1
}
def get_stats(self):
if not self.latencies:
return {}
return {
'avg_latency_ms': np.mean(self.latencies) * 1000,
'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
'total_predictions': len(self.predictions)
}
monitor = ModelMonitor()
print("
部署设置完成!")
print("要运行FastAPI服务器: uvicorn app:app --reload")
部署清单
- 模型格式和序列化
- 输入/输出验证
- 错误处理和日志记录
- 认证和安全
- 速率限制和节流
- 健康检查端点
- 监控和警报
- 版本管理
- 回滚程序
云部署选项
- AWS: SageMaker、Lambda、EC2
- GCP: Vertex AI、Cloud Run、App Engine
- Azure: Machine Learning、App Service
- Kubernetes: 自行管理的本地或云端集群部署
性能优化
- 模型量化以减小大小
- 缓存预测
- 批量处理
- GPU加速
- 请求池化
交付物
- 部署的模型端点
- API文档
- Docker配置
- 监控仪表板
- 部署指南
- 性能基准
- 扩展建议