从原始数据到模型服务:基于Kubeflow的AI/ML全生命周期蓝图
本文将指导您使用Kubeflow和Feast等开源工具构建可复现的机器学习工作流,从原始数据开始直至部署生产就绪的模型服务。
项目概述
本项目实现了欺诈检测用例的完整MLOps工作流。欺诈检测是金融服务中的关键应用,需要实时识别潜在欺诈交易,同时最小化误报。
我们的欺诈检测系统利用机器学习分析大量交易数据,从历史行为中学习模式,并标记异常交易。该系统需要:
- 实时推理:交易发生时即时决策
- 特征一致性:训练和推理使用相同特征
- 可扩展性:处理高交易量
- 持续学习:随欺诈模式演变定期重新训练
- 合规性:全面的模型跟踪和治理
为什么选择Kubeflow?
Kubeflow抽象了Kubernetes基础设施的复杂性,让数据科学家和ML工程师能够专注于数据和模型性能。
关键优势:
- 基础设施抽象:将ML工作流定义为代码,Kubeflow负责在Kubernetes集群上编排执行
- 专注于AI而非DevOps
- 可重现且可扩展
- 生产就绪
- 云平台无关
准备工作
1. 创建本地Kubernetes集群
1
|
kind create cluster -n fraud-detection-e2e-demo --image kindest/node:v1.31.6
|
2. 部署Kubeflow Pipelines
按照官方Kubeflow Pipelines独立安装指南进行部署。
3. 上传原始数据到MinIO
1
2
3
4
5
6
7
|
kubectl port-forward --namespace kubeflow svc/minio-service 9000:9000
cd synthetic_data_generation
uv sync
source .venv/bin/activate
python synthetic_data_generation.py
cp raw_transaction_datasource.csv ../feature_engineering/feature_repo/data/input
cd ..
|
4. 安装模型注册表、KServe等组件
1
2
3
4
5
|
kubectl apply -k "https://github.com/kubeflow/model-registry/manifests/kustomize/overlays/db?ref=v0.2.16"
kubectl create namespace kserve
kubectl config set-context --current --namespace=kserve
curl -s "https://raw.githubusercontent.com/kserve/kserve/release-0.15/hack/quick_install.sh" | bash
kubectl config set-context --current --namespace=kubeflow
|
构建管道镜像
每个管道组件使用专门的容器镜像:
- 数据准备(
data_preparation/Containerfile
)
- 特征工程(
feature_engineering/Containerfile
)
- 管道(
pipeline/Containerfile
)
- REST预测器(
rest_predictor/Containerfile
)
- 训练(
train/Containerfile
)
构建示例:
1
2
|
cd data_preparation
podman build -t fraud-detection-e2e-demo-data-preparation:latest .
|
管道工作流
1. 使用Spark进行数据准备
Spark作业执行关键数据准备步骤:
- 合并数据集
- 类型转换和特征工程
- 添加时间戳列
- 计算时间点特征
2. 使用Feast进行特征工程
定义特征视图示例:
1
2
3
4
5
6
7
8
9
10
11
|
transactions_fv = FeatureView(
name="transactions",
entities=[transaction],
schema=[
Field(name="user_id", dtype=feast.types.String),
Field(name="distance_from_home", dtype=feast.types.Float32),
# ...其他特征...
],
online=True,
source=transaction_source,
)
|
3. 模型训练
训练脚本:
- 加载特征
- 分割数据集
- 构建和训练神经网络
- 导出ONNX格式模型
4. 模型注册
将训练好的模型注册到Kubeflow模型注册表:
1
2
3
4
5
6
7
8
9
10
|
@dsl.component(base_image=PIPELINE_IMAGE)
def register_model(model: Input[Model]) -> NamedTuple('outputs', model_name=str, model_version=str):
registry = ModelRegistry(
server_address="http://model-registry-service.kubeflow.svc.cluster.local",
port=8080,
author="fraud-detection-e2e-pipeline",
user_token="non-used",
is_secure=False
)
# ...注册逻辑...
|
5. 使用KServe进行实时推理
创建KServe InferenceService:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
inference_service = kserve.V1beta1InferenceService(
api_version=kserve.constants.KSERVE_GROUP + "/v1beta1",
kind="InferenceService",
metadata=client.V1ObjectMeta(
name=model_name + "-" + job_id,
namespace=kserve.utils.get_default_target_namespace(),
),
spec=kserve.V1beta1InferenceServiceSpec(
predictor=kserve.V1beta1PredictorSpec(
service_account_name="kserve-sa",
containers=[V1Container(
name="inference-container",
image=rest_predictor_image,
command=["python", "predictor.py"],
args=["--model-name", model_name, "--model-version", model_version_name]
)]
)
),
)
|
测试实时端点
端口转发:
1
|
kubectl -n kubeflow get pods -l component=predictor -o jsonpath="{.items[*].metadata.name}" | tr ' ' '\n' | grep '^fraud-detection' | head -n1 | xargs -I {} kubectl port-forward -n kubeflow pod/{} 8081:8080
|
发送请求:
1
2
3
|
curl -X POST http://localhost:8081/v1/models/onnx-model:predict \
-H "Content-Type: application/json" \
-d '{"user_id": "user_0"}'
|
结论
本文展示了使用Kubeflow和开源工具构建从原始数据到实时模型服务的完整、可复现AI/ML工作流。通过这一蓝图,您可以调整和扩展流程以适应自己的机器学习项目。