Machine Learning Engineering Skill: ml-engineer

The machine learning engineering skill focuses on building and deploying production-grade machine learning systems, covering model serving, feature engineering, pipeline design, and monitoring. It applies to machine learning model integration and production environments. Keywords: machine learning, model deployment, feature engineering, ML pipelines, production systems, AI applications.

Machine Learning · 0 installs · 1 view · Updated 3/12/2026

name: ml-engineer description: Implements machine learning pipelines, model serving, and feature engineering. Use for machine learning model integration or production deployment.

Machine Learning Engineering

Build production-grade machine learning systems.

When to Use

  • Model serving and deployment
  • Feature engineering
  • ML pipeline design
  • Model monitoring

Model Serving

FastAPI Endpoint

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np

app = FastAPI()
model = joblib.load("model.pkl")

class PredictRequest(BaseModel):
    features: list[float]

class PredictResponse(BaseModel):
    prediction: float
    confidence: float

@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    try:
        X = np.array(request.features).reshape(1, -1)
        # Cast numpy scalars to plain floats so Pydantic can serialize them
        pred = float(model.predict(X)[0])
        # Assumes a classifier that implements predict_proba
        proba = float(model.predict_proba(X)[0].max())
        return PredictResponse(prediction=pred, confidence=proba)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy", "model_version": "1.0.0"}
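Rejecting malformed payloads before they reach the model keeps errors out of the 500 path. A minimal sketch, assuming Pydantic v2 (`N_FEATURES` is a hypothetical constant standing in for the trained model's input width):

```python
from pydantic import BaseModel, ValidationError, field_validator

N_FEATURES = 3  # hypothetical: set to the trained model's expected input width

class PredictRequest(BaseModel):
    features: list[float]

    @field_validator("features")
    @classmethod
    def check_width(cls, v: list[float]) -> list[float]:
        # Reject requests whose feature vector has the wrong length
        if len(v) != N_FEATURES:
            raise ValueError(f"expected {N_FEATURES} features, got {len(v)}")
        return v
```

With this in place, FastAPI returns a 422 for a wrong-width vector instead of surfacing a reshape or model error as a 500.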

Batch Inference

import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def predict_batch(df: pd.DataFrame, batch_size: int = 1000):
    results = []

    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        predictions = model.predict(batch)
        results.extend(predictions)

    return results

# Parallel batch processing; worker processes need the module-level `model`
# (inherited via fork on Linux, re-imported under spawn)
def parallel_predict(df: pd.DataFrame, n_workers: int = 4):
    chunks = np.array_split(df, n_workers)

    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        results = list(executor.map(predict_batch, chunks))

    return np.concatenate(results)
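The batching loop can be verified against a single full-pass prediction. A standalone sketch with a stand-in model (`DummyModel` is illustrative only, not part of the skill):

```python
import numpy as np

# Stand-in "model" for illustration: predicts each row's sum
class DummyModel:
    def predict(self, X):
        return np.asarray(X).sum(axis=1)

model = DummyModel()

def predict_batch(X, batch_size: int = 1000):
    """Run predictions in fixed-size batches to bound peak memory."""
    results = []
    for i in range(0, len(X), batch_size):
        results.extend(model.predict(X[i:i + batch_size]))
    return np.asarray(results)
```

Batched output should match a full pass exactly, since batching only changes how many rows the model sees at once, not the per-row computation.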

Feature Engineering

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer

numeric_features = ['age', 'income', 'score']
categorical_features = ['category', 'region']

preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]), numeric_features),
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ]
)

# Full pipeline: preprocessing + model
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', model)
])
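The preprocessor can be exercised on a toy frame to confirm imputation and encoding behave as expected (the column values below are illustrative):

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

df = pd.DataFrame({
    "age": [25.0, np.nan, 40.0],
    "income": [50_000.0, 60_000.0, np.nan],
    "score": [0.1, 0.5, 0.9],
    "category": ["a", np.nan, "b"],
    "region": ["us", "eu", "us"],
})

preprocessor = ColumnTransformer(transformers=[
    ("num", Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ]), ["age", "income", "score"]),
    ("cat", Pipeline([
        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
        ("encoder", OneHotEncoder(handle_unknown="ignore")),
    ]), ["category", "region"]),
])

X = preprocessor.fit_transform(df)
# 3 scaled numeric columns + 3 one-hot columns for category (a, b, missing)
# + 2 for region (eu, us) = 8 columns total
```

`handle_unknown="ignore"` matters at serving time: categories never seen in training encode as all zeros instead of raising an error.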

Model Monitoring

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metrics import DataDriftTable, DatasetSummaryMetric

def check_data_drift(reference_data, current_data):
    column_mapping = ColumnMapping(
        target='label',
        prediction='prediction',
        numerical_features=['feature1', 'feature2'],
        categorical_features=['category']
    )

    report = Report(metrics=[
        DatasetSummaryMetric(),
        DataDriftTable(),
    ])

    report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )

    return report.as_dict()
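Evidently's report API has changed across versions, so a dependency-free fallback is useful. Single-feature drift can be approximated with the Population Stability Index; this is a common alternative technique, not Evidently's metric, and the thresholds in the docstring are rules of thumb:

```python
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between reference and current samples.

    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate drift, > 0.25 major drift.
    """
    # Bin edges from reference quantiles; outer bins are open-ended
    inner_edges = np.quantile(reference, np.linspace(0, 1, bins + 1))[1:-1]

    def fractions(x: np.ndarray) -> np.ndarray:
        # Map each value to its bin (0..bins-1) and normalize counts
        idx = np.searchsorted(inner_edges, x, side="right")
        return np.bincount(idx, minlength=bins) / len(x)

    ref_pct = np.clip(fractions(reference), 1e-6, None)  # floor avoids log(0)
    cur_pct = np.clip(fractions(current), 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
```

Quantile-based bins keep roughly equal reference mass per bin, so the index reacts to shifts in the current distribution rather than to uneven reference binning.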

A/B Testing

import hashlib

def get_model_variant(user_id: str, experiment: str) -> str:
    """Deterministic assignment based on user ID"""
    hash_input = f"{user_id}:{experiment}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    return "control" if hash_value % 100 < 50 else "treatment"

def predict_with_experiment(user_id: str, features):
    variant = get_model_variant(user_id, "model_v2_test")

    if variant == "treatment":
        prediction = model_v2.predict(features)
    else:
        prediction = model_v1.predict(features)

    log_prediction(user_id, variant, prediction)
    return prediction
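The hash-based assignment can be sanity-checked for determinism and a roughly even split; `get_model_variant` is duplicated below so the snippet runs standalone:

```python
import hashlib

def get_model_variant(user_id: str, experiment: str) -> str:
    """Deterministic 50/50 assignment from user ID and experiment name."""
    hash_input = f"{user_id}:{experiment}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    return "control" if hash_value % 100 < 50 else "treatment"

# Same user, same experiment: the bucket never changes between calls
assert get_model_variant("u42", "model_v2_test") == get_model_variant("u42", "model_v2_test")

# Across many users, the treatment share should be close to 50%
share = sum(
    get_model_variant(f"user{i}", "model_v2_test") == "treatment"
    for i in range(10_000)
) / 10_000
```

Hashing `user_id:experiment` rather than `user_id` alone means bucket assignments are independent across experiments, so one test does not bias another.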

Examples

Input: "Deploy a model as an API" Action: Create a FastAPI endpoint, add a health check, containerize

Input: "Set up model monitoring" Action: Implement drift detection, prediction logging, alerting