分佈式TensorFlow

本文檔介紹如何建立TensorFlow服務器集羣,以及如何在該集羣中分發計算圖。咱們假設您熟悉編寫TensorFlow程序的基本概念python

開始

要查看一個簡單的TensorFlow集羣,請執行如下操做:git

# Start a TensorFlow server as a single-process "cluster".

$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'

tf.train.Server.create_local_server 方法建立具備進程內服務器的單進程集羣。github

建立一個集羣

<iframe width="412" height="232" src="https://www.youtube.com/embed/la_M6bCV91M?ecver=1" frameborder="0" allowfullscreen></iframe>編程

TensorFlow「集羣」是參與TensorFlow圖形的分佈式執行的一組「任務」。每一個任務與TensorFlow「服務器」相關聯,該服務器包含可用於建立會話的「主」,以及在圖中執行操做的「工做者」。羣集還能夠分爲一個或多個「做業」,其中每一個做業包含一個或多個任務。api

要建立集羣,請在集羣中爲每一個任務啓動一個TensorFlow服務器。每一個任務一般在不一樣的機器上運行,但您能夠在同一臺機器上運行多個任務(例如控制不一樣的GPU設備)。在每一個任務中,執行如下操做:服務器

  1. 建立一個tf.train.ClusterSpec描述集羣中全部任務的內容。這對每一個任務應該是同樣的。網絡

  2. 建立一個tf.train.Server,傳遞tf.train.ClusterSpec給構造函數,並使用做業名和任務索引識別本地任務。session

建立一個tf.train.ClusterSpec描述集羣

羣集規範字典將做業名稱映射到網絡地址列表。將此字典傳遞給tf.train.ClusterSpec 構造函數。例如:app

tf.train.ClusterSpec construction框架

Available tasks

tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
/job:local/task:0
/job:local/task:1
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1

tf.train.Server在每一個任務中建立一個實例

一個tf.train.Server對象包含了一組本地設備,一組到其餘任務的鏈接 tf.train.ClusterSpec,而且 tf.Session可以使用這些來進行分佈式計算。每一個服務器是特定命名做業的成員,而且在該做業中具備任務索引。服務器能夠與羣集中的任何其餘服務器進行通訊。

例如,推出了集羣兩臺服務器上運行localhost:2222 ,並localhost:2223在本地機器上的兩個不一樣的進程運行下面的代碼片斷:

# In task 0:

cluster = tf.train.ClusterSpec({"local": ["localhost:2222""localhost:2223"]})

server = tf.train.Server(cluster, job_name="local", task_index=0)

 

# In task 1:

cluster = tf.train.ClusterSpec({"local": ["localhost:2222""localhost:2223"]})

server = tf.train.Server(cluster, job_name="local", task_index=1

注意:手動指定這些集羣規範可能很乏味,特別是對於大型集羣。咱們正在開發以編程方式啓動任務的工具,例如使用像Kubernetes這樣的集羣管理器 。若是您想要查看支持的特定集羣管理器,請提出一個 GitHub問題

 

在模型中指定分佈式設備

要對特定進程進行操做,您可使用與tf.device 用於指定運行在CPU或GPU上的操做相同的 功能。例如:

with tf.device("/job:ps/task:0"):

  weights_1 = tf.Variable(...)

  biases_1 = tf.Variable(...)

 

with tf.device("/job:ps/task:1"):

  weights_2 = tf.Variable(...)

  biases_2 = tf.Variable(...)

 

with tf.device("/job:worker/task:7"):

  input, labels = ...

  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)

  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)

  # ...

  train_op = ...

 

with tf.Session("grpc://worker7.example.com:2222") as sess:

  for in range(10000):

    sess.run(train_op) 

在上述示例中,在做業中的兩個任務上建立變量,而且在做業中建立模型的計算密集型部分 。TensorFlow將插入做業之間的適當的數據傳輸(從到用於直傳,以及從到用於施加梯度)。psworkerpsworkerworkerps

 

複印訓練

一種稱爲「數據並行性」的常見培訓配置涉及到worker在不一樣小批量數據上對同一模型進行職業培訓的多個任務,更新託管在ps 做業中的一個或多個任務中的共享參數。全部任務一般在不一樣的機器上運行。有不少方法能夠在TensorFlow中指定此結構,而且咱們正在構建庫,以簡化指定複製模型的工做。可能的方法包括:

  • 圖形內複製。在這種方法中,客戶端構建一個 tf.Graph包含一組參數(在tf.Variable固定到節點中/job:ps)的參數; 和模型的計算密集型部分的多個副本,每一個副本都固定在不一樣的任務中/job:worker

  • 圖形間複製。在這種方法中,每一個/job:worker任務都有一個單獨的客戶端,一般與worker任務相同。每一個客戶端構建一個包含參數的相似圖(固定爲 /job:ps使用前 tf.train.replica_device_setter 將其肯定性地映射到相同的任務); 和模型的計算密集型部分的單個副本,固定到本地任務 /job:worker

  • 異步訓練 在這種方法中,圖的每一個副本都有一個獨立的訓練循環,無需協調地執行。它與上述兩種複製形式兼容。

  • 同步訓練 在這種方法中,全部的副本都會讀取當前參數的相同值,並行計算梯度,而後將它們應用在一塊兒。它與圖形內複製(例如使用CIFAR-10多GPU培訓師中的梯度平均 )和圖形間複製(例如使用tf.train.SyncReplicasOptimizer)兼容 。

