TensorFlow支持使用多臺機器的設備進行計算。本文基於官方教程,實踐了分佈式TensorFlow搭建的過程。python
TensorFlow入門教程shell
A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.
從上面的定義能夠看出,所謂的TensorFlow集羣就是一組任務,每一個任務就是一個服務。服務由兩個部分組成,第一部分是master,用於建立session,第二部分是worker,用於執行具體的計算。segmentfault
TensorFlow通常將任務分爲兩類job:一類叫參數服務器,parameter server,簡稱爲ps,用於存儲tf.Variable
;一類就是普通任務,稱爲worker,用於執行具體的計算。服務器
首先來理解一下參數服務器的概念。通常而言,機器學習的參數訓練過程能夠劃分爲兩個類別:第一個是根據參數算算梯度,第二個是根據梯度更新參數。對於小規模訓練,數據量不大,參數數量很少,一個CPU就足夠了,兩類任務都交給一個CPU來作。對於普通的中等規模的訓練,數據量比較大,參數數量很少,計算梯度的任務負荷較重,參數更新的任務負荷較輕,因此將第一類任務交給若干個CPU或GPU去作,第二類任務交給一個CPU便可。對於超大規模的訓練,數據量大、參數多,不只計算梯度的任務要部署到多個CPU或GPU上,並且更新參數的任務也要部署到多個CPU。若是計算量足夠大,一臺機器能搭載的CPU和GPU數量有限,就須要多臺機器來進行計算能力的擴展了。參數服務器是一套分佈式存儲,用於保存參數,並提供參數更新的操做。session
咱們來看一下怎麼建立一個TensorFlow集羣。每一個任務用一個ip:port表示。TensorFlow用tf.train.ClusterSpec
表示一個集羣信息,舉例以下:app
import tensorflow as tf # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
上面的語句提供了一個TensorFlow集羣信息,集羣有兩類任務,稱爲job,一個job是ps,一個job是worker;ps由2個任務組成,worker由3個任務組成。dom
定義完集羣信息後,使用tf.train.Server
建立每一個任務:機器學習
tf.app.flags.DEFINE_string("job_name", "worker", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) server.join() if __name__ == "__main__": tf.app.run()
對於本例而言,咱們須要在ip:port對應的機器上運行每一個任務,共需執行五次代碼,生成五個任務。異步
python worker.py --job_name=ps --task_index=0 python worker.py --job_name=ps --task_index=1 python worker.py --job_name=worker --task_index=0 python worker.py --job_name=worker --task_index=1 python worker.py --job_name=worker --task_index=2
咱們找到集羣的某一臺機器,執行下面的代碼:async
# -*- coding=utf-8 -*- import tensorflow as tf import numpy as np train_X = np.random.rand(100).astype(np.float32) train_Y = train_X * 0.1 + 0.3 # 選擇變量存儲位置和op執行位置,這裏所有放在worker的第一個task上 with tf.device("/job:worker/task:0"): X = tf.placeholder(tf.float32) Y = tf.placeholder(tf.float32) w = tf.Variable(0.0, name="weight") b = tf.Variable(0.0, name="reminder") y = w * X + b loss = tf.reduce_mean(tf.square(y - Y)) init_op = tf.global_variables_initializer() train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss) # 選擇建立session使用的master with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess: sess.run(init_op) for i in range(500): sess.run(train_op, feed_dict={X: train_Y, Y: train_Y}) if i % 50 == 0: print i, sess.run(w), sess.run(b) print sess.run(w) print sess.run(b)
執行結果以下:
0 0.00245265 0.00697793 50 0.0752466 0.213145 100 0.0991397 0.279267 150 0.107308 0.30036 200 0.110421 0.306972 250 0.111907 0.308929 300 0.112869 0.309389 350 0.113663 0.309368 400 0.114402 0.309192 450 0.115123 0.308967 0.115824 0.30873
其實ps和worker本質上是一個東西,就是名字不一樣,咱們將上例中的with tf.device("/job:worker/task:0"):
改成with tf.device("/job:psr/task:0"):
,同樣可以執行。之因此在建立集羣時要分爲兩個類別的任務,是由於TensorFlow提供了一些工具函數,會根據名字不一樣賦予task不一樣的任務,ps的用於存儲變量,worker的用於計算。
所謂 replication,指的是各個task(簡單的狀況下,每一個task運行在不一樣的GPU device上)如何得到同一個模型,也就是說,replication的對象是模型。
在使用in-graph replication方式時,只有一個client進程(能夠在參與訓練的CPU或GPU上任選一個task來運行這個client,參與計算的其它tasks不運行這個client)來建立模型(即tf.Graph)及模型的參數(那些tf.Variables,好比權重W和偏置b)。因爲參數(W和b)是共享的,該client指定把參數放在/job:ps,即parameter server上(好比 /job:ps/task:0/cpu:0)。模型的計算部分(前向傳播,後向傳播,loss和梯度計算,等等)也由該client進程定義好,而後client進程把這個計算部分分配到各個GPU device上(這個過程就至關於在各個GPU中複製模型),分配的方式相似函數調用,但每次調用都指定了設備(即 /job:worker/task:0/gpu:0,/job:worker/task:1/gpu:0,等等)。調用時,模型的參數(即W和b)被看成函數的參數輸入給不一樣tasks(一般運行在不一樣GPU上)運行的模型,以保證這些參數確實是共享的。
若是用between-graph replication方式,則每一個task都運行本身的client進程用於建立模型和參數,並將參數pin到parameter server上(好比 /job:ps/task:0/cpu:0),而後各自獨立地執行該模型。注意,每一個task建立的模型必須如出一轍,這很容易作到,由於只要每一個task裏的這部分代碼都同樣就好了。問題是,這些task各自建立並pin到parameter server上的模型參數是一樣的嗎?問這個問題是由於咱們如今跑的是數據並行,而模型的參數及其更新都必須由parameter server統一處理。回答是,只要各task使用一樣的parameter server設備名(好比都用 /job:ps/task:0/cpu:0)和一樣的變量名(那些tf.Variable定義的變量,好比權重和偏置變量), 那麼在默認的狀況下,它們被分配在parameter server的相同的存儲裏。
import tensorflow as tf import numpy as np # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): x_data = tf.placeholder(tf.float32, [100]) y_data = tf.placeholder(tf.float32, [100]) W = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) b = tf.Variable(tf.zeros([1])) y = W * x_data + b loss = tf.reduce_mean(tf.square(y - y_data)) global_step = tf.Variable(0, name="global_step", trainable=False) optimizer = tf.train.GradientDescentOptimizer(0.1) train_op = optimizer.minimize(loss, global_step=global_step) tf.summary.scalar('cost', loss) summary_op = tf.summary.merge_all() init_op = tf.global_variables_initializer() # The StopAtStepHook handles stopping after running given steps. hooks = [ tf.train.StopAtStepHook(last_step=1000000)] # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index], is_chief=(FLAGS.task_index==0), # 咱們制定task_index爲0的任務爲主任務,用於負責變量初始化、作checkpoint、保存summary和復原 checkpoint_dir="/tmp/tf_train_logs", save_checkpoint_secs=None, hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. # mon_sess.run handles AbortedError in case of preempted PS. train_x = np.random.rand(100).astype(np.float32) train_y = train_x * 0.1 + 0.3 _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y}) if step % 100 == 0: print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v) print "Optimization finished." if __name__ == "__main__": tf.app.run()
代碼中,tf.train.replica_device_setter()
會根據job名,將with
內的Variable
op放到ps tasks,將其餘計算op放到worker tasks。默認分配策略是輪詢。
在屬於集羣的一臺機器中執行上面的代碼,屏幕會開始輸出每輪迭代的訓練參數和損失
python train.py --task_index=0
在另外一臺機器上執行下面你的代碼,再啓動一個任務,會看到屏幕開始輸出每輪迭代的訓練參數和損失,注意,step再也不是從0開始,而是在啓動時刻上一個啓動任務的step後繼續。此時觀察兩個任務,會發現他們同時在對同一參數進行更新。
python train.py --task_index=2
分佈式TensorFlow與Spark對比:
參考資料