首页/知识库/ML 流水线设计与自动化

ML 流水线设计与自动化

✍️ AI Master📅 创建 2026-04-12📖 18 min 阅读
💡

文章摘要

从数据处理到模型部署,掌握 MLOps 流水线的最佳实践

1MLOps 概述与成熟度模型

MLOps 是 Machine Learning Operations 的缩写,它将 DevOps 的理念引入机器学习领域,旨在实现 ML 模型的自动化交付和持续运维。与传统软件开发不同,ML 系统不仅包含代码,还涉及数据、模型和基础设施三个维度,这使得 MLOps 的复杂度远超传统 DevOps。Google 提出的 MLOps 成熟度模型分为四个等级:Level 0 是手动流程,数据科学家手动训练和部署模型;Level 1 实现了 ML 流水线的自动化,训练过程可复现;Level 2 进一步实现了 CI/CD 流水线自动化,代码变更自动触发训练和部署;Level 3 达到最高级别,实现了持续训练,系统能够自动感知数据漂移并触发模型重训练。大多数企业处于 Level 0Level 1 之间,要实现从手动到自动化的跨越,需要建立标准化的流水线框架、引入实验追踪工具和模型注册表。MLOps 的核心价值在于缩短模型从实验到生产的时间,同时保证模型质量的一致性和可追溯性。

python
from enum import Enum

class MLOpsLevel(Enum):
    LEVEL_0 = "手动流程"
    LEVEL_1 = "ML 流水线自动化"
    LEVEL_2 = "CI/CD 流水线自动化"
    LEVEL_3 = "持续训练"


def assess_mlops_level(
    has_pipeline: bool,
    has_cicd: bool,
    has_continuous_training: bool
) -> MLOpsLevel:
    """评估当前 ML 项目的 MLOps 成熟度等级"""
    if has_continuous_training and has_cicd and has_pipeline:
        return MLOpsLevel.LEVEL_3
    elif has_cicd and has_pipeline:
        return MLOpsLevel.LEVEL_2
    elif has_pipeline:
        return MLOpsLevel.LEVEL_1
    else:
        return MLOpsLevel.LEVEL_0

level = assess_mlops_level(
    has_pipeline=True,
    has_cicd=True,
    has_continuous_training=False
)
print(f"Current level: {level.value}")
yaml
# mlops-maturity-checklist.yaml
level_0:
  - 手动数据探索
  - Jupyter Notebook 训练
  - 手工导出模型文件
level_1:
  - 自动化数据预处理
  - 可复现的训练脚本
  - 实验参数记录
level_2:
  - Git 触发训练流水线
  - 自动化模型评估
  - 一键部署到生产
level_3:
  - 数据漂移自动检测
  - 定时或事件触发重训练
  - 模型自动回滚机制
成熟度等级核心能力自动化程度典型工具

Level 0

手动训练与部署

Jupyter, sklearn

Level 1

ML 流水线

训练自动化

MLflow, DVC

Level 2

CI/CD 流水线

代码到部署全自动化

GitHub Actions, BentoML

Level 3

持续训练

端到端全自动

Kubeflow, TFX

从 Level 0 跃迁到 Level 1 的第一步是将 Notebook 中的代码重构为可复用的 Python 脚本,并建立参数化配置文件。

不要一上来就追求 Level 3 的完整架构。多数团队应该先从 Level 1 开始,先解决可复现性问题,再逐步自动化。

2数据版本控制(DVC)

在 ML 项目中,数据是决定模型性能的关键因素,但数据往往体积庞大、更新频繁且来源多样。传统的 Git 只能追踪代码变更,无法有效管理 GB 甚至 TB 级别的数据文件。DVC(Data Version Control)正是为了解决这一问题而设计的工具。它的核心理念是将数据文件的实际内容存储在远程存储后端(如 S3、GCS 或本地磁盘),而在 Git 仓库中只保存轻量级的元数据文件(.dvc 文件)。这样既保留了 Git 的版本控制能力,又避免了将大数据文件提交到代码仓库。DVC 支持数据版本追踪、数据流水线定义和实验复现三大核心功能。通过 dvc.yaml 文件,你可以定义从原始数据到特征工程到模型训练的完整数据处理流水线。DVC 的 dvc repro 命令能够智能地只重新运行发生变更的步骤,大幅节省计算资源。此外,DVC 与 MLflow 等实验追踪工具天然兼容,可以组合使用形成完整的 ML 工程化方案。在团队协作场景中,DVC 确保了所有成员使用相同版本的数据进行实验,消除了因为数据不一致导致的实验结果差异。

