按照上篇文章《解鎖雲原生 AI 技能 | 在 Kubernetes 上構建機器學習系統》搭建了一套 Kubeflow Pipelines 以後,咱們一塊兒小試牛刀,用一個真實的案例,學習如何開發一套基於 Kubeflow Pipelines 的機器學習工做流。node
機器學習工做流是一個任務驅動的流程,同時也是數據驅動的流程,這裏涉及到數據的導入和準備、模型訓練 Checkpoint 的導出評估、到最終模型的導出。這就須要分佈式存儲做爲傳輸的媒介,此處使用 NAS 做爲分佈式存儲。python
NFS_SERVER_IP
須要替換成真實 NAS 服務器地址/data
# mkdir -p /nfs # mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs # mkdir -p /data # cd / # umount /nfs
# cat nfs-pv.yaml apiVersion: v1 kind: PersistentVolume metadata: name: user-susan labels: user-susan: pipelines spec: persistentVolumeReclaimPolicy: Retain capacity: storage: 10Gi accessModes: - ReadWriteMany nfs: server: NFS_SERVER_IP path: "/data" # kubectl create -f nfs-pv.yaml
# cat nfs-pvc.yaml apiVersion: v1 kind: PersistentVolumeClaim metadata: name: user-susan annotations: description: "this is the mnist demo" owner: Tom spec: accessModes: - ReadWriteMany resources: requests: storage: 5Gi selector: matchLabels: user-susan: pipelines # kubectl create -f nfs-pvc.yaml
因爲 Kubeflow Pipelines 提供的例子都是依賴於 Google 的存儲服務,這致使國內的用戶沒法真正體驗 Pipelines 的能力。爲此,阿里雲容器服務團隊提供了基於 NAS 存儲訓練 MNIST 模型的例子,方便您在阿里雲上使用和學習 Kubeflow Pipelines。具體步驟分 3 步:git
在這 3 個步驟中,後一個步驟都依賴於前一個步驟而完成。
Kubeflow Pipelines 中能夠用 Python 代碼描述這樣一個流程, 完整代碼能夠查看 standalone_pipeline.py。
咱們在例子中使用了基於開源項目 Arena 的 arena_op
,這是對於 Kubeflow 默認的 container_op
封裝,它可以實現對於分佈式訓練 MPI 和 PS 模式的無縫銜接,另外也支持使用 GPU 和 RDMA 等異構設備和分佈式存儲的簡單接入,同時方便從 git 源同步代碼,是一個比較實用的工具 API。github
@dsl.pipeline( name='pipeline to run jobs', description='shows how to run pipeline jobs.' ) def sample_pipeline(learning_rate='0.01', dropout='0.9', model_version='1', commit='f097575656f927d86d99dd64931042e1a9003cb2'): """A pipeline for end to end machine learning workflow.""" data=["user-susan:/training"] gpus=1 # 1\. prepare data prepare_data = arena.standalone_job_op( name="prepare-data", image="byrnedo/alpine-curl", data=data, command="mkdir -p /training/dataset/mnist && \ cd /training/dataset/mnist && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz") # 2\. downalod source code and train the models train = arena.standalone_job_op( name="train", image="tensorflow/tensorflow:1.11.0-gpu-py3", sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git", env=["GIT_SYNC_REV=%s" % (commit)], gpus=gpus, data=data, command=''' echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \ --max_steps 500 --data_dir /training/dataset/mnist \ --log_dir /training/output/mnist --learning_rate %s \ --dropout %s''' % (prepare_data.output, learning_rate, dropout), metrics=["Train-accuracy:PERCENTAGE"]) # 3\. export the model export_model = arena.standalone_job_op( name="export-model", image="tensorflow/tensorflow:1.11.0-py3", sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git", env=["GIT_SYNC_REV=%s" % (commit)], data=data, command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))
Kubeflow Pipelines 會將上面的代碼轉化成一個有向無環圖 (DAG), 其中的每個節點就是 Component (組件),而 Component (組件)之間的連線表明它們之間的依賴關係。從 Pipelines UI 能夠看到 DAG 圖:
docker
首先具體理解一下數據準備的部分,這裏咱們提供了 arena.standalone_job_op
的 Python API, 須要指定該步驟的名稱
: name; 須要使用的容器鏡像
: image; 要使用的數據以及其對應到容器內部的掛載目錄
: data。
這裏的 data 是一個數組格式, 如 data=["user-susan:/training"],表示能夠掛載到多個數據。 其中 user-susan
是以前建立的 Persistent Volume Claim, 而 /training
爲容器內部的掛載目錄。api
prepare_data = arena.standalone_job_op( name="prepare-data", image="byrnedo/alpine-curl", data=data, command="mkdir -p /training/dataset/mnist && \ cd /training/dataset/mnist && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
而上述步驟其實是從指定地址利用 curl 下載數據到分佈式存儲對應的目錄 /training/dataset/mnist
,請注意這裏的 /training
爲分佈式存儲的根目錄,相似你們熟悉的根 mount 點;而 /training/dataset/mnist
是子目錄。其實後面的步驟能夠經過使用一樣的根 mount 點,讀到數據,進行運算。
第二步是利用下載到分佈式存儲的數據,並經過 git 指定固定 commit id 下載代碼,並進行模型訓練。數組
train = arena.standalone_job_op( name="train", image="tensorflow/tensorflow:1.11.0-gpu-py3", sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git", env=["GIT_SYNC_REV=%s" % (commit)], gpus=gpus, data=data, command=''' echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \ --max_steps 500 --data_dir /training/dataset/mnist \ --log_dir /training/output/mnist --learning_rate %s \ --dropout %s''' % (prepare_data.output, learning_rate, dropout), metrics=["Train-accuracy:PERCENTAGE"])
能夠看到這個步驟比數據準備要相對複雜一點,除了和第一步驟中的 name, image, data 和 command 同樣須要指定以外,在模型訓練步驟中,還須要指定:安全
sync_source
的 git 代碼源,同時經過設定 env
中 GIT_SYNC_REV
指定訓練代碼的 commit id;metrics=["Train-accuracy:PERCENTAGE"]
。 2. 因爲 Pipelines 默認會從 stdout 日誌中收集指標,你須要在真正運行的模型代碼中輸出 {metrics name}={value} 或者 {metrics name}:{value}, 能夠參考具體樣例代碼。
值得注意的是:bash
在本步驟中指定了和prepare_data
相同的data
參數 ["user-susan:/training"],就能夠在訓練代碼中讀到對應的數據,好比--data_dir /training/dataset/mnist
。
另外因爲該步驟依賴於prepare_data
,能夠在方法中經過指定prepare_data.output
表示兩個步驟的依賴關係。
最後 export_model
是基於 train
訓練產生的 checkpoint,生成訓練模型:服務器
export_model = arena.standalone_job_op( name="export-model", image="tensorflow/tensorflow:1.11.0-py3", sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git", env=["GIT_SYNC_REV=%s" % (commit)], data=data, command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))
export_model
和第二步 train
相似,甚至要更爲簡單,它只是從 git 同步模型導出代碼而且利用共享目錄 /training/output/mnist
中的 checkpoint 執行模型導出。
整個工做流程看起來仍是很直觀的, 下面就能夠定義一個 Python 方法將整個流程貫穿在一塊兒:
@dsl.pipeline( name='pipeline to run jobs', description='shows how to run pipeline jobs.' ) def sample_pipeline(learning_rate='0.01', dropout='0.9', model_version='1', commit='f097575656f927d86d99dd64931042e1a9003cb2'):
@dsl.pipeline 是表示工做流的裝飾器,這個裝飾器中須要定義兩個屬性,分別是name
和description
。
入口方法sample_pipeline
中定義了 4 個參數:learning_rate
,dropout
,model_version
和commit
, 分別能夠在上面的train
和export_model
階段使用。這裏的參數的值其實是 dsl.PipelineParam 類型,定義成 dsl.PipelineParam 的目的在於能夠經過 Kubeflow Pipelines 的原生 UI 將其轉換成輸入表單,表單的關鍵字是參數名稱,而默認值爲參數的值。值得注意的是,這裏的 dsl.PipelineParam 對應值實際上只能是字符串和數字型;而數組和 map,以及自定義類型都是沒法經過轉型進行變換的。
實際上,這些參數均可以在用戶提交工做流時進行覆蓋,如下就是提交工做流對應的 UI:
您能夠在本身的 Kubernetes 內將前面開發工做流的 Python DSL 提交到 Kubeflow Pipelines 服務中, 實際提交代碼很簡單:
KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888" import kfp.compiler as compiler compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz') client = kfp.Client(host=KFP_SERVICE) try: experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id except: experiment_id = client.create_experiment(EXPERIMENT_NAME).id run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz', params={'learning_rate':learning_rate, 'dropout':dropout, 'model_version':model_version, 'commit':commit})
利用compiler.compile
將 Python 代碼編譯成執行引擎 (Argo) 識別的 DAG 配置文件;
經過 Kubeflow Pipeline 的客戶端建立或者找到已有的實驗,而且提交以前編譯出的 DAG 配置文件。
在集羣內準備一個 python3 的環境,而且安裝 Kubeflow Pipelines SDK:
# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity # kubectl exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash
登陸到 Python3 的環境後,執行以下命令,連續提交兩個不一樣參數的任務:
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade # pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade # curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py # python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2 # python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3
登陸到 Kubeflow Pipelines 的 UI: [https://](){pipeline地址}/pipeline/#/experiments, 好比:
https://11.124.285.171/pipeline/#/experiments
點擊 Compare runs
按鈕,能夠比較兩個實驗的輸入、花費的時間和精度等一系列指標。讓實驗可追溯是讓實驗可重現的第一步,而利用 Kubeflow Pipelines 自己的實驗管理能力則是開啓實驗可重現的第一步。
實現一個能夠運行的 Kubeflow Pipeline 須要的步驟是:
dsl.container_ops
, 須要構建兩部分代碼:container_op = dsl.ContainerOp( name=name, image='<train-image>', arguments=[ '--input_dir', input_dir, '--output_dir', output_dir, '--model_name', model_name, '--model_version', model_version, '--epochs', epochs ], file_outputs={'output': '/output.txt'} ) container_op.add_volume(k8s_client.V1Volume( host_path=k8s_client.V1HostPathVolumeSource( path=persistent_volume_path), name=persistent_volume_name)) container_op.add_volume_mount(k8s_client.V1VolumeMount( mount_path=persistent_volume_path, name=persistent_volume_name))
利用原生定義的 dsl.container_ops
的好處在於靈活,因爲開放了和 Pipelines 的交互接口,用戶能夠在 container_ops 這個層面作許多事情。可是它的問題在於:
container_op
爲單容器操做,若是須要支持分佈式訓練就須要在 container_ops 中提交和管理相似 TFJob 的任務。這裏會帶來複雜度和安全性的雙重挑戰,複雜度比較好理解,安全性是說提交 TFJob 這類任務的權限會須要開放額外的權限給 Pipeline 的開發者。另外一種方式是使用 arena_op
這種能夠重用的 Component API,它使用通用運行時代碼,能夠免去重複構建運行時代碼的工做;同時利用通用一套的 arena_op
API 簡化用戶的使用;也支持 Parameter Server 和 MPI 等場景。建議您使用這種方式編譯 Pipelines。
本文爲雲棲社區原創內容,未經容許不得轉載。