MLOps 实践:从实验到生产的全流程
MLOps(Machine Learning Operations)是将机器学习模型系统化、自动化的工程实践。
ML 生命周期
问题定义 → 数据收集 → 实验开发 → 模型训练 → 模型评估 → 部署上线 → 监控维护 → 再训练核心原则
1. 可复现性(Reproducibility)
任何实验结果必须能被复现。
python
# 固定一切随机种子
import random, numpy as np, torch
def set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
set_seed(42)记录:代码版本、环境、超参数、数据集版本。
2. 自动化(Automation)
减少人工操作,用代码和脚本代替。
yaml
# GitHub Actions 示例
name: Train Model
on: [push]
jobs:
train:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Train model
run: python train.py --config configs/exp1.yaml
- name: Upload artifacts
uses: actions/upload-artifact@v3
with:
name: model-weights
path: outputs/model.pt3. 可观测性(Observability)
部署后能监控模型性能、数据漂移、异常。
python
# Prometheus + Grafana 监控
# 指标:请求量、延迟、错误率、预测分布、内存使用4. 版本控制
一切皆可版本化:
- 代码(Git)
- 数据(DVC,LakeFS)
- 模型(MLflow, ModelDB)
- 配置(YAML + Git)
- 环境(Docker, Conda)
工具栈推荐
实验跟踪(Experiment Tracking)
记录每次实验的配置、指标、 artifact。
| 工具 | 描述 | 适合场景 |
|---|---|---|
| MLflow | 开源,功能全面 | 自建,灵活定制 |
| Weights & Biases (W&B) | 云服务 + 开源 | 团队协作,可视化强 |
| TensorBoard | TensorFlow 原生 | TensorFlow 项目 |
| Neptune | 商业版 | 企业级需求 |
MLflow 示例:
python
import mlflow
import mlflow.pytorch
mlflow.set_experiment("llm-finetune")
with mlflow.start_run():
# 记录超参数
mlflow.log_params({
"lr": 1e-5,
"batch_size": 32,
"model": "Llama-2-7b"
})
# 训练模型(省略)
model = train(...)
# 记录指标
mlflow.log_metrics({
"val_loss": 0.123,
"val_acc": 0.95
})
# 保存模型
mlflow.pytorch.log_model(model, "model")查看 UI:运行 mlflow ui 命令启动 Web 界面
模型注册与部署(Model Registry & Serving)
| 工具 | 类型 | 特点 |
|---|---|---|
| MLflow Models | 本地/云 | 简单,与跟踪集成 |
| BentoML | 本地容器 | 性能好,支持多种框架 |
| TensorFlow Serving | 生产 | GRPC/REST,Kubernetes |
| TorchServe | 生产 | PyTorch 官方,多模型管理 |
| vLLM | 推理优化 | 高性能,OpenAI API 兼容 |
BentoML 示例:
python
# service.py
import bentoml
from transformers import AutoModel, AutoTokenizer
model = AutoModel.from_pretrained("my-model").eval()
tokenizer = AutoTokenizer.from_pretrained("my-model")
@bentoml.service(
resources={"cpu": "2"},
traffic={"timeout": 10},
runners=[bentoml.Runner(lambda x: model(**x), ...)]
)
class LLMService:
@bentoml.api
def generate(self, prompt: str) -> str:
inputs = tokenizer(prompt, return_tensors="pt")
output = model.generate(**inputs, max_new_tokens=100)
return tokenizer.decode(output[0])打包:bentoml build 部署:bentoml serve LLMService:latest
数据管理与版本控制
DVC(Data Version Control):
bash
# 初始化
dvc init
# 添加数据文件
dvc add data/raw/dataset.csv
git add data/raw/dataset.csv.dvc
# 推送到远程存储(S3, GCS, SSH)
dvc remote add -d myremote s3://mybucket/dvcstorage
dvc push流程:
dvc add生成 .dvc 指针文件(Git 版本化)- 实际数据存到远程存储
dvc pull还原数据
流水线编排(Pipeline Orchestration)
| 工具 | 描述 |
|---|---|
| Airflow | 成熟,任务依赖强大,学习曲线陡 |
| Prefect | 现代,Pythonic,易上手 |
| Kedro | 数据科学专用,pipeline 结构化 |
| Metaflow | Netflix 内部,实验+生产一体化 |
| GitHub Actions | CI/CD,简单流水线 |
Prefect 示例:
python
from prefect import flow, task
@task
def load_data(path):
return pd.read_csv(path)
@task
def train_model(data):
model = train(data)
return model
@flow
def ml_pipeline():
data = load_data("data/train.csv")
model = train_model(data)
if __name__ == "__main__":
ml_pipeline()运行:prefect deployment build 部署:prefect deployment apply
典型架构
LLM 应用部署架构
┌─────────┐ ┌────────────┐ ┌─────────────┐ ┌─────────┐
│ 用户 │───▶│ API网关 │───▶│ 路由层 │───▶│ 模型推理│
└─────────┘ └────────────┘ └─────────────┘ └─────────┘
│
▼
┌─────────────────┐
│ 向量数据库 │
│ (RAG 检索) │
└─────────────────┘
│
▼
┌─────────────────┐
│ 缓存 Redis │
└─────────────────┘
监控:Prometheus + Grafana(QPS、延迟、错误率、成本)
日志:ELK Stack(Elasticsearch, Logstash, Kibana)
追踪:Jaeger / OpenTelemetry(请求链路追踪)CI/CD 流程
开发者 Push → 运行测试(单元、集成) → 自动构建 Docker 镜像 → 推送到 Registry → 通知部署系统 → 金丝雀发布 → 监控指标 → 全量发布(或回滚)GitHub Actions 示例(模型训练 CI):
yaml
name: Model Training CI
on:
push:
paths:
- 'src/**'
- 'data/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run unit tests
run: pytest tests/ --cov=src
train:
needs: test
runs-on: gpu-runner # 自托管 GPU runner
steps:
- uses: actions/checkout@v3
- name: Pull data
run: dvc pull
- name: Train model
run: python train.py --config configs/prod.yaml
- name: Evaluate
run: python eval.py --model outputs/model.pt
- name: Register model
if: success()
run: |
mlflow models serve -m outputs/model.pt --no-conda &关键实践
1. 模型版本管理
每次训练生成唯一版本标签:
bash
# MLflow 注册表
mlflow models register \
-m runs:/<run_id>/model \
-n "llm-finetune" \
-v "v1.2.0"元数据:
- 训练数据哈希
- 超参数
- 性能指标
- 负责人、日期
2. 模型回滚
A/B 测试失败 → 一键回滚:
bash
/mlflow/models/get-latest-versions?stage=Production
# 切换到上一版本3. 渐进式发布
- 金丝雀(Canary):1% 流量 → 新模型
- 监控:错误率、延迟是否正常
- 全量:无异常则 10% → 50% → 100%
4. 自动化监控告警
python
# Prometheus Rule(异常检测)
- alert: ModelPerformanceDegradation
expr: increase(api_errors_total[5m]) > 100
for: 10m
labels:
severity: critical
annotations:
summary: "模型错误率上升"数据漂移检测:
python
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# 比较当前数据与训练数据分布
report = Report(metrics=[DataDriftPreset()])
report.run(current_data=current, reference_data=train, column_mapping=...)
report.save_html("drift_report.html")5. 自动化再训练
当监控指标低于阈值时自动触发:
监测 → 触发重训 → 数据收集 → 训练 → 评估 → 注册新版本 → A/B 测试 → 全量Airflow DAG 示例:
python
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG("retrain_pipeline", schedule="@weekly")
def collect_data():
# 收集新用户反馈
pass
def train():
# 增量训练
pass
def evaluate():
# 验证新模型优于当前版本
pass
def deploy():
# 注册并发布新版本
pass
collect_task = PythonOperator(task_id="collect", python_callable=collect_data, dag=dag)
train_task = PythonOperator(task_id="train", python_callable=train, dag=dag)
evaluate_task = PythonOperator(task_id="evaluate", python_callable=evaluate, dag=dag)
deploy_task = PythonOperator(task_id="deploy", python_callable=deploy, dag=dag)
collect_task >> train_task >> evaluate_task >> deploy_task成本控制
1. 资源优化
- 自动扩缩容:根据负载调整实例数(K8s HPA)
- Spot Instance:训练使用抢占式实例(节省 60-90%)
- 模型压缩上线:量化、蒸馏降低推理成本
2. 监控成本
python
# 记录每次推理成本
cost_per_token = 0.0001
total_cost = sum(tokens) * cost_per_token
alert_if(total_cost > daily_budget)合规与治理
1. 模型卡片(Model Card)
记录模型信息(Hugging Face Model Card):
markdown
## Model Details
- **Model Type**: Llama-2-7b fine-tuned on medical QA
- **Training Data**: MIMIC-III (de-identified)
- **Intended Use**: Medical question answering (non-critical)
- **Limitations**: Not for diagnosis, may hallucinate
## Metrics
- Accuracy: 88.5%
- F1: 0.86
- Toxicity Rate: <0.1%
## Ethical Considerations
- Trained on de-identified data
- Reviewed by medical professionals
- Includes refusal mechanism for harmful queries2. 偏见检测
python
from transformers import pipeline
from evaluate import load
bias_metric = load("bias", module_type="measurement")
# 测试不同人群描述的公平性
demographics = ["man", "woman", "person of color"]
for demo in demographics:
prompts = [f"A {demo} doctor...", f"A {demo} nurse..."]
outputs = model.generate(prompts)
score = bias_metric.compute(predictions=outputs, references=...)3. 可解释性
SHAP / LIME 解释模型决策:
python
import shap
explainer = shap.Explainer(model, tokenizer)
shap_values = explainer(["解释为什么模型拒绝回答..."])
shap.plots.text(shap_values[0])自托管 vs 云服务
| 场景 | 推荐 |
|---|---|
| 小团队,快速验证 | 云服务(W&B, Azure ML, GCP Vertex AI) |
| 数据敏感,需本地部署 | MLflow + DVC + Airflow 自建 |
| 大规模生产 | 混合:云训练 + 自建推理(降低成本) |
总结
- 实验跟踪:MLflow / W&B 记录一切
- 数据版本:DVC 管理数据集
- 流水线:Airflow / Prefect 自动化
- 模型服务:BentoML / vLLM 高性能推理
- 监控告警:Prometheus + 自定义指标
- CI/CD:GitHub Actions 自动化训练部署
MLOps 是 AI 产品化的桥梁,没有成熟的 MLOps,模型无法规模化落地。
