第八節,配置分佈式TensorFlow

 因爲隨着神經網絡層數的增多,須要訓練的參數也會增多,隨之而來須要的數據集就會很大,這樣會形成須要更大的運算資源,並且還要消耗很長的運算時間。TensorFlow提供了一個能夠分佈式部署的模式,將一個訓練任務拆分紅多個小任務,配置到不一樣的計算機上完成協同運算,這樣使用計算機羣運算來代替單機運算,可使訓練時間大幅度縮短。python

一 分佈式TensorFlow角色以及原理

要想配置TensorFlow爲分佈訓練,首先須要瞭解TensorFlow中關於分佈式的角色分配。算法

  • ps:做爲分佈式訓練的服務端,等到各個終端(supervisors)來鏈接。
  • worker:在TensorFlow的代碼註釋中被稱爲supervisors,做爲分佈式訓練的運算終端。
  • chief supervisors:在衆多運算終端中必須選中一個做爲主要的運算終端。該終端是在運算終端中最早啓動的,它的功能是合併各個終端運算後的學習參數,將其保存再寫入。

每一個具體角色網絡標識都是惟一的,即分佈在不一樣IP的機器上(或者同一主機但不一樣端口號)。數組

在實際運行中,各個角色的網絡構建部分代碼必須徹底相同。三者的分工以下:服務器

  • 服務器端做爲一個多方協調者,等待各個運算終端來鏈接。
  • cheif supervisors會在啓動時統一管理全局的學習參數,進行初始化或從模型載入。
  • 其它的運算終端只是負責獲得其相應的任務並進行計算,並不會保存檢查點以及用於TensorBoard可視化的summary日誌等任何參數信息。

 二 分佈部署TensorFlow的具體方法

配置過程當中,首先建立一個server,在server中會將ps以及全部worker的ip端口準備好,接着使用tf.train.Supervisor中的managed_seesion來管理打開的session,session只負責運算,而通訊協調的事情就都交給Supervisor來管理了。網絡

三 使用TensorFlow實現分佈式部署訓練

下面開始實現一個分佈式訓練的網絡模型,仍然以線性迴歸的模型做爲原型,並將其改成分佈式。使咱們須要在本機經過3個端口來創建3個終端,分別是ps,兩個worker。代碼主要分爲如下幾部分:session

1.爲每一個角色建立IP地址和端口,建立server

首先建立集羣(cluster), ClusterSpec的定義,須要把你要跑這個任務的全部的ps和worker 的節點的ip和端口的信息都包含進去, 全部的角色都要執行這段代碼, 就你們互相知道了, 這個集羣裏面都有哪些成員,不一樣的成員的類型是什麼, 是ps仍是worker。app

而後建立一個server,在server中會將ps以及全部worker的ip端口準備好,在同一臺電腦開三個不一樣的端口,分別表明ps,chief supervisors和worker。角色的名稱用strjob_name表示。從 tf.train.Server這個的定義開始,就每一個角色不同了。 若是角色名字是ps的話, 程序就join到這裏,做爲參數更新的服務, 等待其餘worker角色給它提交參數更新的數據。若是是worker角色,就執行後面的計算任務。以ps爲例(先建立ps文件):dom

'''
(1)爲每一個角色添加IP地址和端口,建立server
'''

'''定義IP和端口號'''
#指定服務器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts =  '127.0.0.1:1235,127.0.0.1:1236'

#定義角色名稱
strjob_name = 'ps'
task_index = 0
#將字符串轉爲數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')

cluster_spec= tf.train.ClusterSpec({'ps':ps_hosts,'worker':worker_hosts})

#建立Server
server = tf.train.Server(
         cluster_spec,
         job_name = strjob_name,
         task_index = task_index)

2.爲ps角色添加等待函數

ps角色使用server.join()函數進行線程掛起,開始接受鏈接消息。分佈式

'''
(2) 爲ps角色添加等待函數
'''
#ps角色處於監聽狀態,等待終端鏈接
if strjob_name == 'ps':
    print('waiting....')
    server.join()   

3.建立網絡結構

