目錄python
參考書git
《TensorFlow:實戰Google深度學習框架》(第2版)github
工具算法
python3.5.1,pycharmwindows
1. 如何使用log_device_placement參數來打印運行每個運算的設備。服務器
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test1.py @time: 2019/5/14 19:40 @desc: 如何使用log_device_placement參數來打印運行每個運算的設備 """ import tensorflow as tf a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a') b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b') c = a + b # 經過log_device_placement參數來輸出運行每個運算的設備。 sess = tf.Session(config=tf.ConfigProto(log_device_placement=True)) print(sess.run(c))
運行結果:網絡
2. 經過tf.device手工指定運行設備的樣例。session
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test2.py @time: 2019/5/14 19:54 @desc: 經過tf.device手工指定運行設備的樣例。 """ import tensorflow as tf # 經過tf.device將運行指定到特定的設備上。 with tf.device('/cpu:0'): a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a') b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b') with tf.device('/gpu:1'): c = a + b sess = tf.Session(config=tf.ConfigProto(log_device_placement=True)) print(sess.run(c))
因爲我並無GPU,因此只是尬碼代碼。。。app
3. 不是全部的操做均可以被放在GPU上,若是強行將沒法放在GPU上的操做指定到GPU上,那麼程序將會報錯。框架
4. 爲了不這個問題,TensorFlow在生成會話時,能夠指定allow_soft_placement參數,當這個參數爲True時,若是運算沒法由GPU執行,那麼TensorFlow會自動將它放到CPU上執行。
在多GPU上訓練深度學習模型解決MNIST問題。
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test3.py @time: 2019/5/15 10:35 @desc: 在多GPU上訓練深度學習模型解決MNIST問題。 """ from datetime import datetime import os import time import tensorflow as tf import BookStudy.book2.mnist_inference as mnist_inference # 定義訓練神經網絡時須要用到的模型。 BATCH_SIZE = 100 LEARNING_RATE_BASE = 0.001 LEARNING_RATE_DECAY = 0.99 REGULARAZTION_RATE = 0.0001 TRAINING_STEPS = 1000 MOVING_AVERAGE_DECAY = 0.99 N_GPU = 2 # 定義日誌和模型輸出的路徑 MODEL_SAVE_PATH = 'logs_and_models/' MODEL_NAME = 'model.ckpt' # 定義數據存儲的路徑。由於須要爲不一樣的GPU提供不一樣的訓練數據,因此經過placeholder的方式 # 就須要手動準備多份數據。爲了方便訓練數據的獲取過程,能夠採用前面介紹的Dataset的方式從 # TFRecord中讀取數據。因而在這裏提供的數據文件路徑爲將MNIST訓練數據轉化爲TFRecord格式 # 以後的路徑。如何將MNIST數據轉化爲TFRecord格式在前面有詳細介紹,這裏再也不贅述。 DATA_PATH = 'output.tfrecords' # 定義輸入隊列獲得的訓練數據,具體細節能夠參考前面。 def get_input(): dataset = tf.data.TFRecordDataset([DATA_PATH]) # 定義數據解析格式。 def parser(record): features = tf.parse_single_example( record, features={ 'image_raw': tf.FixedLenFeature([], tf.string), 'pixels': tf.FixedLenFeature([], tf.int64), 'label': tf.FixedLenFeature([], tf.int64), } ) # 解析圖片和標籤信息。 decoded_image = tf.decode_raw(features['image_raw'], tf.uint8) reshaped_image = tf.reshape(decoded_image, [784]) retyped_image = tf.cast(reshaped_image, tf.float32) label = tf.cast(features['label', tf.int32]) return retyped_image, label # 定義輸入隊列 dataset = dataset.map(parser) dataset = dataset.shuffle(buffer_size=10000) dataset = dataset.repeat(10) dataset = dataset.batch(BATCH_SIZE) iterator = dataset.make_one_shot_iterator() features, labels = iterator.get_next() return features, labels # 定義損失函數。對於給定的訓練數據、正則化損失計算規則和命名空間,計算在這個命名空間下的總損失。 # 之因此須要給定命名空間就是由於不一樣的GPU上計算得出的正則化損失都會加入名爲loss的集合,若是不 # 經過命名空間就會將不一樣GPU上的正則化損失都加進來。 def get_loss(x, y_, regularizer, scope, reuse_variables=None): # 沿用前面定義的函數來計算神經網絡的前向傳播結果。 with tf.variable_scope(tf.get_variable_scope(), reuse=reuse_variables): y = mnist_inference.inference(x, regularizer) # 計算交叉熵損失。 cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=y_)) # 計算當前GPU上計算獲得的正則化損失。 regularization_loss = tf.add_n(tf.get_collection('losses', scope)) # 計算最終的總損失。 loss = cross_entropy + regularization_loss return loss # 計算每個變量梯度的平均值。 def average_gradients(tower_grads): average_grads = [] # 枚舉全部的變量和變量在不一樣GPU上計算得出的梯度。 for grad_and_vars in zip(*tower_grads): # 計算全部GPU上的梯度平均值 grads = [] for g, _ in grad_and_vars: expanded_g = tf.expand_dims(g, 0) grads.append(expanded_g) grad = tf.concat(grads, 0) grad = tf.reduce_mean(grad, 0) v = grad_and_vars[0][1] grad_and_var = (grad, v) # 將變量和它的平均梯度對應起來。 average_grads.append(grad_and_var) # 返回全部變量的平均梯度,這個將被用於變量的更新。 return average_grads # 主訓練過程。 def main(argv=None): # 將簡單的運算放在CPU上,只有神經網絡的訓練過程在GPU上。 with tf.Graph().as_default(), tf.device('/cpu:0'): # 定義基本的訓練過程 x, y_ = get_input() regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE) global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False) learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step, 60000/BATCH_SIZE, LEARNING_RATE_DECAY) opt = tf.train.GradientDescentOptimizer(learning_rate) tower_grads = [] reuse_variables = False # 將神經網絡的優化過程跑在不一樣的GPU上。 for i in range(N_GPU): # 將優化過程指定在一個GPU上 with tf.device('/gpu:%d' % i): with tf.name_scope('GPU_%d' % i) as scope: cur_loss = get_loss(x, y_, regularizer, scope, reuse_variables) # 在第一次聲明變量以後,將控制變量重用的參數設置爲True。這樣可讓不一樣的GPU更新同一組參數 reuse_variables = True grads = opt.compute_gradients(cur_loss) tower_grads.append(grads) # 計算變量的平均梯度 grads = average_gradients(tower_grads) for grad, var in grads: if grad is not None: tf.summary.histogram('gradients_on_average/%s' % var.op.name, grad) # 使用平均梯度更新參數。 apply_gradient_op = opt.apply_gradients(grads, global_step=global_step) for var in tf.trainable_variables(): tf.summary.histogram(var.op.name, var) # 計算變量的滑動平均值。 variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step) variable_to_average = (tf.trainable_variables() + tf.moving_average_variables()) variable_averages_op = variable_averages.apply(variable_to_average) # 每一輪迭代須要更新變量的取值並更新變量的滑動平均值 train_op = tf.group(apply_gradient_op, variable_averages_op) saver = tf.train.Saver() summary_op = tf.summary.merge_all() init = tf.global_variables_initializer() with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)) as sess: # 初始化全部變量並啓動隊列。 init.run() summary_writer = tf.summary.FileWriter(MODEL_SAVE_PATH, sess.graph) for step in range(TRAINING_STEPS): # 執行神經網絡訓練操做,並記錄訓練操做的運行時間。 start_time = time.time() _, loss_value = sess.run([train_op, cur_loss]) duration = time.time() - start_time # 每隔一段時間輸出當前的訓練進度,並統計訓練速度 if step != 0 and step % 10 == 0: # 計算使用過的訓練數據個數。由於在每一次運行訓練操做時,每個GPU都會使用一個batch的訓練數據, # 因此總共用到的訓練數據個數爲batch大小 X GPU個數。 num_examples_per_step = BATCH_SIZE * N_GPU # num_example_per_step爲本次迭代使用到的訓練數據個數,duration爲運行當前訓練過程使用的時間, # 因而平均每秒能夠處理的訓練數據個數爲num_examples_per_step / duration examples_per_sec = num_examples_per_step / duration # duration爲運行當前訓練過程使用的時間,由於在每個訓練過程當中,每個GPU都會使用一個batch的 # 訓練數據,因此在單個batch上的訓練所須要的時間爲duration / GPU個數 sec_per_batch = duration / N_GPU # 輸出訓練信息。 format_str = '%s: step %d, loss = %.2f (%.1f examples/sec; %.3f sec/batch)' print(format_str % (datetime.now(), step, loss_value, examples_per_sec, sec_per_batch)) # 經過TensorBoard可視化訓練過程。 summary = sess.run(summary_op) summary_writer.add_summary(summary, step) # 每隔一段時間保存當前的模型。 if step % 1000 == 0 or (step + 1) == TRAINING_STEPS: checkpoint_path = os.path.join(MODEL_SAVE_PATH, MODEL_NAME) saver.save(sess, checkpoint_path, global_step=step) if __name__ == '__main__': tf.app.run()
因爲我依然沒有GPU,因此只是尬碼代碼。。。
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test4.py @time: 2019/5/15 22:19 @desc: 建立一個最簡單的TensorFlow集羣 """ import tensorflow as tf c = tf.constant("Hello, distributed TensorFlow!") # 建立一個本地的TensorFlow集羣 server = tf.train.Server.create_local_server() # 在集羣上建立一個會話。 sess = tf.Session(server.target) # 輸出Hello,distributed TensorFlow! print(sess.run(c))
輸出獲得:
第一個任務代碼:
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test5.py @time: 2019/5/15 22:27 @desc: 在本地運行有兩個任務的TensorFlow集羣。第一個任務的代碼。 """ import tensorflow as tf c = tf.constant("Hello from server1!") # 生成一個有兩個任務的集羣,一個任務跑在本地2222端口,另一個跑在本地2223端口。 cluster = tf.train.ClusterSpec({"local": ["localhost: 2222", "localhost: 2223"]}) # 經過上面生成的集羣配置生成Server,並經過job_name和task_index指定當前所啓動的任務。 # 由於該任務是第一個任務,因此task_index爲0. server = tf.train.Server(cluster, job_name="local", task_index=0) # 經過server.target生成會話來使用TensorFlow集羣中的資源。經過設置 # log_device_placement能夠看到執行每個操做的任務。 sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) print(sess.run(c)) server.join()
第二個任務代碼:
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test6.py @time: 2019/5/16 10:14 @desc: 在本地運行有兩個任務的TensorFlow集羣。第二個任務的代碼。 """ import tensorflow as tf c = tf.constant("Hello from server2!") # 和第一個程序同樣的集羣配置。集羣中的每個任務須要採用相同的配置。 cluster = tf.train.ClusterSpec({"local": ["localhost: 2222", "localhost: 2223"]}) # 指定task_index爲1,因此這個程序將在localhost: 2223啓動服務。 server = tf.train.Server(cluster, job_name="local", task_index=1) # 剩下的代碼都和第一個任務的代碼一致。 # 經過server.target生成會話來使用TensorFlow集羣中的資源。經過設置 # log_device_placement能夠看到執行每個操做的任務。 sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) print(sess.run(c)) server.join()
啓動第一個任務後,能夠獲得相似下面的輸出:
從第一個任務的輸出中能夠看到,當只啓動第一個任務時,程序會停下來等待第二個任務啓動。當第二個任務啓動後,能夠獲得以下輸出:
值得注意的是:第二個任務中定義的計算也被放在了同一個設備上,也就是說這個計算將由第一個任務來執行。
使用分佈式TensorFlow訓練深度學習模型通常有兩種方式:
- 一種方式叫作計算圖內分佈式(in-graph replication)。優勢:同步更新參數比較容易控制。缺點:當數據量太大時,中心節點容易形成性能瓶頸。
- 另一種叫作計算圖之間分佈式(between-graph replication)。優勢:並行程度更高。缺點:同步更新困難。
異步模式樣例程序
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test7.py @time: 2019/5/16 14:01 @desc: 實現異步模式的分佈式神經網絡訓練過程。 """ import time import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data import mnist_inference # 配置神經網絡的參數。 BATCH_SIZE = 100 LEARNING_RATE_BASE = 0.001 LEARNING_RATE_DECAY = 0.99 REGULARAZTION_RATE = 0.0001 TRAINING_STEPS = 20000 MOVING_AVERAGE_DECAY = 0.99 # 模型保存的路徑。 MODEL_SAVE_PATH = "./log1" # MNIST數據路徑。 DATA_PATH = "D:/Python3Space/BookStudy/book2/MNIST_data" # 經過flags指定運行的參數。在前面對於不一樣的任務(task)給出了不一樣的程序。 # 但這不是一種可擴展的方式。在這一節中將使用運行程序時給出的參數來配置在 # 不一樣任務中運行的程序。 FLAGS = tf.app.flags.FLAGS # 指定當前運行的是參數服務器仍是計算服務器。參數服務器只負責TensorFlow中變量的維護 # 和管理,計算服務器負責每一輪迭代時運行反向傳播過程。 tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ') # 指定集羣中的參數服務器地址。 tf.app.flags.DEFINE_string( 'ps_hosts', ' tf-ps0:2222,tf-ps1:1111', 'Comma-separated list of hostname:port for the parameter server jobs.' ' e.g. "tf-ps0:2222,tf-ps1:1111" ' ) # 指定集羣中的計算服務器地址。 tf.app.flags.DEFINE_string( 'worker_hosts', ' tf-worker0:2222, tf-worker1:1111', 'Comma-separated list of hostname:port for the worker jobs. ' 'e.g. "tf-worker0:2222,tf-worker1:1111" ' ) # 指定當前程序的任務ID。TensorFlow會自動根據參數服務器/計算服務器列表中的端口號來啓動服務。 # 注意參數服務器和計算服務器的編號都是從0開始的。 tf.app.flags.DEFINE_integer( 'task_id', 0, 'Task ID of the worker/replica running the training.' ) # 定義TensorFlow的計算圖,並返回每一輪迭代時須要運行的操做。這個過程和前面的主函數基本一致, # 但爲了使處理分佈式計算的部分更加突出,這裏將此過程整理爲一個函數。 def build_model(x, y_, is_chief): regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE) # 經過前面給出的mnist_inference計算神經網絡前向傳播的結果。 y = mnist_inference.inference(x, regularizer) global_step = tf.train.get_or_create_global_step() # 計算損失函數並定義反向傳播的過程。 cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1)) cross_entropy_mean = tf.reduce_mean(cross_entropy) loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses')) learning_rate = tf.train.exponential_decay( LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY ) train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step) # 定義每一輪迭代須要運行的操做。 if is_chief: # 計算變量的滑動平均值。 variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step) variable_averages_op = variable_averages.apply(tf.trainable_variables()) with tf.control_dependencies([variable_averages_op, train_op]): train_op = tf.no_op() return global_step, loss, train_op def main(argv=None): # 解析flags並經過tf.train.ClusterSpe配置TensorFlow集羣。 ps_hosts = FLAGS.ps_hosts.split(',') worker_hosts = FLAGS.worker_hosts.split(',') cluster = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts}) # 經過tf.train.ClusterSpec以及當前任務建立tf.train.Server。 server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id) # 參數服務器只須要管理TensorFlow中的變量,不須要執行訓練的過程。server.join()會一直停在這條語句上。 if FLAGS.job_name == 'ps': with tf.device("/cpu:0"): server.join() # 定義計算服務器須要運行的操做。 is_chief = (FLAGS.task_id == 0) mnist = input_data.read_data_sets(DATA_PATH, one_hot=True) # 經過tf.train.replica_device_setter函數來指定執行每個運算的設備。 # tf.train.replica_device_setter函數會自動將全部的參數分配到參數服務器上,將 # 計算分配到當前的計算服務器上。 device_setter = tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster ) with tf.device(device_setter): # 定義輸入並獲得每一輪迭代須要運行的操做。 x = tf.placeholder( tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input' ) y_ = tf.placeholder( tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input' ) global_step, loss, train_op = build_model(x, y_, is_chief) hooks = [tf.train.StopAtStepHook(last_step=TRAINING_STEPS)] sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) # 經過tf.train.MonitoredTrainingSession管理訓練深度學習模型的通用功能。 with tf.train.MonitoredTrainingSession( master=server.target, is_chief=is_chief, checkpoint_dir=MODEL_SAVE_PATH, hooks=hooks, save_checkpoint_secs=60, config=sess_config ) as mon_sess: print("session started.") step = 0 start_time = time.time() # 執行迭代過程。在迭代過程當中tf.train.MonitoredTrainingSession會幫助完成初始化、 # 從checkpoint中加載訓練過的模型、輸出日誌並保存模型,因此如下程序中不須要再調用 # 這些過程。tf.train.StopAtStepHook會幫忙判斷是否須要退出。 while not mon_sess.should_stop(): xs, ys = mnist.train.next_batch(BATCH_SIZE) _, loss_value, global_step_value = mon_sess.run( [train_op, loss, global_step], feed_dict={x: xs, y_: ys} ) # 每隔一段時間輸出訓練信息,不一樣的計算服務器都會更新全局的訓練輪數, # 因此這裏使用global_step_value獲得在訓練中使用過的batch的總數。 if step > 0 and step % 100 == 0: duration = time.time() - start_time sec_per_batch = duration / global_step_value format_str = "After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)" print(format_str % (step, global_step_value, loss_value, sec_per_batch)) step += 1 if __name__ == '__main__': try: tf.app.run() except Exception as e: print(e)
要啓動一個擁有一個參數服務器、兩個計算服務器的集羣,須要如今運行參數服務器的機器上啓動如下命令。
python gpu_test7.py --job_name=ps --task_id=0 --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224
而後再運行第一個計算服務器的機器上啓動如下命令:
python gpu_test7.py --job_name=worker --task_id=0 --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224
最後再運行第二個計算服務器的機器上啓動如下命令:
python gpu_test7.py --job_name=worker --task_id=1 --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224
注意:若是你報錯了:
1. UnknownError: Could not start gRPC server
必定是你的參數問題!檢查你的task_id有沒有寫成taske_id等等,相似的,必定是這樣!!!必定要跟程序中的參數名保持一致!!!
2. 跑的過程當中python搞不清楚崩了,兩個計算服務器都會崩,跟時間無關,隨機的那種。。。直接彈出python已中止的那種
聽個人換臺電腦或者重裝系統。
3. 報錯‘ps’不存在的,沒定義的注意了!
在cmd(windows系統)中啓動上面三個命令的時候,參數的內容不要加引號!!!書上面加引號真的是坑死了好嗎!
我就是解決了上述三個問題,換了臺macbook,才總算功德圓滿了跑出告終果。
左上:參數服務器
右上:計算服務器0
左下:計算服務器1
右下:運行tensorboard,結果以下:
同步模式樣例程序
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: gpu_test8.py @time: 2019/5/17 13:52 @desc: 實現同步模式的分佈式神經網絡訓練過程。 """ import time import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data import mnist_inference # 配置神經網絡的參數。 BATCH_SIZE = 100 LEARNING_RATE_BASE = 0.001 LEARNING_RATE_DECAY = 0.99 REGULARAZTION_RATE = 0.0001 TRAINING_STEPS = 20000 MOVING_AVERAGE_DECAY = 0.99 # 模型保存的路徑。 MODEL_SAVE_PATH = "./log2" # MNIST數據路徑。 DATA_PATH = "D:/Python3Space/BookStudy/book2/MNIST_data" # 和異步模式相似的設置flags。 FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ') tf.app.flags.DEFINE_string( 'ps_hosts', ' tf-ps0:2222, tf-ps1:1111', 'Comma-separated list of hostname:port for the parameter server jobs.' ' e.g. "tf-ps0:2222,tf-ps1:1111" ' ) tf.app.flags.DEFINE_string( 'worker_hosts', ' tf-worker0:2222, tf-worker1:1111', 'Comma-separated list of hostname:port for the worker jobs. ' 'e.g. "tf-worker0:2222,tf-worker1:1111" ' ) tf.app.flags.DEFINE_integer( 'task_id', 0, 'Task ID of the worker/replica running the training.' ) # 和異步模式相似的定義TensorFlow的計算圖。惟一的區別在於使用。 # tf.train.SyncReplicasOptimizer函數處理同步更新。 def build_model(x, y_, n_workers, is_chief): regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE) y = mnist_inference.inference(x, regularizer) global_step = tf.train.get_or_create_global_step() cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1)) cross_entropy_mean = tf.reduce_mean(cross_entropy) loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses')) learning_rate = tf.train.exponential_decay( LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY ) # 經過tf.train.SyncReplicasOptimizer函數實現同步更新。 opt = tf.train.SyncReplicasOptimizer( tf.train.GradientDescentOptimizer(learning_rate), replicas_to_aggregate=n_workers, total_num_replicas=n_workers ) sync_replicas_hook = opt.make_session_run_hook(is_chief) train_op = opt.minimize(loss, global_step=global_step) if is_chief: variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step) variable_averages_op = variable_averages.apply(tf.trainable_variables()) with tf.control_dependencies([variable_averages_op, train_op]): train_op = tf.no_op() return global_step, loss, train_op, sync_replicas_hook def main(argv=None): # 和異步模式相似地建立TensorFlow集羣。 ps_hosts = FLAGS.ps_hosts.split(',') worker_hosts = FLAGS.worker_hosts.split(',') n_workers = len(worker_hosts) cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id) if FLAGS.job_name == 'ps': with tf.device("/cpu:0"): server.join() is_chief = (FLAGS.task_id == 0) mnist = input_data.read_data_sets(DATA_PATH, one_hot=True) device_setter = tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster ) with tf.device(device_setter): # 定義輸入並獲得每一輪迭代須要運行的操做。 x = tf.placeholder( tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input' ) y_ = tf.placeholder( tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input' ) global_step, loss, train_op, sync_replicas_hook = build_model(x, y_, n_workers, is_chief) # 把處理同步更新的hook也加進來 hooks = [sync_replicas_hook, tf.train.StopAtStepHook(last_step=TRAINING_STEPS)] sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) # 訓練過程和異步一致。 with tf.train.MonitoredTrainingSession( master=server.target, is_chief=is_chief, checkpoint_dir=MODEL_SAVE_PATH, hooks=hooks, save_checkpoint_secs=60, config=sess_config ) as mon_sess: print("session started.") step = 0 start_time = time.time() while not mon_sess.should_stop(): xs, ys = mnist.train.next_batch(BATCH_SIZE) _, loss_value, global_step_value = mon_sess.run( [train_op, loss, global_step], feed_dict={x: xs, y_: ys} ) if step > 0 and step % 100 == 0: duration = time.time() - start_time sec_per_batch = duration / global_step_value format_str = "After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)" print(format_str % (step, global_step_value, loss_value, sec_per_batch)) step += 1 if __name__ == '__main__': tf.app.run()
同上運行出來獲得:
和異步模式不一樣,在同步模式下,global_step差很少是兩個計算服務器local_step的平均值。好比在第二個計算服務器尚未開始以前,global_step是第一個服務器local_step的通常。這是由於同步模式要求收集replicas_to_average份梯度纔會開始更新(注意這裏TensorFlow不要求每一份梯度來自不一樣的計算服務器)。同步模式不只僅是一次使用多份梯度,tf.train.SyncReplicasOptimizer的實現同時也保證了不會出現陳舊變量的問題,該函數會記錄每一份梯度是否是由最新的變量值計算獲得的,若是不是,那麼這一份梯度將會被丟棄。
總算是把這本書一步一步的實現,一步一步的修改,從頭認認真真的刷了一遍。取其精華,棄其糟粕。學習TensorFlow真的是一件不容易,也是一件頗有成就感的過程。還有黑皮書《TensorFlow實戰》和聖經花書《深度學習》要學,但願後面的書不會讓我失望吧。
個人CSDN:https://blog.csdn.net/qq_21579045
個人博客園:https://www.cnblogs.com/lyjun/
個人Github:https://github.com/TinyHandsome
紙上得來終覺淺,絕知此事要躬行~
歡迎你們過來OB~
by 李英俊小朋友