把它們放在一塊兒:示例教練程序

如下代碼顯示了分佈式教練程序的框架,實現了圖形間複製異步訓練。它包括參數服務器和工做任務的代碼。

import argparse

import sys

 

import tensorflow as tf

 

FLAGS = None

 

def main(_):

  ps_hosts = FLAGS.ps_hosts.split(",")

  worker_hosts = FLAGS.worker_hosts.split(",")

 

  # Create a cluster from the parameter server and worker hosts.

  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

 

  # Create and start a server for the local task.

  server = tf.train.Server(cluster,

                           job_name=FLAGS.job_name,

                           task_index=FLAGS.task_index)

 

  if FLAGS.job_name == "ps":

    server.join()

  elif FLAGS.job_name == "worker":

 

    # Assigns ops to the local worker by default.

    with tf.device(tf.train.replica_device_setter(

        worker_device="/job:worker/task:%d" % FLAGS.task_index,

        cluster=cluster)):

 

      # Build model...

      loss = ...

      global_step = tf.contrib.framework.get_or_create_global_step()

 

      train_op = tf.train.AdagradOptimizer(0.01).minimize(

          loss, global_step=global_step)

 

    # The StopAtStepHook handles stopping after running given steps.

    hooks=[tf.train.StopAtStepHook(last_step=1000000)]

 

    # The MonitoredTrainingSession takes care of session initialization,

    # restoring from a checkpoint, saving to a checkpoint, and closing when done

    # or an error occurs.

    with tf.train.MonitoredTrainingSession(master=server.target,

                                           is_chief=(FLAGS.task_index == 0),

                                           checkpoint_dir="/tmp/train_logs",

                                           hooks=hooks) as mon_sess:

      while not mon_sess.should_stop():

        # Run a training step asynchronously.

        # See `tf.train.SyncReplicasOptimizer` for additional details on how to

        # perform *synchronous* training.

        # mon_sess.run handles AbortedError in case of preempted PS.

        mon_sess.run(train_op)

 

if __name__ == "__main__":

  parser = argparse.ArgumentParser()

  parser.register("type""bool"lambda v: v.lower() == "true")

  # Flags for defining the tf.train.ClusterSpec

  parser.add_argument(

      "--ps_hosts",

      type=str,

      default="",

      help="Comma-separated list of hostname:port pairs"

  )

  parser.add_argument(

      "--worker_hosts",

      type=str,

      default="",

      help="Comma-separated list of hostname:port pairs"

  )

  parser.add_argument(

      "--job_name",

      type=str,

      default="",

      help="One of 'ps', 'worker'"

  )

  # Flags for defining the tf.train.Server

  parser.add_argument(

      "--task_index",

      type=int,

      default=0,

      help="Index of task within the job"

  )

  FLAGS, unparsed = parser.parse_known_args()

  tf.app.run(main=main, argv=[sys.argv[0]] + unparsed) 

要啓動具備兩個參數服務器和兩個工做人員的培訓師,請使用如下命令行(假設調用腳本):trainer.py

# On ps0.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=ps --task_index=0

# On ps1.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=ps --task_index=1

# On worker0.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=worker --task_index=0

# On worker1.example.com:

$ python trainer.py \

     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \

     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \

     --job_name=worker --task_index=1 

詞彙表

Client

客戶端一般是一個構建TensorFlow圖並構建一個 tensorflow::Session與集羣進行交互的程序。客戶端一般用Python或C ++編寫。單個客戶端進程能夠直接與多個TensorFlow服務器進行交互(請參見上面的「複製培訓」),單個服務器能夠爲多個客戶端提供服務。

Cluster

TensorFlow集羣包括一個或多個「做業」,每一個「做業」分爲一個或多個「任務」的列表。集羣一般專用於特定的高級目標,例如訓練神經網絡,並行使用許多機器。集羣由tf.train.ClusterSpec對象定義。

Job

一份工做包括一份一般用於共同目的的「任務」清單。例如,名爲ps(對於「參數服務器」)的做業一般會託管存儲和更新變量的節點; 而名爲「 workerjob」 的做業一般會承載執行計算密集型任務的無狀態節點。做業中的任務一般在不一樣的機器上運行。一組工做角色是靈活的:例如,a worker可能會保持一些狀態。

Master service

提供遠程訪問一組分佈式設備並充當會話目標的RPC服務。主服務實現 tensorflow::Session接口,負責協調一個或多個「工做服務」的工做。全部TensorFlow服務器都實現主服務。

Task

任務對應於特定的TensorFlow服務器,一般對應於單個進程。任務屬於特定的「做業」,並由該做業的任務列表中的索引識別。

TensorFlow服務器運行tf.train.Server實例的進程,該實例是集羣的成員,並導出「主服務」和「工做服務」。

Worker service

使用本地設備執行TensorFlow圖形的一部分的RPC服務。worker服務實現了worker_service.proto。全部TensorFlow服務器都實現了工做服務。

相關文章
相關標籤/搜索