使用估算器、tf.keras 和 tf.data 進行多 GPU 訓練

文 / Zalando Research 研究科學家 Kashif Rasulpython

來源 | TensorFlow 公衆號git

與大多數 AI 研究部門同樣,Zalando Research 也意識到了對創意進行嘗試和快速原型設計的重要性。隨着數據集變得愈來愈龐大,瞭解如何利用咱們擁有的共享資源來高效快速地訓練深度學習模型變得大有用處。github

TensorFlow 的估算器 API 對於在分佈式環境中使用多個 GPU 來訓練模型很是有用。本文將主要介紹這一工做流程。咱們先使用 Fashion-MNIST 小數據集訓練一個用 tf.keras 編寫的自定義估算器,而後在文末介紹一個較實際的用例。api

請注意:TensorFlow 團隊一直在開發另外一項很酷的新功能(在我寫這篇文章時,該功能仍處於 Master 階段),使用這項新功能,您只需多輸入幾行代碼便可訓練 tf.keras 模型, 而無需先將該模型轉化爲估算器!其工做流程也很贊。下面我着重講講估算器 API。選擇哪個由您本身決定! 注:功能連接 github.com/tensorflow/…數組

TL; DR:基本上,咱們須要記住,對於 tf.keras. 模型,咱們只要經過 tf.keras.estimator.model_to_estimator 方法將其轉化爲 tf.estimator.Estimator 對象,便可使用 tf.estimator API 來進行訓練。轉化完成後,咱們可使用估算器提供的機制用不一樣的硬件配置訓練模型。bash

您能夠今後筆記本下載本文中的代碼並親自運行。 注:筆記本連接 github.com/kashif/tf-k…網絡

import os
import time

#!pip install -q -U tensorflow-gpu
import tensorflow as tf

import numpy as np
複製代碼

導入 Fashion-MNIST 數據集架構

咱們用 Fashion-MNIST 數據集隨手替換一下 MNIST,這裏麪包含幾千張 Zalando 時尚文章的灰度圖像。獲取訓練和測試數據很是簡單,以下所示:app

(train_images, train_labels), (test_images, test_labels) = 
   tf.keras.datasets.fashion_mnist.load_data()
複製代碼

咱們想把這些圖像的像素值從 0 到 255 之間的一個數字轉換爲 0 到 1 之間的一個數字,並將該數據集轉換爲 [B, H, W ,C] 格式,其中 B 表明批處理的圖像數,H 和 W 分別是高度和寬度,C 是咱們數據集的通道數(灰度爲 1):分佈式

TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)

train_images = np.asarray(train_images, dtype=np.float32) / 255
# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))

test_images = np.asarray(test_images, dtype=np.float32) / 255
# Convert the test images and add channels
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))
複製代碼

接下來,咱們想將標籤從整數編號(例如,2 或套衫)轉換爲獨熱編碼(例如,0,0,1,0,0,0,0,0,0,0)。爲此,咱們要使用 tf.keras.utils.to_categorical 函數:

# How many categories we are predicting from (0-9)
LABEL_DIMENSIONS = 10

train_labels = tf.keras.utils.to_categorical(train_labels, 
                                            LABEL_DIMENSIONS)

test_labels = tf.keras.utils.to_categorical(test_labels,
                                           LABEL_DIMENSIONS)

# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)
複製代碼

構建 tf.keras 模型

咱們會使用 Keras 功能 API 來建立神經網絡。Keras 是一個高級 API,可用於構建和訓練深度學習模型,其採用模塊化設計,使用方便,易於擴展。tf.keras 是 TensorFlow 對這個 API 的實現,其支持 Eager Execution、tf.data 管道和估算器等。

在架構方面,咱們會使用 ConvNet。一個很是籠統的說法是,ConvNet 是卷積層 (Conv2D) 和池化層 (MaxPooling2D) 的堆棧。但最重要的是,ConvNet 將每一個訓練示例看成一個 3D 形狀張量(高度、寬度、通道),對於灰度圖像,張量從通道 = 1 開始,而後返回一個 3D 張量。

所以,在 ConvNet 部分以後,咱們須要將張量平面化,並添加密集層,其中最後一個返回 LABEL_DIMENSIONS 大小的向量,並附帶 tf.nn.softmax 激活:

inputs = tf.keras.Input(shape=(28,28,1))  # Returns a placeholder

x = tf.keras.layers.Conv2D(filters=32, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(inputs)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS,
                                   activation=tf.nn.softmax)(x)
複製代碼

如今,咱們能夠定義學習模型,請選擇優化器(咱們從 TensorFlow 中選擇一個,而不使用來自 tf.keras. optimizers 的優化器)並進行編譯:

model = tf.keras.Model(inputs=inputs, outputs=predictions)

optimizer = tf.train.AdamOptimizer(learning_rate=0.001)

model.compile(loss='categorical_crossentropy',
             optimizer=optimizer,
             metrics=['accuracy'])
複製代碼

建立估算器