與正常的程序不一樣,在建立網絡結構時,使用tf.device()函數將所有的節點都放在當前任務下。task:0對應worker1(能夠理解爲任務0對應着角色1),task:1對應worker2。ide

在rf.device()函數中的任務是經過tf.train.replica_device_setter()來指定的。

在tf.train.replica_device_setter()中使用worker_device()來定義具體任務名稱:使用cluster的配置來指定角色和對應的ip地址,從而實現整個任務下的圖節點,

'''
(3) 建立網絡結構
'''
#設定訓練集數據長度
n_train = 100

#生成x數據,[-1,1]之間,均分紅n_train個數據
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)

#把x乘以2,在加入(0,0.3)的高斯正太分佈
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])

#繪製x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x')   #o使用圓點標記一個點
plt.legend()
plt.show()

#建立網絡結構時,經過tf.device()函數將所有的節點都放在當前任務下 task:0對應worker1 task:1對應worker2
with tf.device(tf.train.replica_device_setter(
        worker_device = '/job:worker/task:{0}'.format(task_index),
        cluster = cluster_spec)):
    
    '''
    前向反饋
    '''
    #建立佔位符
    input_x = tf.placeholder(dtype=tf.float32)
    input_y = tf.placeholder(dtype=tf.float32)
    
    #模型參數
    w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w')    #設置正太分佈參數  初始化權重
    b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b')    #設置正太分佈參數  初始化偏置
    
    #建立一個global_step變量
    global_step = tf.train.get_or_create_global_step()
    
    #前向結構
    pred = tf.multiply(w,input_x) + b
    
    #將預測值以直方圖形式顯示,給直方圖命名爲'pred'
    tf.summary.histogram('pred',pred)
    
    '''
    反向傳播bp
    '''
    #定義代價函數  選取二次代價函數
    cost = tf.reduce_mean(tf.square(input_y - pred))
    
    #將損失以標量形式顯示 該變量命名爲loss_function
    tf.summary.scalar('loss_function',cost)
    
    
    #設置求解器 採用梯度降低法 學習了設置爲0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,globle_step就會自動得到當前迭代的次數
    train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
    
    saver = tf.train.Saver(max_to_keep = 1)
    
    #合併全部的summary
    merged_summary_op = tf.summary.merge_all()
    
    #初始化全部變量,所以變量須要放在其前面定義
    init  =tf.global_variables_initializer()

爲了使載入檢查點文件可以同步循環次數,這裏添加了一個global_step變量,將其放到優化器中。這樣每運行一次優化器,global_step就會自動加1.

4.建立Supervisor,管理session

'''
(4)建立Supervisor,管理session
'''    
training_epochs = 2000
display_step = 20

sv = tf.train.Supervisor(is_chief = (task_index == 0),          #0號worker爲chief
                         logdir='./LinearRegression/super/',    #檢查點和summary文件保存的路徑
                         init_op = init,  #初始化全部變量
                         summary_op = None,                            #summary_op用於自動保存summary文件,設置爲None,表示不自動保存
                         saver = saver,   #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。不然設置爲None
                         global_step = global_step,
                         save_model_secs = 50   #保存檢查點文件的時間間隔
                         )
  • 在tf.train.Supervisor()函數中,is_cheif代表了是否爲cheif supervisors角色,這裏將task_index = 0的worker設置成chief supervisors。
  • logdir:爲檢查點和summary日誌文件的保存路徑。不過這個彷佛啓動就會去這個logdir的目錄去看有沒有checkpoint的文件, 有的話就自動裝載了,沒有就用init_op指定的初始化參數。
  • init_op:表示使用初始化變量的函數。
  • summary_op:將保存summary的對象傳入,就會自動保存summary文件。這裏設置爲None,表示不自動保存。
  • saver:將保存檢查點的saver對象傳入,Supervisor就會自動保存檢查點文件。若是不想自動保存,就設置爲None。
  • global_step:爲迭代次數。
  • save_model_op:爲保存檢查點文件的時間間隔,這裏設置成50,代表每50秒自動保存一次檢查點文件。爲了使程序運行時間長一些,咱們更改了training_epochs參數。

