''' Created on 2017年5月28日 @author: weizhen ''' import time import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data import mnist_inference BATCH_SIZE = 100 LEARNING_RATE_BASE = 0.01 TRAINING_STEPS = 1000 LEARNING_RATE_DECAY = 0.99 REGULARAZTION_RATE = 0.0001 # 模型保存路徑 MODEL_SAVE_PATH = "/path/to/model" # MNIST數據路徑 DATA_PATH = "/path/to/data" # 經過flags指定運行的參數。對於不一樣的任務task給出了不一樣的程序 # 但這不是一種可擴展的方式,在這一小節中將使用運行程序是給出的參數來配置在不一樣任務中運行的程序 FLAGS = tf.app.flags.FLAGS # 指定當前運行的是參數服務器仍是計算服務器。參數服務器只負責Tensorflow中變量的維護和管理 # 計算服務器則負責每一輪迭代時運行反向傳播過程 tf.app.flags.DEFINE_string('job_name', 'worker', '"ps" or "worker" ') # 指定集羣中的參數服務器地址 tf.app.flags.DEFINE_string( 'ps_hosts', 'tf-ps0:2222,tf-ps1:1111', 'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ') # 指定集羣中的計算服務器地址 tf.app.flags.DEFINE_string( 'worker_hosts', 'tf-worker0:2222,tf-worker1:1111', 'Comma-separated list of hostname:port for the worker jobs.' 'e.g. "tf-worker0:2222,tf-worker1:1111" ') # 指定當前程序的任務ID. Tensorflow 會自動根據參數服務器/計算服務器列表中的端口號 # 來啓動服務。注意參數服務器和計算服務器的編號都是從0開始的 tf.app.flags.DEFINE_integer( 'task_id', 0, 'Task ID of the worker/replica running the training.' ) # 定義Tensorflow的計算圖,並返回每一輪迭代時須要 運行的操做。 # 爲了是處理分佈式計算的部分更加突出,本校節將此過程整理爲一個函數 def build_model(x, y_, is_chief): regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE) # 計算神經網絡前向傳播的結果 y = mnist_inference.inference(x, regularizer) global_step = tf.Variable(0, trainable=False) # 計算損失函數並定義反向傳播過程 cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1)) cross_entropy_mean = tf.reduce_mean(cross_entropy) loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses')) learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY) # 定義每一輪迭代須要運行的操做 train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step) return global_step, loss, train_op # 訓練分佈式深度學習模型的主過程 def main(argv=None): # 解析flags並經過tf.train.ClusterSpec配置TensorFlow集羣 ps_hosts = FLAGS.ps_hosts.split(',') worker_hosts = FLAGS.worker_hosts.split(',') cluster = tf.train.ClusterSpec({"ps":ps_hosts, "worker":worker_hosts}) # 經過ClusterSpec以及當前任務建立Server server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_id) # 參數服務器只須要管理TensorFlow中的變量,不須要執行訓練的過程。server.join()會一直停在這條語句上 if FLAGS.job_name == 'ps': server.join() # 定義計算服務器須要運行的操做。在全部的計算服務器中有一個是主計算服務器。它除了負責計算反向傳播的結果,它還負責輸出日誌和保存模型 is_chief = (FLAGS.task_id == 0) mnist = input_data.read_data_sets(DATA_PATH, one_hot=True) # 經過tf.train.replica_device_setter函數來指定執行每個運算的設備 # tf.train.replica_device_setter函數會自動將全部的參數分配到參數服務器上,而 # 計算分配到當前的計算服務器上 with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d " % FLAGS.task_id, cluster=cluster)): x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input') y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input') # 定義訓練模型須要運行的操做 global_step, loss, train_op = build_model(x, y_, is_chief) # 定義用於保存模型的saver saver = tf.train.Saver() # 定義日誌輸出操做 summary_op = tf.summary.merge_all() # 定義病了初始化操做 init_op = tf.global_variables_initializer() # 經過tf.train.Supervisor管理訓練深度學習模型的通用功能 # tf.train.Supervisor能統一管理隊列操做、模型保存、日誌輸出以及會話的生成 sv = tf.train.Supervisor( is_chief=is_chief, # 定義當前計算服務器是否爲主計算服務器,只用主計算服務器會保存模型以及輸出日誌 logdir=MODEL_SAVE_PATH, # 指定保存模型和輸出日誌的地址 init_op=init_op, # 指定初始化操做 summary_op=summary_op, # 指定日誌生成操做 saver=saver, # 指定用於保存模型的saver global_step=global_step, # 指定當前迭代的輪數,這個會用於生成保存模型文件的文件名 save_model_secs=60, # 指定保存模型的時間間隔 save_summaries_secs=60 # 指定日誌輸出的時間間隔 ) sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False) # 經過tf.train.Supervisor生成會話 sess = sv.prepare_or_wait_for_session(server.target, config=sess_config) step = 0 start_time = time.time() # 執行迭代過程。在迭代過程當中tf.train.Supervisor會幫助輸出日誌並保存模型 # 因此不須要直接調用這些過程 while not sv.should_stop(): xs, ys = mnist.train.next_batch(BATCH_SIZE) _, loss_value, global_step_value = sess.run( [train_op, loss, global_step], feed_dict={x:xs, y_:ys}) if global_step_value >= TRAINING_STEPS:break # 每隔一段時間輸出訓練信息 if step > 0 and step % 100 == 0: duration = time.time() - start_time # 不一樣的計算服務器都會更新全局的訓練輪數,因此這裏使用 # global_step_value能夠直接獲得在訓練中使用過的batch的總數 sec_per_batch = duration / global_step_value format_str = ("After %d training steps (%d global steps), loss on training batch is %g. (%.3f sec/batch)") print(format_str % (step, global_step_value, loss_value, sec_per_batch)) step += 1 sv.stop() if __name__ == "__main__": tf.app.run()
下面是訓練的結果,須要等到全部的機器都開起來以後才能進行訓練git
2017-05-28 22:38:45.122523: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.122960: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE2 instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.123285: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE3 instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.123847: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.124201: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.125153: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.125514: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:45.126016: W c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\platform\cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations. 2017-05-28 22:38:47.211250: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\common_runtime\gpu\gpu_device.cc:887] Found device 0 with properties: name: GeForce 940MX major: 5 minor: 0 memoryClockRate (GHz) 1.189 pciBusID 0000:01:00.0 Total memory: 2.00GiB Free memory: 1.66GiB 2017-05-28 22:38:47.211668: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\common_runtime\gpu\gpu_device.cc:908] DMA: 0 2017-05-28 22:38:47.211848: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\common_runtime\gpu\gpu_device.cc:918] 0: Y 2017-05-28 22:38:47.212045: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\common_runtime\gpu\gpu_device.cc:977] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce 940MX, pci bus id: 0000:01:00.0) 2017-05-28 22:38:47.375428: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\rpc\grpc_channel.cc:200] Initialize GrpcChannelCache for job ps -> {0 -> tf-ps0:2222, 1 -> tf-ps1:1111} 2017-05-28 22:38:47.376363: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\rpc\grpc_channel.cc:200] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2222, 1 -> tf-worker1:1111} 2017-05-28 22:38:47.380830: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\rpc\grpc_server_lib.cc:240] Started server with target: grpc://localhost:2222 Extracting /path/to/data\train-images-idx3-ubyte.gz Extracting /path/to/data\train-labels-idx1-ubyte.gz Extracting /path/to/data\t10k-images-idx3-ubyte.gz Extracting /path/to/data\t10k-labels-idx1-ubyte.gz 2017-05-28 22:38:58.243494: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0 2017-05-28 22:38:58.244680: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:1 2017-05-28 22:38:58.247390: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:worker/replica:0/task:1 2017-05-28 22:39:08.248725: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0 2017-05-28 22:39:08.249804: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:1 2017-05-28 22:39:08.251307: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:worker/replica:0/task:1 2017-05-28 22:39:18.253692: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0 2017-05-28 22:39:18.254576: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:1 2017-05-28 22:39:18.255448: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:worker/replica:0/task:1 2017-05-28 22:39:28.257660: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0 2017-05-28 22:39:28.258782: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:ps/replica:0/task:1 2017-05-28 22:39:28.260428: I c:\tf_jenkins\home\workspace\release-win\device\gpu\os\windows\tensorflow\core\distributed_runtime\master.cc:201] CreateSession still waiting for response from worker: /job:worker/replica:0/task:1