摘要:本系列主要對tf的一些經常使用概念與方法進行描述。本文主要針對tensorflow的模型訓練Training與測試Testing等相關函數進行講解。爲‘Tensorflow一些經常使用基本概念與函數’系列之四。python
1 optimizer = tf.train.GradientDescentOptimizer(learning_rate)
而後,能夠設置 一個用於記錄全局訓練步驟的單值。以及使用minimize()操做,該操做不只能夠優化更新訓練的模型參數,也能夠爲全局步驟(global step)計數。與其餘tensorflow操做相似,這些訓練操做都須要在tf.session會話中進行
1 global_step = tf.Variable(0, name='global_step', trainable=False) 2 train_op = optimizer.minimize(loss, global_step=global_step)
操做組算法 |
操做服務器 |
---|---|
Training網絡 |
Optimizers,Gradient Computation,Gradient Clipping,Distributed executionsession |
Testing多線程 |
Unit tests,Utilities,Gradient checkingapp |
一個TFRecords 文件爲一個字符串序列。這種格式並不是隨機獲取,它比較適合大規模的數據流,而不太適合須要快速分區或其餘非序列獲取方式。框架
tf中各類優化類提供了爲損失函數計算梯度的方法,其中包含比較經典的優化算法,好比GradientDescent 和Adagrad。less
▶▶class tf.train.Optimizer分佈式
操做 |
描述 |
---|---|
class tf.train.Optimizer |
基本的優化類,該類不經常被直接調用,而較多使用其子類, 好比GradientDescentOptimizer, AdagradOptimizer 或者MomentumOptimizer |
tf.train.Optimizer.__init__(use_locking, name) |
建立一個新的優化器, 該優化器必須被其子類(subclasses)的構造函數調用 |
tf.train.Optimizer.minimize(loss, global_step=None, |
添加操做節點,用於最小化loss,並更新var_list 該函數是簡單的合併了compute_gradients()與apply_gradients()函數 返回爲一個優化更新後的var_list,若是global_step非None,該操做還會爲global_step作自增操做 |
tf.train.Optimizer.compute_gradients(loss,var_list=None, gate_gradients=1, aggregation_method=None, colocate_gradients_with_ops=False, grad_loss=None) |
對var_list中的變量計算loss的梯度 該函數爲函數minimize()的第一部分,返回一個以元組(gradient, variable)組成的列表 |
tf.train.Optimizer.apply_gradients(grads_and_vars, global_step=None, name=None) |
將計算出的梯度應用到變量上,是函數minimize()的第二部分,返回一個應用指定的梯度的操做Operation,對global_step作自增操做 |
tf.train.Optimizer.get_name() |
獲取名稱 |
▷ class tf.train.Optimizer
用法
1 # Create an optimizer with the desired parameters. 2 opt = GradientDescentOptimizer(learning_rate=0.1) 3 # Add Ops to the graph to minimize a cost by updating a list of variables. 4 # "cost" is a Tensor, and the list of variables contains tf.Variable objects. 5 opt_op = opt.minimize(cost, var_list=<list of variables>) 6 # Execute opt_op to do one step of training: 7 opt_op.run()
▶▶在使用它們以前處理梯度
使用minimize()操做,該操做不只能夠計算出梯度,並且還能夠將梯度做用在變量上。若是想在使用它們以前處理梯度,能夠按照如下三步驟使用optimizer :
一、使用函數compute_gradients()計算梯度 二、按照本身的願望處理梯度 三、使用函數apply_gradients()應用處理事後的梯度
例如:
1 # 建立一個optimizer. 2 opt = GradientDescentOptimizer(learning_rate=0.1) 3 4 # 計算<list of variables>相關的梯度 5 grads_and_vars = opt.compute_gradients(loss, <list of variables>) 6 7 # grads_and_vars爲tuples (gradient, variable)組成的列表。 8 #對梯度進行想要的處理,好比cap處理 9 capped_grads_and_vars = [(MyCapper(gv[0]), gv[1]) for gv in grads_and_vars] 10 11 # 令optimizer運用capped的梯度(gradients) 12 opt.apply_gradients(capped_grads_and_vars)
▶▶選通梯度(Gating Gradients)
函數minimize() 與compute_gradients()都含有一個參數gate_gradient,用於控制在應用這些梯度時並行化的程度。
其值能夠取:GATE_NONE, GATE_OP 或 GATE_GRAPH
GATE_NONE : 並行地計算和應用梯度。提供最大化的並行執行,可是會致使有的數據結果沒有再現性。好比兩個matmul操做的梯度依賴輸入值,使用GATE_NONE可能會出現有一個梯度在其餘梯度以前便應用到某個輸入中,致使出現不可再現的(non-reproducible)結果
GATE_OP: 對於每一個操做Op,確保每個梯度在使用以前都已經計算完成。這種作法防止了那些具備多個輸入,而且梯度計算依賴輸入情形中,多輸入Ops之間的競爭狀況出現。
GATE_GRAPH: 確保全部的變量對應的全部梯度在他們任何一個被使用前計算完成。該方式具備最低級別的並行化程度,可是對於想要在應用它們任何一個以前處理完全部的梯度計算時頗有幫助的。
一些optimizer的之類,好比 MomentumOptimizer 和 AdagradOptimizer 分配和管理着額外的用於訓練的變量。這些變量稱之爲’Slots’,Slots有相應的名稱,能夠向optimizer訪問的slots名稱。有助於在log debug一個訓練算法以及報告slots狀態
操做 |
描述 |
---|---|
tf.train.Optimizer.get_slot_names() |
返回一個由Optimizer所建立的slots的名稱列表 |
tf.train.Optimizer.get_slot(var, name) |
返回一個name所對應的slot,name是由Optimizer爲var所建立 var爲用於傳入 minimize() 或 apply_gradients()的變量 |
class tf.train.GradientDescentOptimizer |
使用梯度降低算法的Optimizer |
tf.train.GradientDescentOptimizer.__init__(learning_rate, use_locking=False, name=’GradientDescent’) |
構建一個新的梯度降低優化器(Optimizer) |
class tf.train.AdadeltaOptimizer |
使用Adadelta算法的Optimizer |
tf.train.AdadeltaOptimizer.__init__(learning_rate=0.001, rho=0.95, epsilon=1e-08, use_locking=False, name=’Adadelta’) |
建立Adadelta優化器 |
class tf.train.AdagradOptimizer |
使用Adagrad算法的Optimizer |
tf.train.AdagradOptimizer.__init__(learning_rate, initial_accumulator_value=0.1, use_locking=False, name=’Adagrad’) |
建立Adagrad優化器 |
class tf.train.MomentumOptimizer |
使用Momentum算法的Optimizer |
tf.train.MomentumOptimizer.__init__(learning_rate, momentum, use_locking=False, name=’Momentum’, use_nesterov=False) |
建立momentum優化器 momentum:動量,一個tensor或者浮點值 |
class tf.train.AdamOptimizer |
使用Adam 算法的Optimizer |
tf.train.AdamOptimizer.__init__(learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-08, use_locking=False, name=’Adam’) |
建立Adam優化器 |
class tf.train.FtrlOptimizer |
使用FTRL 算法的Optimizer |
tf.train.FtrlOptimizer.__init__(learning_rate, learning_rate_power=-0.5, initial_accumulator_value=0.1, l1_regularization_strength=0.0, l2_regularization_strength=0.0, use_locking=False, name=’Ftrl’) |
建立FTRL算法優化器 |
class tf.train.RMSPropOptimizer |
使用RMSProp算法的Optimizer |
tf.train.RMSPropOptimizer.__init__(learning_rate, decay=0.9, momentum=0.0, epsilon=1e-10, use_locking=False, name=’RMSProp’) |
建立RMSProp算法優化器 |
▷ tf.train.AdamOptimizer
Adam 的基本運行方式,首先初始化:
m_0 <- 0 (Initialize initial 1st moment vector) v_0 <- 0 (Initialize initial 2nd moment vector) t <- 0 (Initialize timestep)
在論文中的 section2 的末尾所描述了更新規則,該規則使用梯度g來更新變量:
t <- t + 1 lr_t <- learning_rate * sqrt(1 - beta2^t) / (1 - beta1^t) m_t <- beta1 * m_{t-1} + (1 - beta1) * g v_t <- beta2 * v_{t-1} + (1 - beta2) * g * g variable <- variable - lr_t * m_t / (sqrt(v_t) + epsilon)
其中epsilon 的默認值1e-8可能對於大多數狀況都不是一個合適的值。例如,當在ImageNet上訓練一個 Inception network時比較好的選擇爲1.0或者0.1。
須要注意的是,在稠密數據中即使g爲0時, m_t, v_t 以及variable都將會更新。而在稀疏數據中,m_t, v_t 以及variable不被更新且值爲零。
TensorFlow 提供了計算給定tf計算圖的求導函數,並在圖的基礎上增長節點。優化器(optimizer )類能夠自動的計算網絡圖的導數,可是優化器中的建立器(creators )或者專業的人員能夠經過本節所述的函數調用更底層的方法。
操做 |
描述 |
---|---|
tf.gradients(ys, xs, grad_ys=None, name=’gradients’, colocate_gradients_with_ops=False, gate_gradients=False, aggregation_method=None) |
構建一個符號函數,計算ys關於xs中x的偏導的和, 返回xs中每一個x對應的sum(dy/dx) |
tf.stop_gradient(input, name=None) |
中止計算梯度, 在EM算法、Boltzmann機等可能會使用到 |
tf.clip_by_value(t, clip_value_min, clip_value_max, name=None) |
基於定義的min與max對tesor數據進行截斷操做, 目的是爲了應對梯度爆發或者梯度消失的狀況 |
tf.clip_by_norm(t, clip_norm, axes=None, name=None) |
使用L2範式標準化tensor最大值爲clip_norm 返回 t * clip_norm / l2norm(t) |
tf.clip_by_average_norm(t, clip_norm, name=None) |
使用平均L2範式規範tensor數據t, 並以clip_norm爲最大值 返回 t * clip_norm / l2norm_avg(t) |
tf.clip_by_global_norm(t_list, clip_norm, use_norm=None, name=None) |
返回t_list[i] * clip_norm / max(global_norm, clip_norm) 其中global_norm = sqrt(sum([l2norm(t)**2 for t in t_list])) |
tf.global_norm(t_list, name=None) |
返回global_norm = sqrt(sum([l2norm(t)**2 for t in t_list])) |
操做 |
描述 |
---|---|
tf.train.exponential_decay(learning_rate, global_step, decay_steps, decay_rate, staircase=False, name=None) |
對學習率進行指數衰退 |
▷ tf.train.exponential_decay
1 #該函數返回如下結果 2 decayed_learning_rate = learning_rate * 3 decay_rate ^ (global_step / decay_steps) 4 ##例: 以0.96爲基數,每100000 步進行一次學習率的衰退 5 global_step = tf.Variable(0, trainable=False) 6 starter_learning_rate = 0.1 7 learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 8 100000, 0.96, staircase=True) 9 # Passing global_step to minimize() will increment it at each step. 10 learning_step = ( 11 tf.train.GradientDescentOptimizer(learning_rate) 12 .minimize(...my loss..., global_step=global_step) 13 )
一些訓練優化算法,好比GradientDescent 和Momentum 在優化過程當中即可以使用到移動平均方法。使用移動平均經常能夠較明顯地改善結果。
操做 |
描述 |
---|---|
class tf.train.ExponentialMovingAverage |
將指數衰退加入到移動平均中 |
tf.train.ExponentialMovingAverage.apply(var_list=None) |
對var_list變量保持移動平均 |
tf.train.ExponentialMovingAverage.average_name(var) |
返回var均值的變量名稱 |
tf.train.ExponentialMovingAverage.average(var) |
返回var均值變量 |
tf.train.ExponentialMovingAverage.variables_to_restore(moving_avg_variables=None) |
返回用於保存的變量名稱的映射 |
▷ tf.train.ExponentialMovingAverage
1 # Example usage when creating a training model: 2 # Create variables. 3 var0 = tf.Variable(...) 4 var1 = tf.Variable(...) 5 # ... use the variables to build a training model... 6 ... 7 # Create an op that applies the optimizer. This is what we usually 8 # would use as a training op. 9 opt_op = opt.minimize(my_loss, [var0, var1]) 10 11 # Create an ExponentialMovingAverage object 12 ema = tf.train.ExponentialMovingAverage(decay=0.9999) 13 14 # Create the shadow variables, and add ops to maintain moving averages 15 # of var0 and var1. 16 maintain_averages_op = ema.apply([var0, var1]) 17 18 # Create an op that will update the moving averages after each training 19 # step. This is what we will use in place of the usual training op. 20 with tf.control_dependencies([opt_op]): 21 training_op = tf.group(maintain_averages_op) 22 23 ...train the model by running training_op... 24 25 #Example of restoring the shadow variable values: 26 # Create a Saver that loads variables from their saved shadow values. 27 shadow_var0_name = ema.average_name(var0) 28 shadow_var1_name = ema.average_name(var1) 29 saver = tf.train.Saver({shadow_var0_name: var0, shadow_var1_name: var1}) 30 saver.restore(...checkpoint filename...) 31 # var0 and var1 now hold the moving average values
▷ tf.train.ExponentialMovingAverage.variables_to_restore
1 variables_to_restore = ema.variables_to_restore() 2 saver = tf.train.Saver(variables_to_restore)
查看queue中,queue相關的內容,瞭解tensorflow中隊列的運行方式。
操做 |
描述 |
---|---|
class tf.train.Coordinator |
線程的協調器 |
tf.train.Coordinator.clear_stop() |
清除中止標記 |
tf.train.Coordinator.join(threads=None, stop_grace_period_secs=120) |
等待線程終止 threads:一個threading.Threads的列表,啓動的線程,將額外加入到registered的線程中 |
tf.train.Coordinator.register_thread(thread) |
Register一個用於join的線程 |
tf.train.Coordinator.request_stop(ex=None) |
請求線程結束 |
tf.train.Coordinator.should_stop() |
檢查是否被請求中止 |
tf.train.Coordinator.stop_on_exception() |
上下文管理器,當一個例外出現時請求中止 |
tf.train.Coordinator.wait_for_stop(timeout=None) |
等待Coordinator提示中止進程 |
class tf.train.QueueRunner |
持有一個隊列的入列操做列表,用於線程中運行 queue:一個隊列 enqueue_ops: 用於線程中運行的入列操做列表 |
tf.train.QueueRunner.create_threads(sess, coord=None, daemon=False, start=False) |
建立運行入列操做的線程,返回一個線程列表 |
tf.train.QueueRunner.from_proto(queue_runner_def) |
返回由queue_runner_def建立的QueueRunner對象 |
tf.train.add_queue_runner(qr, collection=’queue_runners’) |
增長一個QueueRunner到graph的收集器(collection )中 |
tf.train.start_queue_runners(sess=None, coord=None, daemon=True, start=True, collection=’queue_runners’) |
啓動全部graph收集到的隊列運行器(queue runners) |
▷ class tf.train.Coordinator
1 #Coordinator的使用,用於多線程的協調 2 try: 3 ... 4 coord = Coordinator() 5 # Start a number of threads, passing the coordinator to each of them. 6 ...start thread 1...(coord, ...) 7 ...start thread N...(coord, ...) 8 # Wait for all the threads to terminate, give them 10s grace period 9 coord.join(threads, stop_grace_period_secs=10) 10 except RuntimeException: 11 ...one of the threads took more than 10s to stop after request_stop() 12 ...was called. 13 except Exception: 14 ...exception that was passed to coord.request_stop()
▷ tf.train.Coordinator.stop_on_exception()
1 with coord.stop_on_exception(): 2 # Any exception raised in the body of the with 3 # clause is reported to the coordinator before terminating 4 # the execution of the body. 5 ...body... 6 #等價於 7 try: 8 ...body... 9 exception Exception as ex: 10 coord.request_stop(ex)
能夠閱讀TensorFlow的分佈式學習框架簡介 查看更多tensorflow分佈式細節。
操做 |
描述 |
---|---|
class tf.train.Server |
一個進程內的tensorflow服務,用於分佈式訓練 |
tf.train.Server.init(server_or_cluster_def, job_name=None, task_index=None, protocol=None, config=None, start=True) |
建立一個新的服務,其中job_name, task_index, 和protocol爲可選參數, 優先級高於server_or_cluster_def中相關信息 server_or_cluster_def : 爲一個tf.train.ServerDef 或 tf.train.ClusterDef 協議(protocol)的buffer, 或者一個tf.train.ClusterSpec對象 |
tf.train.Server.create_local_server(config=None, start=True) |
建立一個新的運行在本地主機的單進程集羣 |
tf.train.Server.target |
返回tf.Session所鏈接的目標服務器 |
tf.train.Server.server_def |
返回該服務的tf.train.ServerDef |
tf.train.Server.start() |
開啓服務 |
tf.train.Server.join() |
阻塞直到服務已經關閉 |
# |
|
---|---|
class tf.train.Supervisor |
一個訓練輔助器,用於checkpoints模型以及計算的summaries。該監視器只是一個小的外殼(wrapper),用於Coordinator, a Saver, 和a SessionManager周圍 |
tf.train.Supervisor.__init__(graph=None, ready_op=0, is_chief=True, init_op=0, init_feed_dict=None, local_init_op=0, logdir=None, |
建立一個監視器Supervisor |
tf.train.Supervisor.managed_session(master=」, config=None, start_standard_services=True, close_summary_writer=True) |
返回一個管路session的上下文管理器 |
tf.train.Supervisor.prepare_or_wait_for_session(master=」, config=None, wait_for_checkpoint=False, max_wait_secs=7200, start_standard_services=True) |
確保model已經準備好 |
tf.train.Supervisor.start_standard_services(sess) |
爲sess啓動一個標準的服務 |
tf.train.Supervisor.start_queue_runners(sess, queue_runners=None) |
爲QueueRunners啓動一個線程,queue_runners爲一個QueueRunners列表 |
tf.train.Supervisor.summary_computed(sess, summary, global_step=None) |
指示計算的summary |
tf.train.Supervisor.stop(threads=None, close_summary_writer=True) |
中止服務以及協調器(coordinator),並無關閉session |
tf.train.Supervisor.request_stop(ex=None) |
參考Coordinator.request_stop() |
tf.train.Supervisor.should_stop() |
參考Coordinator.should_stop() |
tf.train.Supervisor.stop_on_exception() |
參考 Coordinator.stop_on_exception() |
tf.train.Supervisor.Loop(timer_interval_secs, target, args=None, kwargs=None) |
開啓一個循環器線程用於調用一個函數 每通過timer_interval_secs秒執行,target(*args, **kwargs) |
tf.train.Supervisor.coord |
返回監督器(Supervisor)使用的協調器(Coordinator ) |
# |
|
---|---|
class tf.train.SessionManager |
訓練的輔助器,用於從checkpoint恢復數據以及建立一個session |
tf.train.SessionManager.__init__(local_init_op=None, ready_op=None, graph=None, recovery_wait_secs=30) |
建立一個SessionManager |
tf.train.SessionManager.prepare_session(master, init_op=None, saver=None, checkpoint_dir=None, wait_for_checkpoint=False, max_wait_secs=7200, config=None, init_feed_dict=None, init_fn=None) |
建立一個session,並確保model能夠被使用 |
tf.train.SessionManager.recover_session(master, saver=None, checkpoint_dir=None, wait_for_checkpoint=False, max_wait_secs=7200, config=None) |
建立一個session,若是能夠的話,使用恢復方法建立 |
tf.train.SessionManager.wait_for_session(master, config=None, max_wait_secs=inf) |
建立一個session,並等待model準備完成 |
# |
|
---|---|
class tf.train.ClusterSpec |
將一個集羣表示爲一系列「tasks」,並整合至「jobs」中 |
tf.train.ClusterSpec.as_cluster_def() |
返回該cluster中一個tf.train.ClusterDef協議的buffer |
tf.train.ClusterSpec.as_dict() |
返回一個字典,由job名稱對應於網絡地址 |
tf.train.ClusterSpec.job_tasks(job_name) |
返回一個給定的job對應的task列表 |
tf.train.ClusterSpec.jobs |
返回該cluster的job名稱列表 |
tf.train.replica_device_setter(ps_tasks=0, ps_device=’/job:ps’, worker_device=’/job:worker’, merge_devices=True, cluster=None, ps_ops=None) |
返回一個設備函數(device function),以在創建一個副本graph的時候使用,設備函數(device function)用在with tf.device(device_function)中 |
▷ tf.train.Server
server = tf.train.Server(...) with tf.Session(server.target): # ...
▷ tf.train.Supervisor
相關參數:
ready_op : 一維 字符串 tensor。該tensor是用過監視器在prepare_or_wait_for_session()計算,檢查model是否準備好可使用。若是準備好,將返回一個空陣列,若是爲None,該model沒有被檢查。
is_chief : 若是爲True,建立一個主監視器用於負責初始化與模型的恢復,若爲False,則依賴主監視器。
init_op : 一個操做,用於模型不能恢復時的初始化操做。默認初始化全部操做
local_init_op : 可被全部監視器運行的初始化操做。
logdir : 設置log目錄
summary_op : 一個操做(Operation ),返回Summary 和事件logs,須要設置 logdir
saver : 一個Saver對象
save_summaries_secs : 保存summaries的間隔秒數
save_model_secs : 保存model的間隔秒數
checkpoint_basename : checkpoint保存的基本名稱
1 with tf.Graph().as_default(): 2 ...add operations to the graph... 3 # Create a Supervisor that will checkpoint the model in '/tmp/mydir'. 4 sv = Supervisor(logdir='/tmp/mydir') 5 # Get a TensorFlow session managed by the supervisor. 6 with sv.managed_session(FLAGS.master) as sess: 7 # Use the session to train the graph. 8 while not sv.should_stop(): 9 sess.run(<my_train_op>) 10 # 在上下文管理器with sv.managed_session()內,全部在graph的變量都被初始化。 11 # 或者說,一些服務器checkpoint相應模型並增長summaries至事件log中。 12 # 若是有例外發生,should_stop()將返回True
使用在多副本運行狀況中
要使用副本訓練已經部署在集羣上的相同程序,必須指定其中一個task爲主要,該task處理 initialization, checkpoints, summaries, 和recovery相關事物。其餘task依賴該task。
1 # Choose a task as the chief. This could be based on server_def.task_index, 2 # or job_def.name, or job_def.tasks. It's entirely up to the end user. 3 # But there can be only one *chief*. 4 is_chief = (server_def.task_index == 0) 5 server = tf.train.Server(server_def) 6 7 with tf.Graph().as_default(): 8 ...add operations to the graph... 9 # Create a Supervisor that uses log directory on a shared file system. 10 # Indicate if you are the 'chief' 11 sv = Supervisor(logdir='/shared_directory/...', is_chief=is_chief) 12 # Get a Session in a TensorFlow server on the cluster. 13 with sv.managed_session(server.target) as sess: 14 # Use the session to train the graph. 15 while not sv.should_stop(): 16 sess.run(<my_train_op>)
若是有task崩潰或重啓,managed_session() 將檢查是否Model被初始化。若是已經初始化,它只須要建立一個session並將其返回至正在訓練的正常代碼中。若是model須要被初始化,主task將對它進行從新初始化,而其餘task將等待模型初始化完成。
注意:該程序方法同樣適用於單進程的work,該單進程標註本身爲主要的便行
▷ supervisor中master的字符串形式
不管運行在本機或者集羣上,均可以使用如下值設定master flag:
▷ supervisor高級用法
#例如: 開啓一個線程用於打印loss. 設置每60秒該線程運行一次,咱們使用sv.loop()
1 ... 2 sv = Supervisor(logdir='/tmp/mydir') 3 with sv.managed_session(FLAGS.master) as sess: 4 sv.loop(60, print_loss, (sess)) 5 while not sv.should_stop(): 6 sess.run(my_train_op)
在chief中每100個step,建立summaries
1 # Create a Supervisor with no automatic summaries. 2 sv = Supervisor(logdir='/tmp/mydir', is_chief=is_chief, summary_op=None) 3 # As summary_op was None, managed_session() does not start the 4 # summary thread. 5 with sv.managed_session(FLAGS.master) as sess: 6 for step in xrange(1000000): 7 if sv.should_stop(): 8 break 9 if is_chief and step % 100 == 0: 10 # Create the summary every 100 chief steps. 11 sv.summary_computed(sess, sess.run(my_summary_op)) 12 else: 13 # Train normally 14 sess.run(my_train_op)
▷ tf.train.Supervisor.managed_session
1 def train(): 2 sv = tf.train.Supervisor(...) 3 with sv.managed_session(<master>) as sess: 4 for step in xrange(..): 5 if sv.should_stop(): 6 break 7 sess.run(<my training op>) 8 ...do other things needed at each training step...
▷ tf.train.SessionManager
1 with tf.Graph().as_default(): 2 ...add operations to the graph... 3 # Create a SessionManager that will checkpoint the model in '/tmp/mydir'. 4 sm = SessionManager() 5 sess = sm.prepare_session(master, init_op, saver, checkpoint_dir) 6 # Use the session to train the graph. 7 while True: 8 sess.run(<my_train_op>) 9 #其中prepare_session()初始化和恢復一個模型參數。 10 11 #另外一個進程將等待model準備完成,代碼以下 12 with tf.Graph().as_default(): 13 ...add operations to the graph... 14 # Create a SessionManager that will wait for the model to become ready. 15 sm = SessionManager() 16 sess = sm.wait_for_session(master) 17 # Use the session to train the graph. 18 while True: 19 sess.run(<my_train_op>) 20 #wait_for_session()等待一個model被其餘進程初始化
▷ tf.train.ClusterSpec
一個tf.train.ClusterSpec表示一系列的進程,這些進程都參與分佈式tensorflow的計算。每個 tf.train.Server都在一個獨有的集羣中構建。
建立一個具備兩個jobs及其5個tasks的集羣們須要定義從job名稱列表到網絡地址列表之間的映射。
1 cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", 2 "worker1.example.com:2222", 3 "worker2.example.com:2222"], 4 "ps": ["ps0.example.com:2222", 5 "ps1.example.com:2222"]})
▷ tf.train.replica_device_setter
1 # To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker 2 # jobs on hosts worker0, worker1 and worker2. 3 cluster_spec = { 4 "ps": ["ps0:2222", "ps1:2222"], 5 "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]} 6 with tf.device(tf.replica_device_setter(cluster=cluster_spec)): 7 # Build your graph 8 v1 = tf.Variable(...) # assigned to /job:ps/task:0 9 v2 = tf.Variable(...) # assigned to /job:ps/task:1 10 v3 = tf.Variable(...) # assigned to /job:ps/task:0 11 # Run compute
咱們能夠在一個session中獲取summary操做的輸出,並將其傳輸到SummaryWriter以添加至一個事件記錄文件中。
操做 |
描述 |
---|---|
tf.scalar_summary(tags, values, collections=None, name=None) |
輸出一個標量值的summary協議buffer tag的shape須要與values的相同,用來作summaries的tags,爲字符串 |
tf.image_summary(tag, tensor, max_images=3, collections=None, name=None) |
輸出一個圖像tensor的summary協議buffer |
tf.audio_summary(tag, tensor, sample_rate, max_outputs=3, collections=None, name=None) |
輸出一個音頻tensor的summary協議buffer |
tf.histogram_summary(tag, values, collections=None, name=None) |
輸出一個直方圖的summary協議buffer |
tf.nn.zero_fraction(value, name=None) |
返回0在value中的小數比例 |
tf.merge_summary(inputs, collections=None, name=None) |
合併summary |
tf.merge_all_summaries(key=’summaries’) |
合併在默認graph中手機的summaries |
▶▶將記錄彙總寫入文件中(Adding Summaries to Event Files)
操做 |
描述 |
---|---|
class tf.train.SummaryWriter |
將summary協議buffer寫入事件文件中 |
tf.train.SummaryWriter.__init__(logdir, graph=None, max_queue=10, flush_secs=120, graph_def=None) |
建立一個SummaryWriter實例以及新建一個事件文件 |
tf.train.SummaryWriter.add_summary(summary, global_step=None) |
將一個summary添加到事件文件中 |
tf.train.SummaryWriter.add_session_log(session_log, global_step=None) |
添加SessionLog到一個事件文件中 |
tf.train.SummaryWriter.add_event(event) |
添加一個事件到事件文件中 |
tf.train.SummaryWriter.add_graph(graph, global_step=None, graph_def=None) |
添加一個Graph到時間文件中 |
tf.train.SummaryWriter.add_run_metadata(run_metadata, tag, global_step=None) |
爲一個單一的session.run()調用添加一個元數據信息 |
tf.train.SummaryWriter.flush() |
刷新時間文件到硬盤中 |
tf.train.SummaryWriter.close() |
將事件問價寫入硬盤中並關閉該文件 |
tf.train.summary_iterator(path) |
一個用於從時間文件中讀取時間協議buffer的迭代器 |
▷ tf.train.SummaryWriter
建立一個SummaryWriter 和事件文件。若是咱們傳遞一個Graph進入該構建器中,它將被添加到事件文件當中,這一點與使用add_graph()具備相同功能。
TensorBoard 將從事件文件中提取該graph,並將其顯示。因此咱們能直觀地看到咱們創建的graph。咱們一般從咱們啓動的session中傳遞graph:
1 ...create a graph... 2 # Launch the graph in a session. 3 sess = tf.Session() 4 # Create a summary writer, add the 'graph' to the event file. 5 writer = tf.train.SummaryWriter(<some-directory>, sess.graph)
▷ tf.train.summary_iterator
1 #打印時間文件中的內容 2 for e in tf.train.summary_iterator(path to events file): 3 print(e) 4 5 #打印指定的summary值 6 # This example supposes that the events file contains summaries with a 7 # summary value tag 'loss'. These could have been added by calling 8 # `add_summary()`, passing the output of a scalar summary op created with 9 # with: `tf.scalar_summary(['loss'], loss_tensor)`. 10 for e in tf.train.summary_iterator(path to events file): 11 for v in e.summary.value: 12 if v.tag == 'loss': 13 print(v.simple_value)
操做 |
描述 |
---|---|
tf.train.global_step(sess, global_step_tensor) |
一個用於獲取全局step的小輔助器 |
tf.train.write_graph(graph_def, logdir, name, as_text=True) |
將一個graph proto寫入一個文件中 |
# |
|
|
:— |
class tf.train.LooperThread |
可重複地執行代碼的線程 |
tf.train.LooperThread.init(coord, timer_interval_secs, target=None, args=None, kwargs=None) |
建立一個LooperThread |
tf.train.LooperThread.is_alive() |
返回是否該線程是活躍的 |
tf.train.LooperThread.join(timeout=None) |
等待線程結束 |
tf.train.LooperThread.loop(coord, timer_interval_secs, target, args=None, kwargs=None) |
啓動一個LooperThread,用於週期地調用某個函數 調用函數target(args) |
tf.py_func(func, inp, Tout, stateful=True, name=None) |
將python函數包裝成tf中操做節點 |
▷ tf.train.global_step
1 # Creates a variable to hold the global_step. 2 global_step_tensor = tf.Variable(10, trainable=False, name='global_step') 3 # Creates a session. 4 sess = tf.Session() 5 # Initializes the variable. 6 sess.run(global_step_tensor.initializer) 7 print('global_step: %s' % tf.train.global_step(sess, global_step_tensor)) 8 9 global_step: 10
▷ tf.train.write_graph
1 v = tf.Variable(0, name='my_variable') 2 sess = tf.Session() 3 tf.train.write_graph(sess.graph_def, '/tmp/my-model', 'train.pbtxt')
▷ tf.py_func
1 #tf.py_func(func, inp, Tout, stateful=True, name=None) 2 #func:爲一個python函數 3 #inp:爲輸入函數的參數,Tensor列表 4 #Tout: 指定func返回的輸出的數據類型,是一個列表 5 def my_func(x): 6 # x will be a numpy array with the contents of the placeholder below 7 return np.sinh(x) 8 inp = tf.placeholder(tf.float32, [...]) 9 y = py_func(my_func, [inp], [tf.float32])
TensorFlow 提供了一個方便的繼承unittest.TestCase類的方法,該類增長有關TensorFlow 測試的方法。以下例子:
1 import tensorflow as tf 2 3 class SquareTest(tf.test.TestCase): 4 5 def testSquare(self): 6 with self.test_session(): 7 x = tf.square([2, 3]) 8 self.assertAllEqual(x.eval(), [4, 9]) 9 10 if __name__ == '__main__': 11 tf.test.main()
操做 |
描述 |
---|---|
tf.test.main() |
運行全部的單元測試 |
tf.test.assert_equal_graph_def(actual, expected) |
斷言 兩個GraphDefs 是否幾乎同樣 |
tf.test.get_temp_dir() |
返回測試期間使用的臨時目錄 |
tf.test.is_built_with_cuda() |
返回是否Tensorflow支持CUDA(GPU)的build |
可對比compute_gradient 和compute_gradient_error函數的用法
操做 |
描述 |
---|---|
tf.test.compute_gradient(x, x_shape, y, y_shape, x_init_value=None, delta=0.001, init_targets=None) |
計算並返回理論的和數值的Jacobian矩陣 |
tf.test.compute_gradient_error(x, x_shape, y, y_shape, x_init_value=None, delta=0.001, init_targets=None) |
計算梯度的error。在計算所得的與數值估計的Jacobian中 爲dy/dx計算最大的error |
注:本教程不是原創 轉自http://blog.csdn.net/lenbow/article/details/52218551