欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > Kubeflow 快速入门实战(二) - Pipelines / Katib / KServer

Kubeflow 快速入门实战(二) - Pipelines / Katib / KServer

2025/5/3 23:38:13 来源:https://blog.csdn.net/weixin_39403185/article/details/147349105  浏览:    关键词:Kubeflow 快速入门实战(二) - Pipelines / Katib / KServer

承接前文博客 Kubeflow 快速入门实战(一)

Kubeflow 快速入门实战(一) - 简介 / Notebooks-CSDN博客文章浏览阅读441次,点赞19次,收藏6次。本文主要介绍了 Kubeflow 的主要功能和能力,适用场景,基本用法。以及Notebook,piplines,katib,KServer 的入门级示例 https://blog.csdn.net/weixin_39403185/article/details/147337813?spm=1001.2014.3001.5502

4.2 Kubeflow piplines 示例

1) 生成 pipelines 配置文件

任意 python 环境都成

## 安装依赖
pip install kfp --upgrade

准备流水线代码 simple_pipeline.py

# simple_pipeline.py
# 1. 导入 KFP 库
from kfp import dsl
from kfp import compiler# 2. 定义组件 (Component)
#    - 使用 @dsl.component 装饰器
#    - 指定基础镜像 (base_image),代码将在这个镜像的容器中运行
#    - 使用 Python 类型提示 (Type Hinting) 定义输入和输出
@dsl.component(base_image='python:3.9')
def add_prefix(text: str) -> str:"""在输入文本前添加 'Hello, '"""# 组件的逻辑代码return f"Hello, {text}"@dsl.component(base_image='python:3.9')
def print_message(message: str):"""打印传入的消息"""# 组件的逻辑代码print(f"Received message: {message}")# 3. 定义流水线 (Pipeline)
#    - 使用 @dsl.pipeline 装饰器
#    - 提供名称 (name) 和描述 (description),会显示在 UI 上
@dsl.pipeline(name='simple-greeting-pipeline',description='一个简单的打印问候语的流水线示例'
)
def simple_pipeline(recipient: str = 'World'): # 定义流水线参数"""定义流水线的工作流程"""# 4. 在流水线函数内部,实例化组件来创建任务 (Task)#    - 调用组件函数 (如 add_prefix()) 会创建一个任务节点add_prefix_task = add_prefix(text=recipient) # 将流水线参数传给组件# 5. 连接任务:将一个任务的输出作为另一个任务的输入#    - 使用 .output 属性来引用上一个任务的返回值print_message_task = print_message(message=add_prefix_task.output)if __name__ == '__main__':compiler.Compiler().compile(pipeline_func=simple_pipeline,package_path='simple_pipeline.yaml')print("Pipeline compiled to simple_pipeline.yaml")
## 生成 可供kubeflow识别的yaml文件
python simple_pipeline.py

2) UI 控制台 创建pipelines

开始在 UI 控制台上 创建流水线

3) UI 控制台 运行pipelines 

3) UI 控制台 查看pipelines结果 

4.3 Kubeflow katib 示例

a) 脚本yaml准备

simple-function-tuning-v2.yaml

apiVersion: "kubeflow.org/v1beta1"
kind: Experiment
metadata:namespace: kubeflow-user-example-com name: simple-function-tuning-v2
spec:objective:type: maximizegoal: 99.9objectiveMetricName: Metricalgorithm:algorithmName: randomparallelTrialCount: 3maxTrialCount: 12maxFailedTrialCount: 3parameters:- name: "x"parameterType: doublefeasibleSpace:min: "-5.0"max: "5.0"- name: "y"parameterType: doublefeasibleSpace:min: "-5.0"max: "5.0"trialTemplate:trialParameters:- name: "paramX"description: Parameter X for the functionreference: "x"- name: "paramY"description: Parameter Y for the functionreference: "y"trialSpec:apiVersion: batch/v1kind: Jobspec:template:metadata:annotations:sidecar.istio.io/inject: "false"spec:containers:- name: trial-containerimage: ubuntu:22.04resources:requests:memory: "64Mi"cpu: "100m"limits:memory: "128Mi"cpu: "500m"command:- /bin/bash- -c- |set -eecho "--- Starting Trial ---"echo "Updating package lists..."apt-get update > /dev/null && apt-get upgrade -y > /dev/null && apt-get install -y bc > /dev/nullecho "Packages installed successfully."X="${trialParameters.paramX}"Y="${trialParameters.paramY}"echo "Parameters received: X=$X, Y=$Y"echo "Calculating metric..."RESULT=$(echo "scale=4; 100 - ($X-3)^2 - ($Y-5)^2" | bc)echo "Calculation result: RESULT=$RESULT"echo "Reporting metric..."echo "Metric=${RESULT}" > /var/log/katib/metrics.log \restartPolicy: NeverprimaryContainerName: trial-containermetricsCollectorSpec:collector:kind: Filesource:fileSystem:path: /var/log/katib/metrics.log filter:metricNameRegex: '^Metric$'metricValueRegex: '([+-]?\d*\.?\d+([eE][+-]?\d+)?)'

