解鎖雲原生 AI 技能 - 開發你的機器學習工做流

按照上篇文章《解鎖雲原生 AI 技能 | 在 Kubernetes 上構建機器學習系統》搭建了一套 Kubeflow Pipelines 以後,咱們一塊兒小試牛刀,用一個真實的案例,學習如何開發一套基於 Kubeflow Pipelines 的機器學習工做流。node

準備工做

機器學習工做流是一個任務驅動的流程,同時也是數據驅動的流程,這裏涉及到數據的導入和準備、模型訓練 Checkpoint 的導出評估、到最終模型的導出。這就須要分佈式存儲做爲傳輸的媒介,此處使用 NAS 做爲分佈式存儲。python

  • 建立分佈式存儲,這裏以 NAS 爲例。此處 NFS_SERVER_IP 須要替換成真實 NAS 服務器地址
  1. 建立阿里雲 NAS 服務,能夠參考文檔
  2. 須要在 NFS Server 中建立 /data
# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /data
# cd /
# umount /nfs
  1. 建立對應的 Persistent Volume
# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: user-susan
  labels:
    user-susan: pipelines
spec:
  persistentVolumeReclaimPolicy: Retain
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  nfs:
    server: NFS_SERVER_IP
    path: "/data"

# kubectl create -f nfs-pv.yaml
  1. 建立 Persistent Volume Claim
# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: user-susan
  annotations:
    description: "this is the mnist demo"
    owner: Tom
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
       storage: 5Gi
  selector:
    matchLabels:
      user-susan: pipelines
# kubectl create -f nfs-pvc.yaml

開發 Pipeline

因爲 Kubeflow Pipelines 提供的例子都是依賴於 Google 的存儲服務,這致使國內的用戶沒法真正體驗 Pipelines 的能力。爲此,阿里雲容器服務團隊提供了基於 NAS 存儲訓練 MNIST 模型的例子,方便您在阿里雲上使用和學習 Kubeflow Pipelines。具體步驟分 3 步:git

  • (1) 下載數據
  • (2) 利用 TensorFlow 進行模型訓練
  • (3) 模型導出

在這 3 個步驟中,後一個步驟都依賴於前一個步驟而完成。
Kubeflow Pipelines 中能夠用 Python 代碼描述這樣一個流程, 完整代碼能夠查看 standalone_pipeline.py
咱們在例子中使用了基於開源項目 Arenaarena_op ,這是對於 Kubeflow 默認的 container_op 封裝,它可以實現對於分佈式訓練 MPI 和 PS 模式的無縫銜接,另外也支持使用 GPU 和 RDMA 等異構設備和分佈式存儲的簡單接入,同時方便從 git 源同步代碼,是一個比較實用的工具 API。github

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
  """A pipeline for end to end machine learning workflow."""
  data=["user-susan:/training"]
  gpus=1
# 1\. prepare data
  prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
  # 2\. downalod source code and train the models
  train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])
  # 3\. export the model
  export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

Kubeflow Pipelines 會將上面的代碼轉化成一個有向無環圖 (DAG), 其中的每個節點就是 Component (組件),而 Component (組件)之間的連線表明它們之間的依賴關係。從 Pipelines UI 能夠看到 DAG 圖:
docker

首先具體理解一下數據準備的部分,這裏咱們提供了 arena.standalone_job_op 的 Python API, 須要指定該步驟的名稱: name; 須要使用的容器鏡像: image; 要使用的數據以及其對應到容器內部的掛載目錄: data。
這裏的 data 是一個數組格式, 如 data=["user-susan:/training"],表示能夠掛載到多個數據。 其中 user-susan 是以前建立的 Persistent Volume Claim, 而 /training 爲容器內部的掛載目錄。api

prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