bash
# 初始化 DVC 并连接远程存储
dvc init
dvc remote add -d myremote s3://my-bucket/dvc-store

# 追踪大型数据集
dvc add data/raw/training_data.parquet
dvc add data/processed/features.pkl

# 提交元数据到 Git(不是数据本身)
git add data/raw/training_data.parquet.dvc
git add data/processed/features.pkl.dvc
git commit -m "Track training data v2.1"

# 推送数据到远程存储
dvc push

# 在另一台机器上拉取数据
dvc pull
yaml
# dvc.yaml - 定义数据处理流水线
stages:
  data_ingestion:
    cmd: python scripts/ingest.py --config configs/ingest.yaml
    deps:
      - scripts/ingest.py
      - configs/ingest.yaml
    outs:
      - data/raw/dataset.csv

  feature_engineering:
    cmd: python scripts/features.py
    deps:
      - scripts/features.py
      - data/raw/dataset.csv
    outs:
      - data/processed/features.pkl

  train_model:
    cmd: python scripts/train.py --epochs 50
    deps:
      - scripts/train.py
      - data/processed/features.pkl
    outs:
      - models/model_v1.pkl
    metrics:
      - metrics/eval.json:
DVC 核心概念说明对应 Git 概念存储位置

.dvc 文件

数据元数据与校验和

Git blob

Git 仓库

远程存储

实际数据文件

无对应

S3/GCS/SSH

dvc.yaml

流水线定义

无对应

Git 仓库

dvc.lock

执行快照

无对应

Git 仓库

dvc repro

增量重运行

无对应

命令行

为数据集使用语义化的版本标签,例如 dvc tag v1.2.0,这样在回滚实验时可以精确还原数据状态。

DVC 不会自动删除远程存储中不再被引用的数据文件。定期运行 dvc gc 来清理孤儿文件,但操作前务必备份。

3实验追踪(MLflow/W&B)

机器学习实验的本质是一个高维的搜索过程:你需要尝试不同的超参数组合、模型架构和特征工程策略,然后从中选出最优方案。没有实验追踪工具的情况下,数据科学家通常用电子表格或笔记本来记录每次实验的参数和结果,这种方式在实验次数增多后会迅速失控。MLflow 和 Weights & Biases(W&B)是目前最主流的两个实验追踪解决方案。MLflow 是开源且自托管友好的方案,它通过 Tracking Server 记录每次运行的参数、指标、模型文件和代码版本。W&B 则是云端优先的平台,除了基础的实验追踪外,还提供了实时的可视化面板、团队协作功能和自动化的超参数搜索(Sweep)。两者的核心 API 设计都很简洁,只需几行代码就能集成到现有的训练脚本中。实验追踪的关键实践包括:为每次实验记录完整的参数集合、保存关键指标的日志、存储最佳模型的 artifact、以及记录代码的 Git commit hash 以实现完全复现。当团队规模扩大时,实验追踪平台还承担着知识共享的功能,让团队成员可以查看、比较和复用彼此的实验结果。

python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("customer_churn_prediction")

with mlflow.start_run(run_name="rf_baseline") as run:
    # 记录参数
    params = {"n_estimators": 100, "max_depth": 10, "random_state": 42}
    mlflow.log_params(params)

    # 训练模型
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # 记录指标
    accuracy = accuracy_score(y_test, model.predict(X_test))
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1_score(y_test, model.predict(X_test), average="weighted"))

    # 保存模型 artifact
    mlflow.sklearn.log_model(model, "model")
    print(f"Run ID: {run.info.run_id}")
python
import wandb
from transformers import Trainer, TrainingArguments

# 初始化 W&B 实验
wandb.init(project="sentiment-finetune", config={
    "learning_rate": 2e-5,
    "num_train_epochs": 3,
    "batch_size": 32,
    "model_name": "bert-base-chinese"
})

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=32,
    report_to="wandb"  # 自动上报到 W&B
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset
)
trainer.train()

# 记录自定义指标
wandb.log({"final_loss": trainer.state.log_history[-1]["loss"]})
wandb.finish()
功能MLflowW&B说明

实验追踪

支持

支持

记录参数、指标、artifact

可视化

基础 UI

高级仪表盘

W&B 可视化更丰富

超参搜索

不直接支持

Sweep

W&B 内置贝叶斯搜索

部署

MLflow Models

W&B Artifacts

MLflow 原生支持 serving

成本

免费自托管

免费额度 + 付费

W&B 云端有免费 tier

团队协作

需自建 Server

原生支持

W&B 协作功能更完善

