使用Kubeflow構建機器學習流水線

此前的文章中,我已經向你介紹了Kubeflow,這是一個爲團隊設置的機器學習平臺,須要構建機器學習流水線。python

在本文中,咱們將瞭解如何採用現有的機器學習詳細並將其變成Kubeflow的機器學習流水線,進而能夠部署在Kubernetes上。在進行本次練習的時候,請考慮你該如何將現有的機器學習項目轉換到Kubeflow上。linux

我將使用Fashion MNIST做爲例子,由於在本次練習中模型的複雜性並非咱們須要解決的主要目標。對於這一簡單的例子,我將流水線分爲3個階段:git

  • Git clone代碼庫github

  • 下載並從新處理訓練和測試數據docker

  • 訓練評估bash

固然,你能夠根據本身的用例將流水線以任意形式拆分,而且能夠隨意擴展流水線。ssh

獲取代碼

你能夠從Github上獲取代碼:機器學習

% git clone https://github.com/benjamintanweihao/kubeflow-mnist.git

如下是咱們用來建立流水線的完整清單。實際上,你的代碼極可能跨多個庫和文件。在咱們的例子中,咱們將代碼分爲兩個腳本,preprocessing.pytrain.py函數

from tensorflow import keras
import argparse
import os
import pickle


def preprocess(data_dir: str):
    fashion_mnist = keras.datasets.fashion_mnist
    (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

    train_images = train_images / 255.0
    test_images = test_images / 255.0

    os.makedirs(data_dir, exist_ok=True)

    with open(os.path.join(data_dir, 'train_images.pickle'), 'wb') as f:
  pickle.dump(train_images, f)

    with open(os.path.join(data_dir, 'train_labels.pickle'), 'wb') as f:
  pickle.dump(train_labels, f)

    with open(os.path.join(data_dir, 'test_images.pickle'), 'wb') as f:
        pickle.dump(test_images, f)

    with open(os.path.join(data_dir, 'test_labels.pickle'), 'wb') as f:
        pickle.dump(test_labels, f)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Kubeflow MNIST training script')
    parser.add_argument('--data_dir', help='path to images and labels.')
    args = parser.parse_args()

    preprocess(data_dir=args.data_dir)

處理腳本採用單個參數data_dir。它下載並預處理數據,並將pickled版本保存在data_dir中。在生產代碼中,這多是TFRecords的存儲目錄。工具

train.py

import calendar
import os
import time

import tensorflow as tf
import pickle
import argparse

from tensorflow import keras
from constants import PROJECT_ROOT


def train(data_dir: str):
    # Training
    model = keras.Sequential([
          keras.layers.Flatten(input_shape=(28, 28)),
          keras.layers.Dense(128, activation='relu'),
          keras.layers.Dense(10)])

    model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    with open(os.path.join(data_dir, 'train_images.pickle'), 'rb') as f:
        train_images = pickle.load(f)

    with open(os.path.join(data_dir, 'train_labels.pickle'), 'rb') as f:
        train_labels = pickle.load(f)

    model.fit(train_images, train_labels, epochs=10)

    with open(os.path.join(data_dir, 'test_images.pickle'), 'rb') as f:
        test_images = pickle.load(f)

    with open(os.path.join(data_dir, 'test_labels.pickle'), 'rb') as f:
        test_labels = pickle.load(f)

    # Evaluation
    test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)

    print(f'Test Loss: {test_loss}')
    print(f'Test Acc: {test_acc}')

    # Save model
    ts = calendar.timegm(time.gmtime())
    model_path = os.path.join(PROJECT_ROOT, f'mnist-{ts}.h5')
    tf.saved_model.save(model, model_path)

    with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f:
        f.write(model_path)
        print(f'Model written to: {model_path}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Kubeflow FMNIST training script')
    parser.add_argument('--data_dir', help='path to images and labels.')
    args = parser.parse_args()

    train(data_dir=args.data_dir)

train.py中,將創建模型,並使用data_dir指定訓練和測試數據的位置。模型訓練完畢而且開始執行評估後,將模型寫入帶有時間戳的路徑。請注意,該路徑也已寫入output.txt。稍後將對此進行引用。

開發Kubeflow流水線

爲了開始建立Kubeflow流水線,咱們須要拉取一些依賴項。我準備了一個environment.yml,其中包括了kfp 0.5.0tensorflow以及其餘所需的依賴項。