b) 脚本部署并运行 

c) 运行结果查看

4.4 Kubeflow KServer 示例

这个模块就很复杂了 涉及到的元素特别多。画了一个简易的逻辑架构图

a) 基础环境准备 conda python

mkdir -p /data/modelswget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /data/Miniconda3.sh
bash /data/Miniconda3.sh -b -p /data/miniconda3echo 'export PATH="/data/miniconda3/bin:$PATH"' >> ~/.bashrc
source /data/miniconda3/bin/activateconda create -n llm python=3.10 -y
conda activate llm
echo 'conda activate llm' >> ~/.bashrc
source ~/.bashrcpip install huggingface_hub
huggingface-cli download Qwen/Qwen2.5-1.5B --resume-download --local-dir /data/models/qwen2.5-1.5b

b) 镜像准备

可以白嫖一下阿里云的个人版私有仓库

model_server.py

import os
from fastapi import FastAPI, Request
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer# 模型加载路径,对应 PV 挂载点
MODEL_DIR = "/data/models/qwen2.5-1.5b" # 这个路径必须和 InferenceService 中 volumeMounts 的 mountPath 一致
MODEL_NAME = "Qwen/Qwen2-1.5B-Instruct" # 可以从环境变量读取或硬编码# 检查是否有可用的 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")# 加载模型和 Tokenizer
print(f"Loading tokenizer from {MODEL_DIR}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR, trust_remote_code=True)
print(f"Loading model from {MODEL_DIR}...")
model = AutoModelForCausalLM.from_pretrained(MODEL_DIR,torch_dtype="auto", # 或者 torch.float16 节省显存device_map="auto", # 自动将模型分片到可用设备 (GPU)trust_remote_code=True
)
# model.to(device) # 如果 device_map="auto" 不起作用或想强制指定,可以用这个
model.eval() # 设置为评估模式
print("Model loaded successfully.")# 创建 FastAPI 应用
app = FastAPI()@app.get('/')
def read_root():return {"message": "Qwen Model Server is running"}# KServe V1 Predict Protocol (可以简化为自定义 /predict 接口)
@app.post('/v1/models/{model_name}:predict')
async def predict(model_name: str, request: Request):"""接收 KServe V1 协议的请求或自定义请求预期 JSON: {"instances": [{"text": "your prompt"}]}或者简化: {"text": "your prompt"}"""body = await request.json()print(f"Received request body: {body}")# 兼容 KServe V1 和自定义格式if "instances" in body and isinstance(body["instances"], list) and "text" in body["instances"][0]:prompt_text = body["instances"][0]["text"]elif "text" in body:prompt_text = body["text"]else:return {"error": "Invalid input format. Expecting {'instances': [{'text': '...'}]} or {'text': '...'}"}, 400print(f"Generating text for prompt: {prompt_text}")try:# 构建 Qwen 需要的 messages 格式messages = [{"role": "system", "content": "You are a helpful assistant."},{"role": "user", "content": prompt_text}]text = tokenizer.apply_chat_template(messages,tokenize=False,add_generation_prompt=True)model_inputs = tokenizer([text], return_tensors="pt").to(device)generated_ids = model.generate(model_inputs.input_ids,max_new_tokens=512,         # 控制最大生成长度do_sample=True,             # 启用采样策略,而不是贪婪解码temperature=0.7,            # 控制随机性,稍小于 1.0 使输出更集中但仍有变化top_p=0.9,                  # Nucleus sampling,只考虑概率累积到 0.9 的词repetition_penalty=1.1      # 轻微惩罚重复 token,大于 1.0 即可,不要设置过高)generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]print(f"Generated response: {response}")# 返回 KServe V1 兼容格式return {"predictions": [response]}except Exception as e:print(f"Error during inference: {e}")return {"error": str(e)}, 500# KServe 要求健康检查端点 (可选,但推荐)
@app.get('/healthz')
def healthz():return {"status": "ok"}

requirements.txt

fastapi>=0.100.0
uvicorn>=0.20.0
torch>=2.1.0 --index-url https://download.pytorch.org/whl/cu121
transformers>=4.38.0
accelerate>=0.25.0
sentencepiece
tiktoken
einops
protobuf

dockerfile

FROM pytorch/pytorch:2.1.2-cuda12.1-cudnn8-runtime
WORKDIR /appCOPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt# 复制模型服务代码
COPY model_server.py .# 暴露端口 (KServe 默认希望容器监听 8080)
EXPOSE 8080# 设置时区 (可选, 但有助于日志时间戳统一)
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone# 设置入口点,启动 FastAPI 服务
# 使用 uvicorn 运行 FastAPI 应用
CMD ["uvicorn", "model_server:app", "--host", "0.0.0.0", "--port", "8080"]
## 制作镜像
docker build -t registry.ap-southeast-5.aliyuncs.com/xxx/qwen-kserve:0.5b-cu121 .
## 登录阿里云的私有仓库 免费的
docker login
##推送过去
docker push registry.ap-southeast-5.aliyuncs.com/xxx/qwen-kserve:0.5b-cu121 

c) 模型准备