在 MLflow 中使用嵌套运行(nested runs)来组织实验:外层运行代表实验组,内层运行代表单次超参数尝试。

不要在实验追踪中记录敏感信息,如 API Key 或数据库密码。MLflow 的参数存储默认是明文可读的。

4模型注册表

模型注册表是 MLOps 架构中的核心组件,它充当了从实验环境到生产环境的桥梁。在实验追踪阶段,你可能会产生数百个模型变体,但只有少数几个适合部署到生产环境。模型注册表提供了一套标准化的流程来管理模型的生命周期:从实验阶段的开发中状态,到验证阶段的暂存状态,再到生产环境的已部署状态,最后到不再使用的废弃状态。MLflow Model Registry 是最常用的模型注册表实现之一,它支持模型版本管理、阶段转换、权限控制和模型描述文档。注册表中的每个模型都有一个唯一的名称和版本号,你可以为任意版本添加描述、标签和元数据。更重要的是,注册表支持模型签名的概念,即定义模型的输入输出 schema,这样下游系统可以自动验证请求格式是否正确。在团队规模扩大时,模型注册表还解决了模型归属和责任的问题:谁训练了这个模型、什么时候部署的、当前表现如何,所有信息都在注册表中一目了然。注册表与 CI/CD 流水线结合后,可以实现模型的自动化推进:当新版本模型通过所有质量检查后,自动从 Staging 阶段推进到 Production 阶段。

python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# 将实验运行的模型注册到注册表
result = mlflow.register_model(
    model_uri="runs:/<run_id>/model",
    name="churn_prediction_model"
)
print(f"Registered version: {result.version}")

# 为模型版本添加描述
client.update_model_version(
    name="churn_prediction_model",
    version=result.version,
    description="Random Forest baseline - accuracy 0.89"
)

# 推进到 Staging 阶段
client.transition_model_version_stage(
    name="churn_prediction_model",
    version=result.version,
    stage="Staging"
)
python
import mlflow.pyfunc

# 从注册表加载生产阶段的最佳模型
model_name = "churn_prediction_model"
model_uri = f"models:/{model_name}/Production"

loaded_model = mlflow.pyfunc.load_model(model_uri)

# 获取模型签名以验证输入格式
model_info = mlflow.models.get_model_info(model_uri)
print(f"Model signature: {model_info.signature}")

# 批量预测
import pandas as pd
test_data = pd.DataFrame({
    "age": [35, 42, 28],
    "tenure": [24, 6, 12],
    "monthly_charges": [85.0, 95.5, 70.0]
})

predictions = loaded_model.predict(test_data)
print(f"Predictions: {predictions}")
模型阶段含义触发转换自动操作

None

刚注册,未验证

手动注册

Staging

通过初步验证

自动化测试通过

运行集成测试

Production

已部署到生产

性能指标达标

流量切换

Archived

已下线退役

新版本替代

保留 artifact

为每个注册模型维护一份 Model Card,记录训练数据、评估指标、已知限制和公平性分析,这是 AI 治理的重要实践。

模型注册表的权限控制容易被忽视。生产阶段的模型应该设置为只读,只有经过审批的流水线才能执行阶段转换。

5CI/CD for ML

传统软件工程的 CI/CD 主要关注代码的构建、测试和部署,但 ML 系统的 CI/CD 流水线需要额外处理数据和模型这两个关键维度。ML 系统的持续集成(CI)不仅包括代码的单元测试和集成测试,还需要数据验证测试和模型质量测试。持续交付(CD)则意味着模型通过所有测试后,自动部署到生产环境或注册表中。在 ML 的 CI/CD 中,有几个关键测试类型:代码测试确保训练脚本和推理服务的逻辑正确;数据测试验证训练数据的完整性和分布;模型测试检查模型的性能指标是否达到预期基线。GitHub Actions、GitLab CI 和 Jenkins 等 CI/CD 工具都可以用来编排 ML 流水线。与传统的 CI/CD 不同,ML 流水线通常需要更长的执行时间(训练可能需要数小时),并且消耗大量计算资源(GPU)。因此,建议在 CI 中运行轻量级的冒烟测试和快速验证,而将完整的训练和评估放在专门的训练集群上执行。持续部署阶段还需要考虑 A/B 测试、金丝雀发布和自动回滚等策略,以确保新模型上线不会对生产服务造成负面影响。