使用已編譯的 Keras 模型建立估算器,也就是咱們所說的 model_to_estimator 方法。請注意,Keras 模型的初始模型狀態保存在建立的估算器中。

那估算器有哪些優勢呢?首先要提如下幾點:

您能夠在本地主機或分佈式多 GPU 環境中運行基於估算器的模型,而無需更改您的模型; 估算器可以簡化模型開發者之間的共享實現; 估算器可以爲您構建圖形,因此有點像 Eager Execution,沒有明確的會話。

那麼咱們要如何訓練簡單的 tf.keras 模型來使用多 GPU?咱們可使用 tf.contrib.distribute.MirroredStrategy 範式,經過同步訓練進行圖形內複製。如需瞭解更多關於此策略的信息,請觀看分佈式 TensorFlow 訓練講座。 注:分佈式 TensorFlow 連接 www.youtube.com/watch?v=bRM…

基本上,每一個工做器 GPU 都有一個網絡拷貝,並會獲取一個數據子集,據以計算本地梯度,而後等待全部工做器以同步方式結束。而後,工做器經過 Ring All-reduce 運算互相傳遞其本地梯度,這一般要進行優化,以減小網絡帶寬並增長吞吐量。在全部梯度到達後,每一個工做器會計算其平均值並更新參數,而後開始下一步。理想狀況下,您在單個節點上有多個高速互聯的 GPU。

要使用此策略,咱們首先要用已編譯的 tf.keras 模型建立一個估算器,而後經過 RunConfig config 賦予其 MirroredStrategy 配置。默認狀況下,該配置會使用所有 GPU,但您也能夠賦予其一個 num_gpus 選項,以使用特定數量的 GPU:

NUM_GPUS = 2

strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model,
                                                 config=config)
複製代碼

建立估算器輸入函數

要經過管道將數據傳遞到估算器,咱們須要定義一個數據導入函數,該函數返回批量數據的 tf.data 數據集(圖像、標籤)。下面的函數接收 numpy 數組,並經過 ETL 過程返回數據集。

請注意,最後咱們還調用了預讀取方法,該方法會在訓練時將數據緩衝到 GPU,以便下一批數據準備就緒並等待 GPU,而不是在每次迭代時讓 GPU 等待數據。GPU 可能仍然沒有獲得充分利用,要改善這一點,咱們可使用融合版轉換運算(如 shuffle_and_repeat),而不是兩個單獨的運算。不過,我在這裏選用的是簡單用例。

def input_fn(images, labels, epochs, batch_size):
  
     # Convert the inputs to a Dataset. (E)
    ds = tf.data.Dataset.from_tensor_slices((images, labels))    

    # Shuffle, repeat, and batch the examples. (T)
    SHUFFLE_SIZE = 5000
    ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
    ds = ds.prefetch(2)    
    
    # Return the dataset. (L)
    return ds
複製代碼

訓練估算器

首先,咱們定義一個 SessionRunHook 類,用於記錄隨機梯度降低法每次迭代的次數:

class TimeHistory(tf.train.SessionRunHook):
    def begin(self):
       self.times = []    

    def before_run(self, run_context):
       self.iter_time_start = time.time()    

    def after_run(self, run_context, run_values):
       self.times.append(time.time() - self.iter_time_start)
複製代碼

亮點在這裏!咱們能夠對估算器調用 train 函數,並經過 hooks 參數,向其賦予咱們定義的 input_fn (包含批次大小和咱們但願的訓練回合次數)和 TimeHistory 實例:

time_hist = TimeHistory()

BATCH_SIZE = 512
EPOCHS = 5

estimator.train(lambda:input_fn(train_images,
                               train_labels,
                               epochs=EPOCHS,
                               batch_size=BATCH_SIZE),
               hooks=[time_hist])
複製代碼

性能

如今,咱們可使用時間鉤子來計算訓練的總時間和平均每秒訓練的圖像數量(平均吞吐量):

total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")

avg_time_per_batch = np.mean(time_hist.times)
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with {NUM_GPUS} GPU(s)")
複製代碼

使用兩塊 K80 GPU 進行訓練時的 Fashion-MNIST 訓練吞吐量和總時間,採用不一樣 NUM_GPUS,顯示縮放不良

評估估算器

爲了檢驗模型的性能,咱們要對估算器調用評估方法:

estimator.evaluate(lambda:input_fn(test_images, 
                                  test_labels,
                                  epochs=1,
                                  batch_size=BATCH_SIZE))
複製代碼

視網膜 OCT (光學相干斷層成像術)圖像示例

爲了測試模型在處理較大數據集時的擴展性能,咱們使用 視網膜 OCT 圖像數據集,這是 Kaggle 衆多大型數據集中的一個。該數據集由活人視網膜的橫截面 X 光圖像組成,分爲四個類別:NORMAL、CNV、DME 和 DRUSEN:

光學相干斷層成像術的表明圖像,選自 Kermany 等人所著的《經過基於圖像的深度學習技術肯定醫學診斷和可治療疾病》(Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning)