而上述步驟其實是從指定地址利用 curl 下載數據到分佈式存儲對應的目錄 /training/dataset/mnist,請注意這裏的 /training 爲分佈式存儲的根目錄,相似你們熟悉的根 mount 點;而 /training/dataset/mnist 是子目錄。其實後面的步驟能夠經過使用一樣的根 mount 點,讀到數據,進行運算。
第二步是利用下載到分佈式存儲的數據,並經過 git 指定固定 commit id 下載代碼,並進行模型訓練。數組

train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])

能夠看到這個步驟比數據準備要相對複雜一點,除了和第一步驟中的 name, image, data 和 command 同樣須要指定以外,在模型訓練步驟中,還須要指定:安全

  • 獲取代碼的方式: 從可重現實驗的角度來看,對於運行試驗代碼的追本溯源,是很是重要的一環。能夠在 API 調用時指定 sync_source 的 git 代碼源,同時經過設定 envGIT_SYNC_REV 指定訓練代碼的 commit id;
  • gpu: 默認爲 0,就是不使用 GPU;若是爲大於 0 的整數值,就表明該步驟須要這個數量的 GPU 數;
  • metrics: 一樣是從可重現和可比較的實驗目的出發,用戶能夠將須要的一系列指標導出,而且經過 Pipelines UI 進行直觀的顯示和比較。具體使用方法分爲兩步:1. 在調用 API 時以數組的形式指定要收集指標的 metrics name 和指標的展現格式 PERCENTAGE 或者是 RAW,好比 metrics=["Train-accuracy:PERCENTAGE"]。 2. 因爲 Pipelines 默認會從 stdout 日誌中收集指標,你須要在真正運行的模型代碼中輸出 {metrics name}={value} 或者 {metrics name}:{value}, 能夠參考具體樣例代碼

值得注意的是:bash

在本步驟中指定了和 prepare_data 相同的 data 參數 ["user-susan:/training"],就能夠在訓練代碼中讀到對應的數據,好比 --data_dir /training/dataset/mnist
另外因爲該步驟依賴於 prepare_data,能夠在方法中經過指定 prepare_data.output 表示兩個步驟的依賴關係。

最後 export_model 是基於 train 訓練產生的 checkpoint,生成訓練模型:服務器

export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

export_model 和第二步 train 相似,甚至要更爲簡單,它只是從 git 同步模型導出代碼而且利用共享目錄 /training/output/mnist 中的 checkpoint 執行模型導出。
整個工做流程看起來仍是很直觀的, 下面就能夠定義一個 Python 方法將整個流程貫穿在一塊兒:

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
@dsl.pipeline 是表示工做流的裝飾器,這個裝飾器中須要定義兩個屬性,分別是 namedescription
入口方法 sample_pipeline 中定義了 4 個參數: learning_rate, dropout, model_versioncommit, 分別能夠在上面的 trainexport_model 階段使用。這裏的參數的值其實是 dsl.PipelineParam 類型,定義成 dsl.PipelineParam 的目的在於能夠經過 Kubeflow Pipelines 的原生 UI 將其轉換成輸入表單,表單的關鍵字是參數名稱,而默認值爲參數的值。值得注意的是,這裏的 dsl.PipelineParam 對應值實際上只能是字符串和數字型;而數組和 map,以及自定義類型都是沒法經過轉型進行變換的。

實際上,這些參數均可以在用戶提交工做流時進行覆蓋,如下就是提交工做流對應的 UI:

提交 Pipeline

您能夠在本身的 Kubernetes 內將前面開發工做流的 Python DSL 提交到 Kubeflow Pipelines 服務中, 實際提交代碼很簡單:

KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
  import kfp.compiler as compiler
  compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
  client = kfp.Client(host=KFP_SERVICE)
  try:
    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
  except:
    experiment_id = client.create_experiment(EXPERIMENT_NAME).id
  run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                            params={'learning_rate':learning_rate,
                                     'dropout':dropout,
                                    'model_version':model_version,
                                    'commit':commit})