yaml
# .github/workflows/ml-pipeline.yaml
name: ML CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test-code:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run unit tests
        run: pytest tests/ -v

  validate-data:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate data schema
        run: python scripts/validate_data.py

  train-and-evaluate:
    needs: [test-code, validate-data]
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python scripts/train.py --config configs/prod.yaml
      - name: Evaluate model
        run: python scripts/evaluate.py --threshold 0.85

  deploy:
    needs: [train-and-evaluate]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to production
        run: python scripts/deploy.py --env production
python
import pytest
import mlflow
import numpy as np

def test_model_performance_threshold():
    """测试模型性能是否达到最低要求"""
    model = mlflow.sklearn.load_model("models:/churn_model/Production")
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    assert accuracy >= 0.85, f"Accuracy {accuracy} below threshold 0.85"

def test_no_data_leakage():
    """确保训练集和测试集没有重叠"""
    train_ids = set(X_train.index)
    test_ids = set(X_test.index)
    overlap = train_ids & test_ids
    assert len(overlap) == 0, f"Found {len(overlap)} overlapping samples"

def test_feature_distribution():
    """检查特征分布是否合理"""
    assert X_train["age"].between(0, 120).all()
    assert X_train["monthly_charges"].min() >= 0
    assert not X_train.isnull().any().any(), "Found null values in features"
CI/CD 阶段测试类型触发条件失败处理

代码构建

单元测试、lint

代码 push/PR

阻止合并

数据验证

Schema 检查、漂移检测

数据文件变更

告警 + 阻止

模型训练

训练成功、无 NaN

CI 通过后

重试 3 次后告警

模型评估

指标阈值检查

训练完成后

阻止部署

生产部署

集成测试、延迟测试

评估通过后

自动回滚

在 CI 中使用缓存(如 GitHub Actions 的 cache)来加速依赖安装和数据下载,可以显著缩短流水线执行时间。

不要在 CI 中训练完整的深度学习模型,这会消耗大量时间和资源。CI 中的训练应该使用小规模数据集和少量 epoch 做冒烟测试。

6持续训练触发器

持续训练(Continuous Training)是 MLOps 成熟度模型的最高级别,它意味着模型的重训练过程可以自动触发,无需人工干预。这与传统的定期手动训练有本质区别:持续训练系统能够实时感知环境变化并做出响应。触发持续训练的常见机制有三种:基于时间的调度(如每天凌晨自动训练)、基于数据的触发(当检测到数据漂移超过阈值时启动训练)和基于性能的触发(当模型在生产环境的性能下降到阈值以下时触发)。数据漂移检测是持续训练的核心能力之一,常用的检测方法包括 KS 检验(用于数值特征)、卡方检验(用于类别特征)和 PSI(Population Stability Index)用于整体分布比较。持续训练还需要解决一个关键问题:自动化不代表无监督。训练完成后,新模型必须经过自动化的评估流水线,与当前生产模型进行对比。只有当新模型在关键指标上显著优于旧模型时,才会被自动部署。此外,持续训练系统应该支持手动干预的能力,比如在业务发生重大变化时,数据科学家可以手动调整训练参数或暂停自动训练。

python
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
import pandas as pd

# 计算数据漂移报告
reference_data = pd.read_csv("data/reference_monthly.csv")
current_data = pd.read_csv("data/current_monthly.csv")

drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(reference_data=reference_data,
                 current_data=current_data)

# 提取漂移检测结果
report_dict = drift_report.as_dict()
drifted_features = []
for feature, result in report_dict["metrics"][0]["result"]["drift_by_columns"].items():
    if result["drift_detected"]:
        drifted_features.append(feature)

if len(drifted_features) > 3:
    print(f"DRIFT ALERT: {len(drifted_features)} features drifted")
    # 触发持续训练
    trigger_continuous_training()
python
import schedule
import time
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)

def continuous_training_pipeline():
    """持续训练主函数"""
    logging.info(f"Starting CT pipeline at {datetime.now()}")

    # 1. 检查是否需要训练
    if not should_trigger_training():
        logging.info("No training trigger needed")
        return

    # 2. 拉取最新数据
    data = fetch_latest_training_data()

    # 3. 训练新模型
    new_model = train_model(data)

    # 4. 与生产模型对比
    champion = load_production_model()
    if evaluate_and_compare(new_model, champion):
        # 5. 注册并部署
        register_and_deploy(new_model)
        logging.info("New model deployed successfully")
    else:
        logging.warning("New model did not outperform champion")

# 每天凌晨 2 点执行
schedule.every().day.at("02:00").do(continuous_training_pipeline)

while True:
    schedule.run_pending()
    time.sleep(60)
触发机制检测内容响应时间适用场景

时间调度

固定周期

按计划

数据稳定更新

