TensorFlow並行,模型並行,數據並行。模型並行根據不一樣模型設計不一樣並行方式,模型不一樣計算節點放在不一樣硬伯上資源運算。數據並行,比較通用簡便實現大規模並行方式,同時使用多個硬件資源計算不一樣batch數據梯度,彙總梯度全局參數更新。git
數據並行,多塊GPU同時訓練多個batch數據,運行在每塊GPU模型基於同一神經網絡,網絡結構同樣,共享模型參數。算法
同步數據並行,全部GPU計算完batch數據梯度,統計將多個梯度合在一塊兒,更新共享模型參數,相似使用較大batch。GPU型號、速度一致時,效率最高。
異步數據並行,不等待全部GPU完成一次訓練,哪一個GPU完成訓練,當即將梯度更新到共享模型參數。
同步數據並行,比異步收斂速度更快,模型精度更高。微信
同步數據並行,數據集CIFAR-10。載入依賴庫,TensorFlow Models cifar10類,下載CIFAR-10數據預處理。網絡
設置batch大小 128,最大步數100萬步(中間隨時中止,模型按期保存),GPU數量4。session
定義計算損失函數tower_loss。cifar10.distorted_inputs產生數據加強images、labels,調用cifar10.inference生成卷積網絡,每一個GPU生成單獨網絡,結構一致,共享模型參數。根據卷積網絡、labels,調用cifar10.loss計算損失函數(loss儲存到collection),tf.get_collection('losses',scope)獲取當前GPU loss(scope限定範圍),tf.add_n 全部損失疊加一塊兒得total_loss。返回total_loss做函數結果。app
定義函數average_gradients,不一樣GPU計算梯度合成。輸入參數tower_grads梯度雙層列表,外層列表不一樣GPU計算梯度,內層列表GPU計算不一樣Variable梯度。最內層元素(grads,variable),tower_grads基本元素二元組(梯度、變量),具體形式[[(grad0_gpu0,var0_gpu0),(grad1_gpu0,var1_gpu0)……],[(grad0_gpu1,var0_gpu1),(grad1_gpu1,var1_gpu1)……]……]。建立平均梯度列表average_grads,梯度在不一樣GPU平均。zip(*tower_grads)雙層列表轉置,變[[(grad0_gpu0,var0_gpu0),(grad0_gpu1,var0_gpu1)……],[(grad1_gpu0,var1_gpu0),(grad1_gpu1,var1_gpu1)……]……]形式,循環遍歷元素。循環獲取元素grad_and_vars,同Variable梯度在不一樣GPU計算結果。同Variable梯度不一樣GPU計算副本,計算梯度均值。梯度N維向量,每一個維度平均。tf.expand_dims給梯度添加冗餘維度0,梯度放列表grad。tf.concat 維度0上合併。tf.reduce_mean維度0平均,其餘維度所有平均。平均梯度,和Variable組合得原有二元組(梯度、變量)格式,添加到列表average_grads。全部梯度求均後,返回average_grads。異步
定義訓練函數。設置默認計算設備CPU。global_step記錄全局訓練步數,計算epoch對應batch數,學習速率衰減須要步數decay_steps。tf.train.exponential_decay建立隨訓練步數衰減學習速率,第一參數初始學習速率,第二參數全局訓練步數,第三參數每次衰減須要步數,第四參數衰減率,staircase設true,階梯式衰減。設置優化算法GradientDescent,傳入隨機步數衰減學習速率。ide
定義儲存GPU計算結果列表tower_grads。建立循環,循環次數GPU數量。循環中tf.device限定使用哪一個GPU。tf.name_scope命名空間。函數
GPU用tower_loss獲取損失。tf.get_variable_scope().reuse_variables()重用參數。GPU共用一個模型入徹底相同參數。opt.compute_gradients(loss)計算單個GPU梯度,添加到梯度列表tower_grads。average_gradients計算平均梯度,opt.apply_gradients更新模型參數。學習
建立模型保存器saver,Session allow_soft_placement 參數設True。有些操做只能在CPU上進行,不使用soft_placement。初始化所有參數,tf.train.start_queue_runner()準備大量數據加強訓練樣本,防止訓練被阻塞在生成樣本。
訓練循環,最大迭代次數max_steps。每步執行一次更新梯度操做apply_gradient_op(一次訓練操做),計算損失操做loss。time.time()記錄耗時。每隔10步,展現當前batch loss。每秒鐘可訓練樣本數和每一個batch訓練花費時間。每隔1000步,Saver保存整個模型文件。
cifar10.maybe_download_and_extract()下載完整CIFAR-10數據,train()開始訓練。
loss從最開始4點幾,到第70萬步,降到0.07。平均每一個batch耗時0.021s,平均每秒訓練6000個樣本,單GPU 4倍。
import os.path import re import time import numpy as np import tensorflow as tf import cifar10 batch_size=128 #train_dir='/tmp/cifar10_train' max_steps=1000000 num_gpus=4 #log_device_placement=False def tower_loss(scope): """Calculate the total loss on a single tower running the CIFAR model. Args: scope: unique prefix string identifying the CIFAR tower, e.g. 'tower_0' Returns: Tensor of shape [] containing the total loss for a batch of data """ # Get images and labels for CIFAR-10. images, labels = cifar10.distorted_inputs() # Build inference Graph. logits = cifar10.inference(images) # Build the portion of the Graph calculating the losses. Note that we will # assemble the total_loss using a custom function below. _ = cifar10.loss(logits, labels) # Assemble all of the losses for the current tower only. losses = tf.get_collection('losses', scope) # Calculate the total loss for the current tower. total_loss = tf.add_n(losses, name='total_loss') # Compute the moving average of all individual losses and the total loss. # loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg') # loss_averages_op = loss_averages.apply(losses + [total_loss]) # Attach a scalar summary to all individual losses and the total loss; do the # same for the averaged version of the losses. # for l in losses + [total_loss]: # Remove 'tower_[0-9]/' from the name in case this is a multi-GPU training # session. This helps the clarity of presentation on tensorboard. # loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name) # Name each loss as '(raw)' and name the moving average version of the loss # as the original loss name. # tf.scalar_summary(loss_name +' (raw)', l) # tf.scalar_summary(loss_name, loss_averages.average(l)) # with tf.control_dependencies([loss_averages_op]): # total_loss = tf.identity(total_loss) return total_loss def average_gradients(tower_grads): """Calculate the average gradient for each shared variable across all towers. Note that this function provides a synchronization point across all towers. Args: tower_grads: List of lists of (gradient, variable) tuples. The outer list is over individual gradients. The inner list is over the gradient calculation for each tower. Returns: List of pairs of (gradient, variable) where the gradient has been averaged across all towers. """ average_grads = [] for grad_and_vars in zip(*tower_grads): # Note that each grad_and_vars looks like the following: # ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN)) grads = [] for g, _ in grad_and_vars: # Add 0 dimension to the gradients to represent the tower. expanded_g = tf.expand_dims(g, 0) # Append on a 'tower' dimension which we will average over below. grads.append(expanded_g) # Average over the 'tower' dimension. grad = tf.concat(grads, 0) grad = tf.reduce_mean(grad, 0) # Keep in mind that the Variables are redundant because they are shared # across towers. So .. we will just return the first tower's pointer to # the Variable. v = grad_and_vars[0][1] grad_and_var = (grad, v) average_grads.append(grad_and_var) return average_grads def train(): """Train CIFAR-10 for a number of steps.""" with tf.Graph().as_default(), tf.device('/cpu:0'): # Create a variable to count the number of train() calls. This equals the # number of batches processed * FLAGS.num_gpus. global_step = tf.get_variable( 'global_step', [], initializer=tf.constant_initializer(0), trainable=False) # Calculate the learning rate schedule. num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN / batch_size) decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY) # Decay the learning rate exponentially based on the number of steps. lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE, global_step, decay_steps, cifar10.LEARNING_RATE_DECAY_FACTOR, staircase=True) # Create an optimizer that performs gradient descent. opt = tf.train.GradientDescentOptimizer(lr) # Calculate the gradients for each model tower. tower_grads = [] for i in range(num_gpus): with tf.device('/gpu:%d' % i): with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope: # Calculate the loss for one tower of the CIFAR model. This function # constructs the entire CIFAR model but shares the variables across # all towers. loss = tower_loss(scope) # Reuse variables for the next tower. tf.get_variable_scope().reuse_variables() # Retain the summaries from the final tower. # summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope) # Calculate the gradients for the batch of data on this CIFAR tower. grads = opt.compute_gradients(loss) # Keep track of the gradients across all towers. tower_grads.append(grads) # We must calculate the mean of each gradient. Note that this is the # synchronization point across all towers. grads = average_gradients(tower_grads) # Add a summary to track the learning rate. # summaries.append(tf.scalar_summary('learning_rate', lr)) # Add histograms for gradients. # for grad, var in grads: # if grad is not None: # summaries.append( # tf.histogram_summary(var.op.name + '/gradients', grad)) # Apply the gradients to adjust the shared variables. apply_gradient_op = opt.apply_gradients(grads, global_step=global_step) # Add histograms for trainable variables. # for var in tf.trainable_variables(): # summaries.append(tf.histogram_summary(var.op.name, var)) # Track the moving averages of all trainable variables. # variable_averages = tf.train.ExponentialMovingAverage( # cifar10.MOVING_AVERAGE_DECAY, global_step) # variables_averages_op = variable_averages.apply(tf.trainable_variables()) # Group all updates to into a single train op. # train_op = tf.group(apply_gradient_op, variables_averages_op) # Create a saver. saver = tf.train.Saver(tf.all_variables()) # Build the summary operation from the last tower summaries. # summary_op = tf.merge_summary(summaries) # Build an initialization operation to run below. init = tf.global_variables_initializer() # Start running operations on the Graph. allow_soft_placement must be set to # True to build towers on GPU, as some of the ops do not have GPU # implementations. sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True)) sess.run(init) # Start the queue runners. tf.train.start_queue_runners(sess=sess) # summary_writer = tf.train.SummaryWriter(train_dir, sess.graph) for step in range(max_steps): start_time = time.time() _, loss_value = sess.run([apply_gradient_op, loss]) duration = time.time() - start_time assert not np.isnan(loss_value), 'Model diverged with loss = NaN' if step % 10 == 0: num_examples_per_step = batch_size * num_gpus examples_per_sec = num_examples_per_step / duration sec_per_batch = duration / num_gpus format_str = ('step %d, loss = %.2f (%.1f examples/sec; %.3f ' 'sec/batch)') print (format_str % (step, loss_value, examples_per_sec, sec_per_batch)) # if step % 100 == 0: # summary_str = sess.run(summary_op) # summary_writer.add_summary(summary_str, step) # Save the model checkpoint periodically. if step % 1000 == 0 or (step + 1) == max_steps: # checkpoint_path = os.path.join(train_dir, 'model.ckpt') saver.save(sess, '/tmp/cifar10_train/model.ckpt', global_step=step) cifar10.maybe_download_and_extract() #if tf.gfile.Exists(train_dir): # tf.gfile.DeleteRecursively(train_dir) #tf.gfile.MakeDirs(train_dir) train()
參考資料:
《TensorFlow實戰》
歡迎付費諮詢(150元每小時),個人微信:qingxingfengzi