你須要安裝Conda,而後執行如下步驟:

% conda env create -f environment.yml
% source activate kubeflow-mnist
% python preprocessing.py --data_dir=/path/to/data
% python train.py --data_dir=/path/to/data

如今咱們來回顧一下咱們流水線中的幾個步驟:

  • Git clone代碼庫

  • 下載並預處理訓練和測試數據

  • 訓練並進行評估

在咱們開始寫代碼以前,須要從宏觀上了解Kubeflow流水線。

流水線由鏈接組件構成。一個組件的輸出成爲另外一個組件的輸入,每一個組件實際上都在容器中執行(在本例中爲Docker)。將發生的狀況是,咱們會執行一個咱們稍後將要指定的Docker鏡像,它包含了咱們運行preprocessing.pytrain.py所需的一切。固然,這兩個階段會有它們的組件。

咱們還須要額外的一個鏡像以git clone項目。咱們須要將項目bake到Docker鏡像,但在實際項目中,這可能會致使Docker鏡像的大小膨脹。

說到Docker鏡像,咱們應該先建立一個。

Step0:建立一個Docker鏡像

若是你只是想進行測試,那麼這個步驟不是必須的,由於我已經在Docker Hub上準備了一個鏡像。這是Dockerfile的全貌:

FROM tensorflow/tensorflow:1.14.0-gpu-py3
LABEL MAINTAINER "Benjamin Tan <benjamintanweihao@gmail.com>"
SHELL ["/bin/bash", "-c"]

# Set the locale
RUN echo 'Acquire {http::Pipeline-Depth "0";};' >> /etc/apt/apt.conf
RUN DEBIAN_FRONTEND="noninteractive"
RUN apt-get update  && apt-get -y install --no-install-recommends locales && locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8