5.迭代訓練

session中的內容和以前的同樣,直接迭代訓練便可,因爲使用了Supervisor管理session,將使用sv.summary_computed函數來保存summary文件,一樣,若是想要手動保存監測點文件,也可使用sv.saver.save()函數。

'''
(5) 迭代訓練
'''
#鏈接目標角色建立session
with sv.managed_session(server.target) as sess:
    print("sess ok:")
    print(global_step.eval(session=sess))
    print('開始迭代:')
         
    #存放批次值和代價值
    plotdata = {'batch_size':[],'loss':[]}
    
    #開始迭代 這裏step表示當前執行步數,迭代training_epochs輪  須要執行training_epochs*n_train步
    for step in range(training_epochs*n_train):
        for (x,y) in zip(train_x,train_y):
            #開始執行圖  並返回當前步數
            _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
                                            
            #生成summary
            summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
            #將summary寫入文件  手動保存summary日誌文件
            sv.summary_computed(sess,summary_str,global_step = step)
            
            
             #一輪訓練完成後 打印輸出信息
            if step % display_step == 0:
                #計算代價值
                loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
                print('step {0}  cost {1}  w {2}  b{3}'.format(step,loss,sess.run(w),sess.run(b)))
        
                #保存每display_step輪訓練後的代價值以及當前迭代輪數
                if not loss == np.nan:
                    plotdata['batch_size'].append(step)
                    plotdata['loss'].append(loss)
                
        
    print('Finished!')
    #手動保存檢查點文件
    #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
    
sv.stop()
  • 在設置了自動保存檢查點文件後,手動保存仍然有效。程序裏咱們在Supervisor對象建立的時候指定了自動保存檢查點文件,程序裏被我註釋掉的最後一行是採用手動保存檢查點文件。
  • 在Supervisor對象建立的時候指定了不自動保存summary日誌文件,咱們採用了手動保存,調用了sv.summary_computed()函數。
  • 在運行一半後終止,再運行Supervisor時會自動載入模型的參數,不須要手動調用saver.restore()。
  • 在session中,不須要再運行tf.global_variables_initializer()函數。由於在Supervisor創建的時候回調用傳入的init_op進行初始化,若是加了sess.run(tf.global_variables_initializer()),則會致使所載入模型的變量被二次清空。

6.創建worker文件

將ps.py文件複製兩份,一個叫worker1.py,一個叫worker2.py。將角色名稱修改成worker,並將worker2.py中的task_index修改成1。同時須要將worker2.py文件中手動保存summary日誌的代碼註釋掉。

worker1.py文件修改以下:

#定義角色名稱
strjob_name = 'worker'
task_index = 0

worker2.py文件修改以下:

#定義角色名稱
strjob_name = 'worker'
task_index = 1

在這個程序中使用了sv.summary_computed()函數手動將運行時動態的數據保存下來,以便於在TensorBoard中查看,可是在分佈式部署的時候,使用該功能還須要注意如下幾點:

  • worker2文件中不能使用sv.summary_computed()函數,由於worker2不是chief supervisors,在worker2中是不會爲Supervisor對象構造默認summary_writer(全部的summary日誌信息都要經過該對象進行寫)對象的,因此即便程序調用sv.summary_computed()也沒法執行下去,程序會報錯。
  • 手寫控制summary日誌和檢查點文件保存時,須要將chief supervisors之外的worker所有去掉才能夠,可使用Supervisor按時間間隔保存的形式來管理,這樣用一套代碼就能夠解決了。

7.部署運行

在spyder中先將ps.py文件運行起來,選擇菜單Consoles->Open an Ipython console,新打開一個Consoles,以下圖

在spider面板右下角,能夠看到在原有標籤爲'Console 1/A'標籤又多了一個‘Console 2/A’標籤,選中這個標籤,就激活了這個標籤。

運行worker2.py文件。同理,啓動'Console 3/A'運行worker1.py文件。

下面咱們能夠看到worker1.py文件的輸出:

咱們在程序中設置display_step爲20,即迭代20次輸出一次信息,咱們可能看到這個輸出並非連續的,這是由於跳過的步驟被分配到了worker2中去運算了。

worker2.py文件對應的窗口顯示的信息以下:

從圖中能夠看到worker2和chief supervisors的迭代順序是互補,但也有多是沒有絕對互補的,可是爲何有時候沒有絕對互補?可能與Supervisor中的同步算法有關。

分佈運算的目的是爲了提升總體運算速度,若是同步epoch的準確度須要以犧牲整體運算速度爲代價,天然很不合適。因此更合理的推斷是由於單機單次運算太快迫使算法使用了更寬鬆的同步機制。

重要的一點是對於指定步數的學習參數w和b是一致的。即統一迭代論述的值是同樣的,這代表兩個終端是在相同的起點上進行運算的。

對於ps.py文件,其對應的窗口一直默默的只顯示打印的那句話waiting....,由於它只負責鏈接參與運算。

四 最後再補充一些名詞解釋

客戶端(Client)

  • 客戶端是一個用於創建TensorFlow計算圖並創立與集羣進行交互的會話層tensorflow::Session 的程序。通常客戶端是經過python或C++實現的。一個獨立的客戶端進程能夠同時與多個TensorFlow的服務端相連 ,同時一個獨立的服務端也能夠與多個客戶端相連。

集羣(Cluster) 

  • 一個TensorFlow的集羣裏包含了一個或多個做業(job), 每個做業又能夠拆分紅一個或多個任務(task)。集羣的概念主要用與一個特定的高層次對象中,好比說訓練神經網絡,並行化操做多臺機器等等。集羣對象能夠經過tf.train.ClusterSpec 來定義。 

做業(Job) 

  • 一個做業能夠拆封成多個具備相同目的的任務(task),好比說,一個稱之爲ps(parameter server,參數服務器)的做業中的任務主要是保存和更新變量,而一個名爲worker(工做)的做業通常是管理無狀態且主要從事計算的任務。一個做業中的任務能夠運行於不一樣的機器上,做業的角色也是靈活可變的,好比說稱之爲」worker」的做業能夠保存一些狀態。  

任務(Task) 

  • 任務至關因而一個特定的TesnsorFlow服務端,其至關於一個獨立的進程,該進程屬於特定的做業並在做業中擁有對應的序號。 
    TensorFlow服務端(TensorFlow server) 。

ps.py完整代碼:

# -*- coding: utf-8 -*-
"""
Created on Thu Apr 19 08:52:30 2018

@author: zy
"""

import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt

'''
分佈式計算
'''

'''
(1)爲每一個角色添加IP地址和端口,建立server
'''

'''定義IP和端口號'''
#指定服務器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts =  '127.0.0.1:1235,127.0.0.1:1236'

#定義角色名稱
strjob_name = 'ps'
task_index = 0
#將字符串轉爲數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts})

#建立server
server = tf.train.Server(
         cluster_spec,
         job_name = strjob_name,
         task_index = task_index)

'''
(2) 爲ps角色添加等待函數
'''
#ps角色處於監聽狀態,等待終端鏈接
if strjob_name == 'ps':
    print('waiting....')
    server.join()
    
    
    
'''
(3) 建立網絡結構
'''


#設定訓練集數據長度
n_train = 100

#生成x數據,[-1,1]之間,均分紅n_train個數據
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)

#把x乘以2,在加入(0,0.3)的高斯正太分佈
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])

#繪製x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x')   #o使用圓點標記一個點
plt.legend()
plt.show()