該數據集共有 84,495 張 JPEG 格式的 X 光圖像,尺寸多爲 512x496,能夠經過 Kaggle CLI 下載: 注:CLI 連接 github.com/Kaggle/kagg…

#!pip install kaggle
#!kaggle datasets download -d paultimothymooney/kermany2018
複製代碼

下載完成後,訓練集和測試集圖像類位於各自的文件夾內,所以咱們能夠將模式定義爲:

labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')
複製代碼

接下來,咱們要編寫估算器的輸入函數,該函數能夠提取任何文件模式,並返回已縮放圖像和獨熱編碼標籤做爲 tf.data.Dataset。此次,咱們遵循輸入管道性能指南中的最佳實踐。請特別注意,若是 prefetch 的 buffer_size 爲 None,則 TensorFlow 會自動使用最優的預讀取緩衝區大小: 注:輸入管道性能指南連接 www.tensorflow.org/performance…

1    def input_fn(file_pattern, labels, 
2                        image_size=(224,224), 
3                        shuffle=False,
4                        batch_size=64, 
5                        num_epochs=None,
6                        buffer_size=4096,
7                        prefetch_buffer_size=None): 
8
9            table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
10          num_classes = len(labels) 
11
12          def _map_func(filename):
13                label = tf.string_split([filename], delimiter=os.sep).values[-2]
14                image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
15                image = tf.image.convert_image_dtype(image, dtype=tf.float32) 
16                image = tf.image.resize_images(image, size=image_size)
17                return (image, tf.one_hot(table.lookup(label), num_classes))
18
19          dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)
20
21          if num_epochs is not None and shuffle:
22                dataset = dataset.apply(
23                    tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
24          elif shuffle:
25                dataset = dataset.shuffle(buffer_size)
26          elif num_epochs is not None:
27                dataset = dataset.repeat(num_epochs)
28
29          dataset = dataset.apply(
30                tf.contrib.data.map_and_batch(map_func=_map_func,
31                                        batch_size=batch_size,
32                                        num_parallel_calls=os.cpu_count()))
33          dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)
34
35          return dataset 
複製代碼

此次訓練該模型時,咱們將使用一個通過預訓練的 VGG16,而且只從新訓練其最後 5 層:

keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3),
                                         include_top=False)

output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
                                  activation=tf.nn.softmax)(output)

model = tf.keras.Model(inputs=keras_vgg16.input,
                      outputs=prediction)

for layer in keras_vgg16.layers[:-4]:
   layer.trainable = False
複製代碼

如今,咱們萬事皆備,能夠按照上述步驟進行,並使用 NUM_GPUS GPU 在幾分鐘內訓練咱們的模型:

model.compile(loss='categorical_crossentropy',               optimizer=tf.train.AdamOptimizer(),              metrics=['accuracy'])

NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,                                                  config=config)
BATCH_SIZE = 64
EPOCHS = 1

estimator.train(input_fn=lambda:input_fn(train_folder,                                         labels,                                         shuffle=True,                                         batch_size=BATCH_SIZE,                                         buffer_size=2048,                                         num_epochs=EPOCHS,                                         prefetch_buffer_size=4),                hooks=[time_hist])
複製代碼

訓練結束後,咱們能夠評估測試集的準確度,應該在 95% 左右(對初始基線來講還不錯):

estimator.evaluate(input_fn=lambda:input_fn(test_folder,
                                           labels, 
                                           shuffle=False,
                                           batch_size=BATCH_SIZE,
                                           buffer_size=1024,
                                           num_epochs=1))
複製代碼

使用兩塊 K80 GPU 進行訓練時的 Fashion-MNIST 訓練吞吐量和總時間,採用不一樣 NUM_GPUS,顯示線性縮放

總結

咱們在上文中介紹瞭如何使用估算器 API 在多個 GPU 上輕鬆訓練 Keras 深度學習模型,如何編寫符合最佳實踐的輸入管道,以充分利用咱們的資源(線性縮放),以及如何經過鉤子爲咱們的訓練吞吐量計時。

請務必注意,最後咱們主要關注的是測試集錯誤。您可能會注意到,測試集的準確度會隨着 NUM_GPUS 值的增長而降低。其中一個緣由多是,使用 BATCH_SIZE*NUM_GPUS 的批量大小時,MirroredStrategy 可以有效地訓練模型,而當咱們增長 GPU 數量時,可能須要調整 BATCH_SIZE 或學習率。爲便於製圖,文中除 NUM_GPUS 以外的全部其餘超參數均保持不變,但實際上咱們須要調整這些超參數。

數據集和模型的大小也會影響這些方案的縮放效果。在讀取或寫入小數據時,GPU 的帶寬較差,若是是較爲老舊的 GPU(如 K80),則情形尤爲如此,並且可能會形成上面 Fashion-MNIST 圖中所示狀況。

致謝

感謝 TensorFlow 團隊,特別是 Josh Gordon,以及 Zalando Research 的各位同事,特別是 Duncan Blythe、Gokhan Yildirim 和 Sebastian Heinz,感謝他們幫忙修改草稿。

相關文章
相關標籤/搜索