huggingface-cli download Qwen/Qwen2.5-1.5B --resume-download --local-dir /data/models/qwen2.5-1.5b

qwen-model-pv.yaml

apiVersion: v1
kind: PersistentVolume
metadata:name: qwen-model-pvnamespace: kubeflow-user-example-com
spec:capacity:storage: 10Gi volumeMode: FilesystemaccessModes:- ReadWriteOncepersistentVolumeReclaimPolicy: Retain storageClassName: manual hostPath:path: "/data/models" 

qwen-model-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:name: qwen-model-pvcnamespace: kubeflow-user-example-com
spec:accessModes:- ReadWriteOncevolumeMode: Filesystemresources:requests:storage: 10Gi storageClassName: manual
## 部署pv pvc
kubectl apply -f qwen-model-pv.yaml
kubectl apply -f qwen-model-pvc.yaml

d) InferenceService部署

apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:name: "qwen25-15b-instruct" namespace: kubeflow-user-example-com
spec:predictor:containers:- name: kserve-containerimage: registry.ap-southeast-5.aliyuncs.com/xxxxx/qwen-kserve:0.5b-cu121imagePullPolicy: IfNotPresentresources:requests:cpu: "2"memory: "8Gi"nvidia.com/gpu: "1"limits:cpu: "4"memory: "16Gi"nvidia.com/gpu: "1"ports:- containerPort: 8080protocol: TCPvolumeMounts:- name: model-storagemountPath: /data/modelsreadOnly: truevolumes:- name: model-storagepersistentVolumeClaim:claimName: qwen-model-pvc

e) 测试

curl -X POST -H "Content-Type: application/json" \
http://127.0.0.1:8080/v1/models/qwen:predict \
-d '{"instances": [{"text": "你好,请简单介绍一下阿里云ECS?"}]
}' | jq .

参考:

Kubeflow Pipelines介绍与实例-CSDN博客文章浏览阅读2.5k次,点赞2次,收藏8次。kubeflow/kubeflow 是一个胶水项目,。pipelines 是基于 kubeflow 实现的工作流系统,它的目标是借助 kubeflow 的底层支持,实现出一套工作流,支持数据准备,模型训练,模型部署,可以通过代码提交等等方式触发。_kubeflow pipeline https://blog.csdn.net/qq_45808700/article/details/132188234?spm=1001.2101.3001.6650.4&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EOPENSEARCH%7ERate-4-132188234-blog-147349105.235%5Ev43%5Epc_blog_bottom_relevance_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EOPENSEARCH%7ERate-4-132188234-blog-147349105.235%5Ev43%5Epc_blog_bottom_relevance_base3&utm_relevant_index=5

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词