数据漂移

特征分布变化

近实时

业务环境多变

性能下降

线上指标恶化

近实时

高价值预测场景

手动触发

人工判断

即时

重大业务变更

为持续训练设置合理的冷却期(cooldown period),避免在短时间内因数据波动触发频繁训练,浪费计算资源。

持续训练不等于持续部署。训练完成的新模型必须经过严格的评估流程,否则可能将性能更差的模型部署到生产环境。

7Kubeflow Pipelines 实战

Kubeflow Pipelines 是 Google 开源的 ML 工作流编排平台,运行在 Kubernetes 之上,是构建企业级 MLOps 流水线的强大工具。它通过声明式的 YAML 或 Python DSL 来定义流水线,每个步骤作为一个容器化组件运行在 Kubernetes Pod 中。这种架构天然支持弹性伸缩、资源隔离和故障恢复。Kubeflow Pipelines 的核心概念包括:Component(组件)是最小执行单元,通常是一个容器化的 Python 脚本;Pipeline(流水线)是多个组件的有向无环图(DAG),定义了执行顺序和依赖关系;Run(运行)是流水线的一次具体执行,可以追踪每次运行的输入、输出和状态。在实战中,我们通常使用 kfp 的 Python DSL 来编写流水线定义,这样可以利用 Python 的编程能力来动态生成流水线结构。Kubeflow 提供了丰富的内置组件,涵盖了数据下载、模型训练、模型评估和模型部署等常见场景。此外,Kubeflow 还支持条件分支、并行执行和参数化配置等高级特性,使得构建复杂的 ML 流水线变得更加灵活和高效。对于已经有 Kubernetes 基础设施的团队来说,Kubeflow Pipelines 是构建 MLOps Level 2Level 3 的理想选择。

python
import kfp
from kfp import dsl
from kfp.dsl import component

@component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.11"
)
def preprocess_data(input_path: str, output_path: dsl.OutputPath("Dataset")):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(input_path)
    df = df.dropna()
    train_df, test_df = train_test_split(df, test_size=0.2)
    train_df.to_csv(f"{output_path}_train.csv", index=False)
    test_df.to_csv(f"{output_path}_test.csv", index=False)

@component(
    packages_to_install=["scikit-learn", "joblib"],
    base_image="python:3.11"
)
def train_model(
    train_path: dsl.InputPath("Dataset"),
    model_path: dsl.OutputPath("Model"),
    n_estimators: int = 100
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    train_df = pd.read_csv(train_path)
    X = train_df.drop("target", axis=1)
    y = train_df["target"]
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X, y)
    joblib.dump(model, model_path)

@dsl.pipeline(name="ml-training-pipeline", description="End-to-end ML pipeline")
def ml_pipeline(data_path: str, n_estimators: int = 100):
    preprocess_task = preprocess_data(input_path=data_path)
    train_model(
        train_path=preprocess_task.outputs["output_path"],
        model_path="model.pkl",
        n_estimators=n_estimators
    )
python
import kfp

# 连接到 Kubeflow Pipelines 服务器
client = kfp.Client(host="http://kubeflow.example.com")

# 编译流水线定义
from kfp import compiler
compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="ml_pipeline.yaml"
)

# 创建实验并启动运行
experiment = client.create_experiment(name="production-runs")

run = client.run_pipeline(
    experiment_id=experiment.id,
    job_name="churn-model-v2",
    pipeline_package_path="ml_pipeline.yaml",
    params={
        "data_path": "gs://my-bucket/data/churn_data.csv",
        "n_estimators": 200
    }
)
print(f"Run ID: {run.run_id}")

# 查看运行状态
run_detail = client.get_run(run.run_id)
print(f"Status: {run_detail.state}")
Kubeflow 概念作用类比关键 API

Component

最小执行单元(容器)

函数

@component 装饰器

Pipeline

组件编排(DAG)

工作流定义

@dsl.pipeline

Run

一次具体执行

进程实例

client.run_pipeline()

Experiment

运行的分组

项目文件夹

client.create_experiment()

Artifact

输入/输出数据

文件/对象

InputPath/OutputPath

在 Kubeflow 中使用缓存机制(execution_caching),相同的输入和代码不会重复执行组件,可以显著节省开发调试时间。

Kubeflow Pipelines 的安装和维护成本较高,需要 Kubernetes 集群管理经验。对于小团队,建议先用 MLflow Pipelines 或 GitHub Actions 起步。

继续你的 AI 学习之旅

浏览更多 AI 知识库文章,或者探索 GitHub 上的优质 AI 项目