#建立網絡結構時,經過tf.device()函數將所有的節點都放在當前任務下
with tf.device(tf.train.replica_device_setter(
        worker_device = '/job:worker/task:{0}'.format(task_index),
        cluster = cluster_spec)):
    
    '''
    前向反饋
    '''
    #建立佔位符
    input_x = tf.placeholder(dtype=tf.float32)
    input_y = tf.placeholder(dtype=tf.float32)
    
    #模型參數
    w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w')    #設置正太分佈參數  初始化權重
    b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b')    #設置正太分佈參數  初始化偏置
    
    #建立一個global_step變量
    global_step = tf.train.get_or_create_global_step()
    
    #前向結構
    pred = tf.multiply(w,input_x) + b
    
    #將預測值以直方圖形式顯示,給直方圖命名爲'pred'
    tf.summary.histogram('pred',pred)
    
    '''
    反向傳播bp
    '''
    #定義代價函數  選取二次代價函數
    cost = tf.reduce_mean(tf.square(input_y - pred))
    
    #將損失以標量形式顯示 該變量命名爲loss_function
    tf.summary.scalar('loss_function',cost)
    
    
    #設置求解器 採用梯度降低法 學習了設置爲0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,global_step就會自動得到當前迭代的次數
    train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
    
    saver = tf.train.Saver(max_to_keep = 1)
    
    #合併全部的summary
    merged_summary_op = tf.summary.merge_all()
    
    #初始化全部變量,所以變量須要放在其前面定義
    init  =tf.global_variables_initializer()

'''
(4)建立Supervisor,管理session
'''    
training_epochs = 2000
display_step = 20

sv = tf.train.Supervisor(is_chief = (task_index == 0),          #0號worker爲chief
                         logdir='./LinearRegression/super/',    #檢查點和summary文件保存的路徑
                         init_op = init,  #初始化全部變量
                         summary_op = None,                            #summary_op用於自動保存summary文件,設置爲None,表示不自動保存
                         saver = saver,   #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。不然設置爲None
                         global_step = global_step,
                         save_model_secs = 50   #保存檢查點文件的時間間隔
                         )







'''
(5) 迭代訓練
'''
#鏈接目標角色建立session
with sv.managed_session(server.target) as sess:
    print("sess ok:")
    print(global_step.eval(session=sess))
    print('開始迭代:')
         
    #存放批次值和代價值
    plotdata = {'batch_size':[],'loss':[]}
    
    #開始迭代 這裏step表示當前執行步數,迭代training_epochs輪  須要執行training_epochs*n_train步
    for step in range(training_epochs*n_train):
        for (x,y) in zip(train_x,train_y):
            #開始執行圖  並返回當前步數
            _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
                                            
            #生成summary
            summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
            #將summary寫入文件  手動保存summary日誌文件
            sv.summary_computed(sess,summary_str,global_step = step)
            
            
             #一輪訓練完成後 打印輸出信息
            if step % display_step == 0:
                #計算代價值
                loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
                print('step {0}  cost {1}  w {2}  b{3}'.format(step,loss,sess.run(w),sess.run(b)))
        
                #保存每display_step輪訓練後的代價值以及當前迭代輪數
                if not loss == np.nan:
                    plotdata['batch_size'].append(step)
                    plotdata['loss'].append(loss)
                
        
    print('Finished!')
    #手動保存檢查點文件
    #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
    
sv.stop()
    
    
    
View Code

worker1.py完整代碼:

# -*- coding: utf-8 -*-
"""
Created on Thu Apr 19 08:52:30 2018

@author: zy
"""

import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt

'''
分佈式計算
'''

'''
(1)爲每一個角色添加IP地址和端口,建立worker 
'''

'''定義IP和端口號'''
#指定服務器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts =  '127.0.0.1:1235,127.0.0.1:1236'

#定義角色名稱
strjob_name = 'worker'
task_index = 0
#將字符串轉爲數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts})

#建立server
server = tf.train.Server(
         cluster_spec,
         job_name = strjob_name,
         task_index = task_index)

'''
(2) 爲ps角色添加等待函數
'''
#ps角色處於監聽狀態,等待終端鏈接
if strjob_name == 'ps':
    print('waiting....')
    server.join()
    
    
    
'''
(3) 建立網絡結構
'''


#設定訓練集數據長度
n_train = 100

#生成x數據,[-1,1]之間,均分紅n_train個數據
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)

#把x乘以2,在加入(0,0.3)的高斯正太分佈
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])

#繪製x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x')   #o使用圓點標記一個點
plt.legend()
plt.show()

