TensorFlow——分佈式的TensorFlow運行環境

當咱們在大型的數據集上面進行深度學習的訓練時,每每須要大量的運行資源,並且還要花費大量時間才能完成訓練。數組

1.分佈式TensorFlow的角色與原理網絡

在分佈式的TensorFlow中的角色分配以下:session

PS:做爲分佈式訓練的服務端,等待各個終端(supervisors)來鏈接。dom

worker:在TensorFlow的代碼註釋中被稱爲終端(supervisors),做爲分佈式訓練的計算資源終端。分佈式

chief supervisors:在衆多的運算終端中必須選擇一個做爲主要的運算終端。該終端在運算終端中最早啓動,它的功能是合併各個終端運算後的學習參數,將其保存或者載入。函數

每一個具體的網絡標識都是惟一的,即分佈在不一樣IP的機器上(或者同一個機器的不一樣端口)。在實際的運行中,各個角色的網絡構建部分代碼必須100%的相同。三者的分工以下:學習

服務端做爲一個多方協調者,等待各個運算終端來鏈接。spa

chief supervisors會在啓動時同一管理全局的學習參數,進行初始化或者從模型載入。線程

其餘的運算終端只是負責獲得其對應的任務並進行計算,並不會保存檢查點,用於TensorBoard可視化中的summary日誌等任何參數信息。scala

在整個過程都是經過RPC協議來進行通訊的。

2.分佈部署TensorFlow的具體方法

配置過程當中,首先創建一個server,在server中會將ps及全部worker的IP端口準備好。接着,使用tf.train.Supervisor中的managed_ssion來管理一個打開的session。session中只是負責運算,而通訊協調的事情就都交給supervisor來管理了。

3.部署訓練實例

下面開始實現一個分佈式訓練的網絡模型,以線性迴歸爲例,經過3個端口來創建3個終端,分別是一個ps,兩個worker,實現TensorFlow的分佈式運算。

1. 爲每一個角色添加IP地址和端口,建立sever,在一臺機器上開3個不一樣的端口,分別表明PS,chief supervisor和worker。角色的名稱用strjob_name表示,以ps爲例,代碼以下:

# 定義IP和端口
strps_hosts = 'localhost:1681'
strworker_hosts = 'localhost:1682,localhost:1683'

# 定義角色名稱
strjob_name = 'ps'
task_index = 0

# 將字符串轉數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strps_hosts.split(',')

cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})

# 建立server
server = tf.train.Server({'ps':ps_hosts, 'worker':worker_hosts}, job_name=strjob_name, task_index=task_index)

2爲ps角色添加等待函數

ps角色使用server.join函數進行線程掛起,開始接受連續消息。

# ps角色使用join進行等待
if strjob_name == 'ps':
    print("wait")
    server.join()

3.建立網絡的結構

與正常的程序不一樣,在建立網絡結構時,使用tf.device函數將所有的節點都放在當前任務下。在tf.device函數中的任務是經過tf.train.replica_device_setter來指定的。在tf.train.replica_device_setter中使用worker_device來定義具體任務名稱;使用cluster的配置來指定角色及對應的IP地址,從而實現管理整個任務下的圖節點。代碼以下:

with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d'%task_index,
                                              cluster=cluster_spec)):
    X = tf.placeholder('float')
    Y = tf.placeholder('float')
    # 模型參數
    w = tf.Variable(tf.random_normal([1]), name='weight')
    b = tf.Variable(tf.zeros([1]), name='bias')

    global_step = tf.train.get_or_create_global_step()   # 獲取迭代次數

    z = tf.multiply(X, w) + b
    tf.summary('z', z)
    cost = tf.reduce_mean(tf.square(Y - z))
    tf.summary.scalar('loss_function', cost)
    learning_rate = 0.001

    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost, global_step=global_step)

    saver = tf.train.Saver(max_to_keep=1)

    merged_summary_op = tf.summary.merge_all()  # 合併全部summary

    init = tf.global_variables_initializer()

4.建立Supercisor,管理session

在tf.train.Supervisor函數中,is_chief代表爲是否爲chief Supervisor角色,這裏將task_index=0的worker設置成chief Supervisor。saver須要將保存檢查點的saver對象傳入。init_op表示使用初始化變量的函數。

training_epochs = 2000
display_step = 2

sv = tf.train.Supervisor(is_chief=(task_index == 0),# 0號爲chief
                         logdir='log/spuer/',
                         init_op=init,
                         summary_op=None,
                         saver=saver,
                         global_step=global_step,
                         save_model_secs=5)

# 鏈接目標角色建立session
with sv.managed_session(saver.target) as sess:

5迭代訓練

session中的內容與之前同樣,直接迭代訓練便可。因爲使用了supervisor管理session,將使用sv.summary_computed函數來保存summary文件。

print('sess ok')
    print(global_step.eval(session=sess))
    
    for epoch in range(global_step.eval(session=sess), training_epochs*len(train_x)):
        for (x, y) in zip(train_x, train_y):
            _, epoch = sess.run([optimizer, global_step], feed_dict={X: x, Y: y})
            summary_str = sess.run(merged_summary_op, feed_dict={X: x, Y: y})
            sv.summary_computed(sess, summary_str, global_step=epoch)
            if epoch % display_step == 0:
                loss = sess.run(cost, feed_dict={X:train_x, Y:train_y})
                print("Epoch:", epoch+1, 'loss:', loss, 'W=', sess.run(w), w, 'b=', sess.run(b))
                
    print(' finished ')
    sv.saver.save(sess, 'log/linear/' + "sv.cpk", global_step=epoch)

sv.stop()

(1)在設置自動保存檢查點文件後,手動保存仍然有效,

(2)在運行一半後,在運行supervisor時會自動載入模型的參數,不須要手動調用restore。

(3)在session中不須要進行初始化的操做。

6.創建worker文件

新建兩個py文件,設置task_index分別爲0和1,其餘的部分和上述的代碼相一致。

strjob_name = 'worker'
task_index = 1
  
strjob_name = 'worker'
task_index = 0

7.運行

咱們分別啓動寫好的三個文件,在運行結果中,咱們能夠看到循環的次數不是連續的,顯示結果中會有警告,這是由於在構建supervisor時沒有填寫local_init_op參數,該參數的含義是在建立worker實例時,初始化本地變量,上述代碼中沒有設置,系統會自動初始化,並給出警告提示。

分佈運算的目的是爲了提升總體運算速度,若是同步epoch的準確率須要犧牲整體運行速度爲代價,天然很不合適。

在ps的文件中,它只是負責鏈接,並不參與運算。

相關文章
相關標籤/搜索