利用 compiler.compile 將 Python 代碼編譯成執行引擎 (Argo) 識別的 DAG 配置文件;
經過 Kubeflow Pipeline 的客戶端建立或者找到已有的實驗,而且提交以前編譯出的 DAG 配置文件。

在集羣內準備一個 python3 的環境,而且安裝 Kubeflow Pipelines SDK:

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash

登陸到 Python3 的環境後,執行以下命令,連續提交兩個不一樣參數的任務:

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

查看運行結果

登陸到 Kubeflow Pipelines 的 UI: [https://](){pipeline地址}/pipeline/#/experiments, 好比:

https://11.124.285.171/pipeline/#/experiments

點擊 Compare runs 按鈕,能夠比較兩個實驗的輸入、花費的時間和精度等一系列指標。讓實驗可追溯是讓實驗可重現的第一步,而利用 Kubeflow Pipelines 自己的實驗管理能力則是開啓實驗可重現的第一步。

總結

實現一個能夠運行的 Kubeflow Pipeline 須要的步驟是:

  1. 構建 Pipeline (流水線)中須要的最小執行單元 Component (組件),若是是利用原生定義的 dsl.container_ops, 須要構建兩部分代碼:
  • 構建運行時代碼:一般是爲每一個步驟構建容器鏡像,做爲 Pipelines 和真正執行業務邏輯代碼之間的適配器。它所作的事情爲獲取 Pipelines 上下文的輸入參數,調用業務邏輯代碼,而且將須要傳遞到下個步驟的輸出按照 Pipelines 的規則放到容器內的指定位置,由底層工做流組件負責傳遞。 這樣產生的結果是運行時代碼與業務邏輯代碼會耦合在一塊兒。能夠參考 Kubeflow Pipelines 的例子
  • 構建客戶端代碼:這個步驟一般是長成下面的樣子, 熟悉 Kubernetes 的朋友會發現這個步驟實際上就是在編寫 Pod Spec:
container_op = dsl.ContainerOp(
        name=name,
        image='<train-image>',
        arguments=[
            '--input_dir', input_dir,
            '--output_dir', output_dir,
            '--model_name', model_name,
            '--model_version', model_version,
            '--epochs', epochs
        ],
        file_outputs={'output': '/output.txt'}
    )
container_op.add_volume(k8s_client.V1Volume(
            host_path=k8s_client.V1HostPathVolumeSource(
                path=persistent_volume_path),
            name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(
            mount_path=persistent_volume_path,
            name=persistent_volume_name))

利用原生定義的 dsl.container_ops 的好處在於靈活,因爲開放了和 Pipelines 的交互接口,用戶能夠在 container_ops 這個層面作許多事情。可是它的問題在於:

  • 複用度低。每一個 Component 都須要構建鏡像和開發運行時代碼;
  • 複雜度高。使用者須要瞭解 Kubernetes 的概念,好比 resource limit, PVC, node selector 等一系列概念;
  • 支持分佈式訓練困難。因爲 container_op 爲單容器操做,若是須要支持分佈式訓練就須要在 container_ops 中提交和管理相似 TFJob 的任務。這裏會帶來複雜度和安全性的雙重挑戰,複雜度比較好理解,安全性是說提交 TFJob 這類任務的權限會須要開放額外的權限給 Pipeline 的開發者。

另外一種方式是使用 arena_op 這種能夠重用的 Component API,它使用通用運行時代碼,能夠免去重複構建運行時代碼的工做;同時利用通用一套的 arena_op API 簡化用戶的使用;也支持 Parameter Server 和 MPI 等場景。建議您使用這種方式編譯 Pipelines。

  1. 將構建好的 Component (組件)拼接成 Pipeline (流水線);
  2. 將 Pipeline (流水線)編譯成 Argo 的執行引擎 (Argo) 識別的 DAG 配置文件, 並提交 DAG 配置文件到 Kubeflow Pipelines, 利用 Kubeflow Pipelines 自身的 UI 查看流程結果。


本文做者:一綠舟

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索