#建立網絡結構時,經過tf.device()函數將所有的節點都放在當前任務下
with tf.device(tf.train.replica_device_setter(
        worker_device = '/job:worker/task:{0}'.format(task_index),
        cluster = cluster_spec)):
    
    '''
    前向反饋
    '''
    #建立佔位符
    input_x = tf.placeholder(dtype=tf.float32)
    input_y = tf.placeholder(dtype=tf.float32)
    
    #模型參數
    w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w')    #設置正太分佈參數  初始化權重
    b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b')    #設置正太分佈參數  初始化偏置
    
    #建立一個global_step變量
    global_step = tf.train.get_or_create_global_step()
    
    #前向結構
    pred = tf.multiply(w,input_x) + b
    
    #將預測值以直方圖形式顯示,給直方圖命名爲'pred'
    tf.summary.histogram('pred',pred)
    
    '''
    反向傳播bp
    '''
    #定義代價函數  選取二次代價函數
    cost = tf.reduce_mean(tf.square(input_y - pred))
    
    #將損失以標量形式顯示 該變量命名爲loss_function
    tf.summary.scalar('loss_function',cost)
    
    
    #設置求解器 採用梯度降低法 學習了設置爲0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,global_step就會自動得到當前迭代的次數
    train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
    
    saver = tf.train.Saver(max_to_keep = 1)
    
    #合併全部的summary
    merged_summary_op = tf.summary.merge_all()
    
    #初始化全部變量,所以變量須要放在其前面定義
    init  =tf.global_variables_initializer()

'''
(4)建立Supervisor,管理session
'''    
training_epochs = 2000
display_step = 20

sv = tf.train.Supervisor(is_chief = (task_index == 0),          #0號worker爲chief
                         logdir='./LinearRegression/super/',    #檢查點和summary文件保存的路徑
                         init_op = init,  #初始化全部變量
                         summary_op = None,                            #summary_op用於自動保存summary文件,設置爲None,表示不自動保存
                         saver = saver,   #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。不然設置爲None
                         global_step = global_step,
                         save_model_secs = 50   #保存檢查點文件的時間間隔
                         )





'''
(5) 迭代訓練
'''
#鏈接目標角色建立session
with sv.managed_session(server.target) as sess:
    print("sess ok:")
    print(global_step.eval(session=sess))
    print('開始迭代:')
         
    #存放批次值和代價值
    plotdata = {'batch_size':[],'loss':[]}
    
    #開始迭代 這裏step表示當前執行步數,迭代training_epochs輪  須要執行training_epochs*n_train步
    for step in range(training_epochs*n_train):
        for (x,y) in zip(train_x,train_y):
            #開始執行圖  並返回當前步數
            _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
                                            
            #生成summary
            summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
            #將summary寫入文件  手動保存summary日誌文件
            sv.summary_computed(sess,summary_str,global_step = step)
            
            
             #一輪訓練完成後 打印輸出信息
            if step % display_step == 0:
                #計算代價值
                loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
                print('step {0}  cost {1}  w {2}  b{3}'.format(step,loss,sess.run(w),sess.run(b)))
        
                #保存每display_step輪訓練後的代價值以及當前迭代輪數
                if not loss == np.nan:
                    plotdata['batch_size'].append(step)
                    plotdata['loss'].append(loss)
                
        
    print('Finished!')
    #手動保存檢查點文件
    #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
    
sv.stop()
    
    
View Code

worker2.py完整代碼:

# -*- coding: utf-8 -*-
"""
Created on Thu Apr 19 08:52:30 2018

@author: zy
"""

import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt

'''
分佈式計算
'''

'''
(1)爲每一個角色添加IP地址和端口,建立worker 
'''

'''定義IP和端口號'''
#指定服務器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts =  '127.0.0.1:1235,127.0.0.1:1236'

#定義角色名稱
strjob_name = 'worker'
task_index = 1
#將字符串轉爲數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts})

#建立server
server = tf.train.Server(
         cluster_spec,
         job_name = strjob_name,
         task_index = task_index)