RUN apt-get install -y --no-install-recommends \
    wget \
    git \
    python3-pip \
    openssh-client \
    python3-setuptools \
    google-perftools && \
    rm -rf /var/lib/apt/lists/*

# install conda
WORKDIR /tmp
RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-4.7.12-Linux-x86_64.sh -O ~/miniconda.sh && \
    /bin/bash ~/miniconda.sh -b -p /opt/conda && \
    rm ~/miniconda.sh && \
    ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh && \
    echo ". /opt/conda/etc/profile.d/conda.sh" >> ~/.bashrc

# build conda environments
COPY environment.yml /tmp/kubeflow-mnist/conda/
RUN /opt/conda/bin/conda update -n base -c defaults conda
RUN /opt/conda/bin/conda env create -f /tmp/kubeflow-mnist/conda/environment.yml
RUN /opt/conda/bin/conda clean -afy

# Cleanup
RUN rm -rf /workspace/{nvidia,docker}-examples && rm -rf /usr/local/nvidia-examples && \
    rm /tmp/kubeflow-mnist/conda/environment.yml

# switch to the conda environment
RUN echo "conda activate kubeflow-mnist" >> ~/.bashrc
ENV PATH /opt/conda/envs/kubeflow-mnist/bin:$PATH
RUN /opt/conda/bin/activate kubeflow-mnist

# make /bin/sh symlink to bash instead of dash:
RUN echo "dash dash/sh boolean false" | debconf-set-selections && \
    DEBIAN_FRONTEND=noninteractive dpkg-reconfigure dash

# Set the new Allocator
ENV LD_PRELOAD /usr/lib/x86_64-linux-gnu/libtcmalloc.so.

關於Dockerfile值得關注的重要一點是Conda環境是否設置完成並準備就緒。要構建鏡像:

% docker build -t your-user-name/kubeflow-mnist . -f Dockerfile
% docker push your-user-name/kubeflow-mnist

那麼,如今讓咱們來建立第一個組件!

pipeline.py中能夠找到如下代碼片斷。

Step1:Git Clone

在這一步中,咱們將從遠程的Git代碼庫中執行一個git clone。特別是,我想要向你展現如何從私有倉庫中進行git clone,由於這是大多數企業的項目所在的位置。固然,這也是一個很好的機會來演示Rancher中一個很棒的功能,它能簡單地添加諸如SSH密鑰之類的密鑰。

使用Rancher添加密鑰

訪問Rancher界面。在左上角,選擇local,而後選擇二級菜單的Default:

而後,選擇Resources下的Secrets

你應該看到一個密鑰的列表,它們正在被你剛剛選擇的集羣所使用。點擊Add Secret:

使用你在下圖中所看到的值來填寫該頁面。若是kubeflow沒有在命名空間欄下展現出來,你能夠經過選擇Add to a new namespace而且輸入kubeflow簡單地建立一個。

確保Scope僅是個命名空間。若是將Scope設置爲全部命名空間,那麼將使得在Default項目中的任意工做負載都可以使用你的ssh密鑰。

在Secret Values中,key是id_rsa,值是id_rsa的內容。完成以後,點擊Save。

若是一些進展順利,你將會看到下圖的內容。如今你已經成功地在kubeflow命名空間中添加了你的SSH密鑰,而且無需使用kubectl!

既然咱們已經添加了咱們的SSH key,那麼是時候回到代碼。咱們如何利用新添加的SSH密鑰來訪問私有git倉庫?

def git_clone_darkrai_op(repo_url: str):

    volume_op = dsl.VolumeOp(
        name="create pipeline volume",
        resource_name="pipeline-pvc",
        modes=["ReadWriteOnce"],
        size="3Gi"
    )

    image = 'alpine/git:latest'

    commands = [
        "mkdir ~/.ssh",
        "cp /etc/ssh-key/id_rsa ~/.ssh/id_rsa",
        "chmod 600 ~/.ssh/id_rsa",
        "ssh-keyscan bitbucket.org >> ~/.ssh/known_hosts",
        f"git clone {repo_url} {PROJECT_ROOT}",
        f"cd {PROJECT_ROOT}"]

    op = dsl.ContainerOp(
        name='git clone',
        image=image,
        command=['sh'],
        arguments=['-c', ' && '.join(commands)],
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": volume_op.volume}
    )

    # Mount Git Secrets
    op.add_volume(V1Volume(name='ssh-key-volume',
                           secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))
    op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))

    return op

首先,建立一個Kubernetes volume,預約義大小爲3Gi。其次,將image變量指定爲咱們將要使用的alpine/git Docker鏡像。以後是在Docker容器中執行的命令列表。這些命令實質上是設置SSH密鑰的,以便於流水線能夠從私有倉庫git clone,或者使用git://URL來代替 https://

該函數的核心是下面一行,返回一個dsl.ContainerOp

commandarguments指定了執行鏡像以後須要執行的命令。

最後一個變量十分有趣,是pvolumes,它是Pipeline Volumes簡稱。它建立一個Kubernetes volume並容許流水線組件來共享單個存儲。該volume被掛載在/workspace上。那麼這個組件要作的就是把倉庫git clone/workspace中。

使用Secrets

再次查看命令和複製SSH密鑰的位置。

流水線volume在哪裏建立呢?當咱們將全部組件都整合到一個流水線中時,就會看到建立好的volume。咱們在/etc/ssh-key/上安裝secrets:

op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))

請記得咱們將secret命名爲ssh-key-secret

op.add_volume(V1Volume(name='ssh-key-volume',
                           secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))

經過使用相同的volume名稱ssh-key-volume,咱們能夠把一切綁定在一塊兒。

Step2:預處理

def preprocess_op(image: str, pvolume: PipelineVolume, data_dir: str):
    return dsl.ContainerOp(
        name='preprocessing',
        image=image,
        command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/preprocessing.py"],
        arguments=["--data_dir", data_dir],
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": pvolume}
    )

正如你所看到的, 預處理步驟看起來十分類似。

image指向咱們在Step0中建立的Docker鏡像。

這裏的command使用指定的conda python簡單地執行了preprocessing.py腳本。變量data_dir被用於執行preprocessing.py腳本。

在這一步驟中pvolume將在/workspace裏有倉庫,這意味着咱們全部的腳本在這一階段都是可用的。而且在這一步中預處理數據會存儲在/workspace下的data_dir中。

Step3:訓練和評估

def train_and_eval_op(image: str, pvolume: PipelineVolume, data_dir: str, ):
    return dsl.ContainerOp(
        name='training and evaluation',
        image=image,
        command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/train.py"],
        arguments=["--data_dir", data_dir],
        file_outputs={'output': f'{PROJECT_ROOT}/output.txt'},
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": pvolume}
    )

最後,是時候進行訓練和評估這一步驟。這一步惟一的區別在於file_outputs變量。若是咱們再次查看train.py,則有如下代碼段:

with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f:
        f.write(model_path)
        print(f'Model written to: {model_path}')

咱們正在將模型路徑寫入名爲output.txt的文本文件中。一般,能夠將其發送到下一個流水線組件,在這種狀況下,該參數將包含模型的路徑。

將一切放在一塊兒

要指定流水線,你須要使用dsl.pipeline來註釋流水線功能:

@dsl.pipeline(
    name='Fashion MNIST Training Pipeline',
    description='Fashion MNIST Training Pipeline to be executed on KubeFlow.'
)
def training_pipeline(image: str = 'benjamintanweihao/kubeflow-mnist',
                      repo_url: str = 'https://github.com/benjamintanweihao/kubeflow-mnist.git',
                      data_dir: str = '/workspace'):
    git_clone = git_clone_darkrai_op(repo_url=repo_url)

    preprocess_data = preprocess_op(image=image,
                                    pvolume=git_clone.pvolume,
                                    data_dir=data_dir)

    _training_and_eval = train_and_eval_op(image=image,
                                           pvolume=preprocess_data.pvolume,
                                           data_dir=data_dir)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(training_pipeline, __file__ + '.tar.gz')

還記得流水線組件的輸出是另外一個組件的輸入嗎?在這裏,git clonecontainer_oppvolume將傳遞到preprocess_cp

最後一部分將pipeline.py轉換爲可執行腳本。最後一步是編譯流水線:

% dsl-compile --py pipeline.py --output pipeline.tar.gz

上傳並執行流水線

如今要進行最有趣的部分啦!第一步,上傳流水線。點擊Upload a pipeline

接下來,填寫Pipeline NamePipeline Description,而後選擇Choose file而且指向pipeline.tar.gz以上傳流水線。

下一頁將會展現完整的流水線。咱們所看到的是一個流水線的有向無環圖,在本例中這意味着依賴項會通往一個方向而且它不包含循環。點擊藍色按鈕Create run 以開始訓練。

大部分字段已經已經填寫完畢。請注意,Run parameters與使用@ dsl.pipeline註釋的training_pipeline函數中指定的參數相同:

最後,當你點擊藍色的Start按鈕時,整個流水線就開始運轉了!你點擊每一個組件並查看日誌就可以知道發生了什麼。當整個流水線執行完畢時,在全部組件的右方會有一個綠色的確認標誌,以下所示:

結論

若是你從上一篇文章開始就一直在關注,那麼你應該已經安裝了Kubeflow,而且應該能體會到大規模管理機器學習項目的複雜性。

在這篇文章中,咱們先介紹了爲Kubeflow準備一個機器學習項目的過程,而後是構建一個Kubeflow流水線,最後是使用Kubeflow接口上傳並執行流水線。這種方法的奇妙之處在於,你的機器學習項目能夠是簡單的,也能夠是複雜的,只要你願意,你就可使用相同的技術。

由於Kubeflow使用Docker容器做爲組件,你能夠自由地加入任何你喜歡的工具。並且因爲Kubeflow運行在Kubernetes上,你可讓Kubernetes處理機器學習工做負載的調度。

咱們還了解了一個我喜歡的Rancher功能,它十分方便,能夠輕鬆添加secrets。馬上,你就能夠輕鬆地組織secrets(如SSH密鑰),並選擇將其分配到哪一個命名空間,而無需爲Base64編碼而煩惱。就像Rancher的應用商店同樣,這些便利性使Kubernetes的工做更加愉快,更不容易出錯。

固然,Rancher提供的服務遠不止這些,我鼓勵你本身去作一些探索。我相信你會偶然發現一些讓你大吃一驚的功能。Rancher做爲一個開源的企業級Kubernetes管理平臺,Run Kubernetes Everywhere一直是咱們的願景和宗旨。開源和無廠商鎖定的特性,可讓用戶輕鬆地在不一樣的基礎設施部署和使用Rancher。此外,Rancher極簡的操做體驗也可讓用戶在不一樣的場景中利用Rancher提高效率,幫助開發人員專一於創新,而無需在繁瑣的小事中浪費精力。

相關文章
相關標籤/搜索