'''
(2) 爲ps角色添加等待函數
'''
#ps角色處於監聽狀態,等待終端鏈接
if strjob_name == 'ps':
    print('waiting....')
    server.join()
    
    
    
'''
(3) 建立網絡結構
'''


#設定訓練集數據長度
n_train = 100

#生成x數據,[-1,1]之間,均分紅n_train個數據
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)

#把x乘以2,在加入(0,0.3)的高斯正太分佈
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])

#繪製x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x')   #o使用圓點標記一個點
plt.legend()
plt.show()

#建立網絡結構時,經過tf.device()函數將所有的節點都放在當前任務下
with tf.device(tf.train.replica_device_setter(
        worker_device = '/job:worker/task:{0}'.format(task_index),
        cluster = cluster_spec)):
    
    '''
    前向反饋
    '''
    #建立佔位符
    input_x = tf.placeholder(dtype=tf.float32)
    input_y = tf.placeholder(dtype=tf.float32)
    
    #模型參數
    w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w')    #設置正太分佈參數  初始化權重
    b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b')    #設置正太分佈參數  初始化偏置
    
    #建立一個global_step變量
    global_step = tf.train.get_or_create_global_step()
    
    #前向結構
    pred = tf.multiply(w,input_x) + b
    
    #將預測值以直方圖形式顯示,給直方圖命名爲'pred'
    tf.summary.histogram('pred',pred)
    
    '''
    反向傳播bp
    '''
    #定義代價函數  選取二次代價函數
    cost = tf.reduce_mean(tf.square(input_y - pred))
    
    #將損失以標量形式顯示 該變量命名爲loss_function
    tf.summary.scalar('loss_function',cost)
    
    
    #設置求解器 採用梯度降低法 學習了設置爲0.001 並把global_step變量放到優化器中,這樣每運行一次優化器,global_step就會自動得到當前迭代的次數
    train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
    
    saver = tf.train.Saver(max_to_keep = 1)
    
    #合併全部的summary
    merged_summary_op = tf.summary.merge_all()
    
    #初始化全部變量,所以變量須要放在其前面定義
    init  =tf.global_variables_initializer()

'''
(4)建立Supervisor,管理session
'''    
training_epochs = 2000
display_step = 20

sv = tf.train.Supervisor(is_chief = (task_index == 0),          #0號worker爲chief
                         logdir='./LinearRegression/super/',    #檢查點和summary文件保存的路徑
                         init_op = init,  #初始化全部變量
                         summary_op = None,                            #summary_op用於自動保存summary文件,設置爲None,表示不自動保存
                         saver = saver,   #將保存檢查點的saver對象傳入,supervisor會自動保存檢查點文件。不然設置爲None
                         global_step = global_step,
                         save_model_secs = 50   #保存檢查點文件的時間間隔
                         )





'''
(5) 迭代訓練
'''
#鏈接目標角色建立session
with sv.managed_session(server.target) as sess:
    print("sess ok:")
    print(global_step.eval(session=sess))
    print('開始迭代:')
         
    #存放批次值和代價值
    plotdata = {'batch_size':[],'loss':[]}
    
    #開始迭代 這裏step表示當前執行步數,迭代training_epochs輪  須要執行training_epochs*n_train步
    for step in range(training_epochs*n_train):
        for (x,y) in zip(train_x,train_y):
            #開始執行圖  並返回當前步數
            _,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
                                            
            #生成summary
            summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
            #將summary寫入文件  手動保存summary日誌文件
            #sv.summary_computed(sess,summary_str,global_step = step)
            
            
             #一輪訓練完成後 打印輸出信息
            if step % display_step == 0:
                #計算代價值
                loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
                print('step {0}  cost {1}  w {2}  b{3}'.format(step,loss,sess.run(w),sess.run(b)))
        
                #保存每display_step輪訓練後的代價值以及當前迭代輪數
                if not loss == np.nan:
                    plotdata['batch_size'].append(step)
                    plotdata['loss'].append(loss)
                
        
    print('Finished!')
    #手動保存檢查點文件
    #sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
    
sv.stop()
View Code
相關文章
相關標籤/搜索