(1)將圖的定義和圖的運行徹底分開,所以Tensorflow被認爲是一個「符合主義」的庫python
(2)Tensorflow中涉及的運算都要放在圖中,而圖的運行只發生在會話(session)中。開啓會話後,就能夠用數據去填充節點,進行運算。關閉會話後,就不能繼續計算了。所以會話提供了操做運算和Tensor求值的環境git
Tensorflow的邊有兩種鏈接關係:數據依賴和控制依賴。其中實線邊表示數據依賴,表明數據,即張量。張量在數據流圖中從前日後流動一遍就完成了一次前向傳播,而殘差從後向前流動一編就完成了一次反向傳播。編程
還有一種特殊邊,通常畫爲虛線邊,稱爲控制依賴,能夠用於控制操做的運算,這被用來確保happens-before關係,這類邊上沒有數據流過,但源節點必須在目的節點開始執行前完成執行。bootstrap
Tensorflow支持的張量數據屬性數組
數據類型 Python類型網絡
DT_FLOAT tf.float32 32位浮點型session
DT_DOUBLE tf.float64 64位浮點型多線程
DT_INT64 tf.int64 64位有符號整型app
DT_INT32 tf.int32 32位有符號整型dom
DT_INT16 tf.int16 16位有符號整型
DT_INT8 tf.int8 8位有符號整型
DT_UINT8 tf.uint8 8位無符號整型
DT_STRING tf.string 可變長度的字節數組
DT_BOOL tf.bool 布爾型
DT_COMPLEX64 tf.complex64 由兩個32位浮點數組成的複數
DT_QINT32 tf.qint32 用於量化操做的32位有符號整型
DT_QINT8 tf.qint8 用於量化操做的8位有符號整型
DT_QUINT8 tf.quint8 用於量化操做的8位無符號整型
圖中的節點又稱爲算子,它表明一個操做(op),通常用來表示施加的數學運算,也能夠表示數據輸入(feed in)的起點以及輸出(push out)的終點,或者是讀取/寫入持久變量的終點
下面列舉了一些Tensorflow實現的算子
類別 示例
數學運算操做 Add,Sub,Mul,Div,Exp,Log,Greater,Less,Equal
數組運算操做 Concat,Slice,Split,Constant,Rank,Shape,Shuffle
矩陣運算操做 MatMul,MatrixInverse,MatrixDeterminant
有狀態的操做 Variable,Assign,AssignAdd
神經網絡構建操做 SoftMax,Sigmoid,ReLU,Convolution2D,MaxPool
檢查點操做 Save,Restore
隊列和同步操做 Enqueue,Dequeue,MutexAcquire,MutexRelease
控制張量流動的操做 Merge,Switch,Enter,Leave,NextIteration
除了邊和節點,Tensorflow還涉及其餘一些概念,如圖、會話、設備、變量、內核等。
1.圖
把操做任務描述成有向無環圖,那麼如何構建圖呢?構建圖的第一步就是建立各個節點
2.會話
啓動圖的第一步是建立一個Session對象。會話(Session)提供在圖中執行操做的一些方法。通常的模式是,創建會話,此時會產生一張空圖,在會話中添加節點和邊,造成一張圖,而後執行
在調用Session對象的run()方法來執行圖時,傳入一些Tensor,這個過程叫填充(feed);返回的結果類型根據輸入的類型而定,這個過程叫取回(fetch)
會話是圖交互的一個橋樑,一個會話能夠有多個圖,會話能夠修改圖的結構,也能夠往圖中注入數據進行計算。所以,會話主要有兩個API接口:Extend和Run.Extend操做是在Graph中添加節點和邊,Run操做是輸入計算的節點和填充必要的數據後,進行運算,並輸出運算結果。
3.設備
設備(device)是指一塊能夠用來運算而且擁有本身的地址空間的硬件,如CPU和GPU.Tensorflow爲了實現分佈式執行操做,充分利用計算資源,能夠明確指定操做在哪一個設備上執行。
4.變量
變量是一種特殊的數據,它在圖中有固定的位置,不像普通張量那樣能夠流動。例如,建立一個變量張量,使用tf.Variable()構造函數,這個構造函數須要一個初始值
Tensorflow還提供填充機制,能夠在構建圖時使用tf.placeholder()臨時替代任意操做的張量,在調用Session對象的run()去執行圖時,使用填充數據做爲調用的參數,調用結束後,填充數據就消失。
import tensorflow as tf input1=tf.placeholder(tf.float32) input2=tf.placeholder(tf.float32) output=tf.multiply(input1,input2) with tf.Session() as sess: print(sess.run([output],feed_dict={input1:[7.],input2:[2.]}))
5.內核
咱們知道操做是對抽象操做的一個統稱,而內核則是可以運行在特定設備(如CPU,GPU)上的一種對操做的實現。所以,同一個操做可能會對應多個內核。
當自定義一個操做時,須要把新操做和內核經過註冊的方式添加到系統中。
Tensorflow的計算表現位數據流圖,因此tf.Graph類中包含一系列表示計算的操做對象(tf.Operation),以及在操做之間流動的數據——張量對象(tf.Tensor)。與圖相關的API均位於tf.Graph類中
tf.Graph._init_() 建立一個空圖
tf.Graph.as_default() 將某圖設置爲默認圖,並返回一個上下文管理器。若是不顯式添加一個默認圖,系統會自動設置一個全局的默認圖。所設置的默認圖,在模塊範圍內定義的節點都將默認加入默認圖中
tf.Graph.device(device_name_or_function) 定義運行圖所使用的設備,並返回一個上下文管理器
tf.Graph.name_scope(name) 爲節點建立層次化的名稱,並返回一個上下文管理器。
tf.Operation類表明圖中的一個節點,用於計算張量數據。該類型由節點構造器產生。
與操做相關的API均位於tf.Operation類中
tf.Operation.name 操做名稱
tf.Operation.type 操做類型
tf.Operation.inputs 操做的輸入
tf.Operation.outputs 操做的輸出
tf.Operation.control_inputs 操做的依賴
tf.Operation.run(feed_dict=None,session=None) 在會話中運行該操做
tf.Operation.get_attr(name) 獲取操做的屬性值
tf.Tensor類是操做輸出的符號句柄,它不包含操做i輸出的值,而是提供了一種在tf.Session中計算這些值得方法。這樣就能夠在操做之間構建一個數據流鏈接,使Tensorflow可以執行一個表示大量多步計算的圖形
tf.Tensor.dtype 張量的數據類型
tf.Tensor.name 張量的名稱
tf.Tensor.value_index 張量在操做輸出中的索引
tf.Tensor.graph 張量所在的圖
tf.Tensor.op 產生該張量的操做
tf.Tensor.consumers() 返回使用該張量的操做列表
tf.Tensor.eval(feed_dict,session=None) 在會話中求張量的值,須要使用sess.as_default()或者eval(session=sess)
tf.Tensor.get_shape() 返回用於表示張量的形狀(維度)的類Tensorshape
tf.Tensor.set_shape() 更新張量的形狀
tf.Tensor.device 設置計算該張量的設備
可視化時,須要在程序中給必要的節點添加摘要(summary),摘要會收集該節點的數據,並標記上第幾步、時間戳等標識,寫入事件文件(event file)中。tf.summary.FileWriter類用於在目錄中建立事件文件,而且向文件中添加摘要和事件,用來在TensorBoard中展現
可視化經常使用API
tf.summary.FileWriter._init_(logdir,graph=None,max_queue=10,flush_secs=120,graph_def=None) 建立FileWriter和事件文件,會在logdir中建立一個新的事件文件
tf.summary.FileWriter.add_summary(summary,global_step=None) 將摘要添加到事件文件
tf.summary.FileWriter.add_event(event) 向事件文件中添加一個事件
tf.summary.FileWriter.add_graph(graph,global_step=None,graph_def=None) 向事件文件中添加一個圖
tf.summary.FileWriter.get_logdir 獲取事件文件的路徑
tf.summary.FileWriter.flush() 將全部事件都寫入磁盤
tf.summary.FileWriter.close() 將事件寫入磁盤,並關閉文件操做符
tf.summary.scalar(name,tensor,collections=None) 輸出包含單個標量值的摘要
tf.summary.histogram(name,values,collections=None) 輸出包含直方圖的摘要
tf.summary.audio(name,tensor,sample_rate,max_outputs=3,collections=None) 輸出包含音頻的摘要
tf.summary.image(name,tensor,max_outputs=3,collections=None) 輸出包含圖片的摘要
tf.summary.merge(inputs,collections=None,name=None) 合併摘要,包含全部輸入摘要的值
在Tensorflow中有兩個做用域(scope),一個是name_scope,另外一個是variable_scope.二者的區別是,variable_scope主要是給variable_name加前綴的,也能夠給op_name加前綴;name_scope是給op_name加前綴。
variable_scope變量做用域機制在Tensorflow中主要由兩部分組成:
v=tf.get_variable(name,shape,dtype,initializer)#經過所給的名字建立或是返回一個變量
tf.variable_scope(<scope_name>)#爲變量指定命名空間
當tf.get_variable_scope().reuse==False時,variable_scope做用域只能用來建立新變量
import tensorflow as tf with tf.variable_scope("foo"): v=tf.get_variable("v",[1]) v2=tf.get_variable("v",[1]) assert(v.name=="foo/v:0")
上面這個程序會報錯,由於v這個變量已經被定義過了,但tf.get_variable_scope()。reuse默認爲False,因此不能重用
當tf.get_variable_scope().reuse==True時,做用域能夠共享變量
import tensorflow as tf with tf.variable_scope("foo") as scope: v=tf.get_variable("v",[1]) with tf.variable_scope("foo",reuse=True): v1=tf.get_variable("v",[1]) assert(v1==v)
1.獲取變量做用域
能夠直接經過tf.Variable_scope()來獲取變量做用域
若是在開啓一個變量做用域裏使用以前預先定義的一個做用域,則會跳過當前變量的做用域,保持預先存在的做用域不變
import tensorflow as tf with tf.variable_scope("foo") as foo_scope: assert (foo_scope.name=="foo") with tf.variable_scope("bar"): with tf.variable_scope("baz") as other_scope: assert (other_scope.name=="bar/baz") with tf.variable_scope(foo_scope) as foo_scope2: assert(foo_scope2.name=="foo") #保持不變
變量做用域能夠默認攜帶一個初始化器,在這個做用域中的子做用域或變量均可以繼承或者重寫父做用域初始化器中的值
import tensorflow as tf with tf.variable_scope("foo",initializer=tf.constant_initializer(0.4)): v=tf.get_variable("v",[1]) assert(v.eval()==0.4) w=tf.get_variable("w",[1],initializer=tf.constant_initializer(0.3)) assert(w.eval()==0.3) with tf.variable_scope("bar"): v=tf.get_variable("v",[1]) assert(v.eval()==0.4) with tf.variable_scope("baz",initializer=tf.constant_initializer(0.2)): v=tf.get_variable("v",[1]) assert(v.eval()==0.2)
上面講的是variable_name,那對於op_name呢?在variable_scope做用域下的操做,也會加上前綴
import tensorflow as tf with tf.variable_scope("foo"): x=1+tf.get_variable("v",[1]) assert x.op.name=="foo/add"
批標準化是爲了克服神經網絡層數加深致使難以訓練而誕生的。咱們知道,深度神經網絡隨着網絡深度加深,訓練起來會愈來愈困難,收斂速度會很慢,經常會致使梯度彌散。
統計機器學習中有一個ICS理論,這是一個經典假設:源域和目標域的數據分佈是一致的。也就是說,訓練數據和測試數據是知足相同分佈的。這是經過訓練數據得到的模型可以在測試集得到好的效果的一個基本保障
covariate shift是指訓練集的樣本數據和目標樣本數據集分佈不一致時,訓練獲得的模型沒法很好地泛化。它是分佈不一致假設之下地一個分支問題,也就是指源域和目標域的條件機率是一致的,可是其邊緣機率不一樣。的確,對於神經網絡的各層輸出,在通過了層內操做以後,各層輸出分佈就會與對應的輸入信號分佈不一樣,並且差別會隨着網絡深度增大而加大,可是每一層所指向的樣本標記仍然是不變的。
解決思路通常是根據訓練樣本和目標樣本的比例對訓練樣本作一個矯正。所以,經過引入批標準化來規範化某些層或者全部層的輸入,從而固定每層輸入信號的均值與方差
批標準化通常用在非線性映射(激活函數)以前,對x=Wu+b作規範化,使結果(輸出信號各個維度)的均值爲0,方差爲1.讓每一層的輸入有一個穩定的分佈會有利於網絡的訓練。
批標準化經過規範化讓激活函數分佈在線性區間,結果就是加大了梯度,讓模型更加大膽地進行梯度降低,因而有以下優勢:
1.加大探索的步長,加快收斂的速度
2.更容易跳出局部最小值
3.破壞原來的數據分佈,必定程度上緩解過擬合
所以,在遇到神經網絡收斂速度很慢或者梯度爆炸等沒法訓練的狀況下,均可以嘗試用批標準化來解決。
示例:
fc_mean,fc_var=tf.nn.moments(Wx_plus_b,axes=[0],) scale=tf.Variable(tf.ones([out_size])) shift=tf.Variable(tf.zeros([out_size])) epsilon=0.001 Wx_plus_b=tf.nn.batch_normalization(Wx_plus_b,fc_mean,fc_var,shift,scale,epsilon) #也就是在作Wx_plus_b=(Wx_plus_b-fc_mean)/tf.sqrt(fc_var+0.001) #Wx_plus_b=Wx_plus_b*scale+shift
激活函數運行時激活神經網絡中某一部分神經元,將激活系信息向後傳入下一層的神經網絡。神經網絡之因此能解決非線性問題,本質上就是激活函數加入了非線性因素,彌補了線性模型的表達力,把「激活的神經元的特徵」經過函數保留並映射到下一層。
由於神經網絡的數學基礎是到處可微,因此選取的激活函數要能保證數據輸入與輸出也是可微的。
激活函數不會更改輸入數據的維度,也就是輸入和輸出的維度是相同的。有以下的激活函數:
tf.nn.relu()
tf.nn.sigmoid()
tf.nn.tanh()
tf.nn.elu()
tf.nn.bias_add()
tf.nn.crelu()
tf.nn.relu6()
tf.nn.softplus()
tf.nn.softsign()
dropout函數。一個神經元將以機率keep_prob決定是否被抑制。若是被抑制,該神經元的輸出爲0,若是不被抑制,那麼該神經元的輸出值將被放大到原來的1/keep_prob倍
tf.nn.dropout()
卷積函數是構建神經網絡的重要支架,是在一批圖像上掃描二維過濾器。
tf.nn.convolution(inputs,filter,padding,strides=None,dilation_rate=None,name=None,data_format=None)這個函數計算N維卷積和
tf.nn.conv2d(input,filter,strides,padding,use_cudnn_on_gpu=None,data_format=None,name=None)這個函數的做用是對一個四維的輸入數據input和四維的卷積核filter進行操做,而後對輸入數據進行一個二維的卷積操做,最後獲得卷積以後的結果。
tf.nn.depthwise_conv2d(input,filter,strides,padding,rate=None,name=None,data_format=None)這個函數輸入張量的數據維度是[batch,in_height,in_width,in_channels],卷積核的維度是[filter_height,filter_width,in_channels,channel_multiplier],在通道in_channels上面的卷積深度是1,depthwise_conv2d函數將不一樣的卷積核獨立地應用在in_channels的每一個通道上(從通道1到channel_multiplier),而後把全部結果進行彙總。最後輸出通道的總數是in_channels*channel_multiplier
tf.nn.separable_conv2d(input,depthwise_filter,pointwise_filter,strides,padding,rate=None,name=None,data_format=None)是利用幾個分離的卷積核去作卷積。在這個API中,將應用一個二維的卷積核,在每一個通道上,以深度channel_multiplier進行卷積。
tf.nn.atrous_conv2d(value,filters,rate,padding,name=None)計算Atrous卷積,又稱孔卷積或者擴張卷積
tf.nn.conv2d_transpose(value,filter,output_shape,strides,padding=’SAME’,data_format=’NHWC’,name=None)在解卷積網絡中有時稱爲反捲積,但其實是conv2d的轉置
tf.nn.conv1d(value,filter,stride,padding,use_cudnn_on_gpu=None,data_format=None,name=None)這個函數與二維卷積相似。這個函數是用來計算給定三維的輸入核過濾器的狀況下的一維卷積。不一樣的是,它的輸入是三維,如[batch,in_width,in_channels]。stride是一個正整數,表明卷積核向右移動每一步的長度。
tf.nn.conv3d(input,filter,strides,padding,name=None)和二維卷積相似,這個函數用來計算給定五維的輸入和過濾器的狀況下的三維卷積
在神經網絡中,池化函數通常跟在卷積函數的下一層
tf.nn.avg_pool()計算池化區域中元素的平均值
tf,nn.max_pool()計算池化區域中元素的最大值
tf.nn.max_pool_with_argmax()計算池化區域中元素的最大值和該最大值所在的位置
tf.nn.avg_pool3d()和tf.max_pool3d()分別是在三維下的平均池化和最大池化
tf.nn.fractional_avg_pool()和tf.nn.fractional_max_pool()分別是三維下的平均池化和最大池化
tf.nn.pool()執行一個n維的池化操做
Tensorflow中常見的分類函數主要有:sigmoid_cross_entropy_with_logits、softmax、log_softmax、softmax_cross_entropy_with_logits等
tf.nn.sigmoid_cross_entropy_with_logits這個函數的輸入要格外注意,若是採用此函數做爲損失函數,在神經網絡的最後一層不須要進行sigmoid運算。
tf.nn.softmax 計算softmax激活
log_softmax 計算log softmax激活
softmax_cross_entropy_with_logits
如何加速神經網絡的訓練?Tensorflow提供了不少優化器
1.BGD法
批梯度降低,這種方法是利用現有參數對訓練集中的每個輸入生成一個估計輸出yi,而後跟實際輸出yi比較,統計全部偏差,求平均之後得平均偏差,以此做爲更新參數得依據
2.SGD法
隨機梯度降低,這種方法是將數據集拆分紅一個個批次,隨機抽取一個批次來計算並更新參數。(1)因爲抽取不可避免地梯度會有偏差,須要手動調整學習率,可是選擇合適地學習率又比較困難。尤爲是在訓練時,咱們經常想對常出現地特徵更新速度快一些,而對不常出現的特徵更新速度慢一些,而SGD在更新參數時對全部參數採用同樣的學習率,所以沒法知足要求。(2)SGD容易收斂到局部最優,而且在某些狀況下可能被困在鞍點
3.Momentum法
Momentum是模擬物理學中動量的概念,更新時在必定程度上保留以前的更新方向,利用當前的批次再微調本次的更新參數,所以引入一個新的變量v(速度),做爲前幾回梯度的累加。所以,Momentum可以更新學習率,再降低初期,先後梯度方向一致時,可以加速學習;在降低的中後期,在局部最小值的附近來回震盪時,可以抑制震盪,加快收斂。
4.Nesterov Momentum法
標準Momentum法首先計算一個梯度,而後在加速更新梯度的方向進行一個大的跳躍;Nesterov項首先在原來加速的梯度方向進行一個大的跳躍,而後再該位置計算梯度值,而後用這個梯度值修正最終的更新方向。
5.Adagrad法
Adagrad法可以自適應地爲各個參數分配不一樣的學習率,可以控制每一個維度的梯度方向。這種方法的優勢是可以實現學習率的自動更改:若是本次更新時梯度大,學習率就衰減得快一些;若是此次更新時梯度小,學習率衰減得就慢一些。
6.Adadelta法
Adagrad法仍然存在一些問題,其學習率單調遞減,在訓練的後期學習率很是小,而且須要手動設置一個全局的初始化學習率。Adadelta法用一階的方法,近似模擬二階牛頓法,解決了這些問題
7.RMSprop法
RMSprop法與Momentum法相似,經過引入一個衰減係數,使每一回合都衰減必定比例,在實踐中,對循環神經網絡(RNN)效果很好
8.Adam法
Adam法根據損失函數針對每一個參數的梯度的一階矩估計和二階矩估計動態調整每一個參數的學習率。
訓練好一個神經網絡後,咱們但願可以將其應用在預測數據上。那麼,如何把模型存儲起來呢?同時,對於一個已經存儲起來的模型,在將其應用在預測數據上時又如何加載呢?
Tensorflow的API提供瞭如下兩種方式來存儲和加載模型
(1)生成檢查點文件,擴展爲通常爲.ckpt,經過在tf.train.Saver對象上調用Saver.save()生成。它包含權重和其餘在程序中定義的變量,不包含圖結構。若是須要在另外一個程序中使用,須要從新建立圖形結構,並告訴Tensorflow如何處理這些權重。
(2)生成圖協議文件,這是一個二進制文件,擴展名通常爲.pb,用tf.train.write_graph()保存,只包含圖形結構,不包含權重,而後使用tf.import_graph_def()來加載圖形。
模型存儲主要是創建一個tf.train.Saver()來保存變量,而且指定保存的位置,通常模型的擴展名爲.ckpt。
存儲模型
import os import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data def init_weight(shape): return tf.Variable(tf.random_normal(shape,stdde=0.01)) def model(X,w_h,w_h2,w_o,p_keep_input,p_keep_hidden): X=tf.nn.dropout(X,p_keep_input) h=tf.nn.relu(tf.matmul(X,w_h)) h=tf.nn.dropout(h,p_keep_hidden) h2=tf.nn.relu(tf.matmul(h,w_h2)) h2=tf.nn.dropout(h2,p_keep_hidden) return tf.matmul(h2,w_o) mnist=input_data.read_data_sets("MNIST_data/",one_hot=True) trX,trY,teX,teY=mnist.train.images,mnist.train.labels,mnist.test.images,mnist.test.labels X=tf.placeholder("float",[None,784]) Y=tf.placeholder("float",[None,10]) w_h=init_weight([784,625]) w_h2=init_weight([625,625]) w_o=init_weight([625,10]) p_keep_input=tf.placeholder("float") p_keep_hidden=tf.placeholder("float") py_x=model(X,w_h,w_h2,w_o,p_keep_input,p_keep_hidden) cost=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(py_x,Y)) train_op=tf.train.RMSPropOptimizer(0.001,0.9).minimize(cost) predict_op=tf.argmax(py_x,1) ckpt_dir="./ckpt_dir" if not os.path.exists(ckpt_dir): os.makedirs(ckpt_dir) #定義一個計數器,爲訓練輪數計數 global_step=tf.Variable(0,name='global_step',trainable=False) saver=tf.train.Saver() non_storable_variable=tf.Variable(777) with tf.Session() as sess: tf.initialize_all_variables().run() start=global_step.eval() print("Start from:",start) for i in range(start,100): for start,end in zip(range(0,len(trX),128),range(128,len(trX)+1,128)): sess.run(train_op,feed_dict={X:trX[start:end],Y:trY[start:end],p_keep_input:0.8,p_keep_hidden:0.5}) global_step.assign(i).eval() saver.save(sess,ckpt_dir+"/model.ckpt",global_step=global_step)#存儲模型
加載模型
import tensorflow as tf with tf.Session() as sess: tf.initialize_all_variables().run() ckpt=tf.train.get_checkpoint_state(ckpt_dir) if ckpt and ckpt.model_checkpoint_path: print(ckpt.model_checkpoint_path) saver.restore(sess,ckpt.model_checkpoint_path)#加載全部的參數 #從這裏開始就能夠直接使用模型進行預測,或者接着繼續訓練
當僅保存圖模型時,纔將圖寫入二進制協議文件中
import tensorflow as tf v=tf.Variable(0,name='my_variable') sess=tf.Session() tf.train.write_graph(sess.graph_def,'/temp/tfmodel','train.pbtxt') with tf.Session() as _sess: with gfile.FastGFile("/tmp/tfmodel/train.pbtxt","rb") as f: graph_def=tf.GraphDef() graph_def.ParseFromString(f.read()) _sess.graph.as_default() tf.import_graph_def(graph_def,name='tfgraph')
和Tensorflow中的其餘組件同樣,隊列(queue)自己也是圖中的一個節點,是一種有狀態的節點,其餘節點,如入隊節點(enqueue)和出隊節點(dequeue),能夠修改它的內容。
Tensorflow中主要有兩種隊列,即FIFOQueue和RandomShuffleQueue。
1.FIFOQueue
FIFOQueue建立一個先入先出隊列。例如,咱們在訓練一些語音、文字樣本時,使用循環神經網絡的網絡結構,但願讀入的訓練樣本是有序的,就要用FIFOQueue
import tensorflow as tf q=tf.FIFOQueue(3,"float") init=q.enqueue_many(([0.1,0.2,0.3],)) x=q.dequeue() y=x+1 q_inc=q.enqueue([y]) with tf.Session() as sess: sess.run(init) quelen=sess.run(q.size()) for i in range(2):#執行2次操做 sess.run(q_inc) quelen=sess.run(q.size()) for i in range(quelen): print(sess.run(q.dequeue()))#輸出隊列的值
2.RandomShuffleQueue
RandomShuffleQueue建立一個隨機隊列,在出隊列時,是以隨機的順序產生元素的。例如,咱們在訓練一些圖像樣本時,使用CNN的網絡結構,但願能夠無序地讀入訓練樣本,就要用RandomShuffleQueue,每次隨機產生一個訓練樣本。
RandomShuffleQueue在Tensorflow使用異步計算時很是重要。由於Tensorflow的會話是支持多線程的,咱們能夠在主線程裏執行訓練操做,使用RandomShuffleQueue做爲訓練輸入,開多個線程來準備訓練樣本,將樣本壓入隊列後,主線程會從隊列中每次取出mini-batch的樣本進行訓練
q=tf.RandomShuffleQueue(capacity=10,min_after_dequeue=2,dtypes="float")#隊列最大長度爲10,出隊後最小長度爲2 sess=tf.Session() for i in range(0,10):#10次入隊 sess.run(q.enqueue(i)) for i in range(0,8):#8次出隊 print(sess.run(q.dequeue()))
咱們嘗試修改入隊次數爲12次,再運行,發現程序阻斷不動,或者咱們嘗試修改出隊此時爲10次,即不保留隊列最小長度,發現隊列輸出8次結果後,在終端仍然阻斷了。
阻斷通常發生在:
1.隊列長度等於最小值,執行出隊操做
2.隊列長度等於最大值,執行入隊操做
上面的例子都是在會話的主線程中進行入隊操做。當數據量很大時,入隊操做從硬盤中讀取數據,放入內存中,主線程須要等待入隊操做完成,才能進行訓練操做。會話中能夠運行多個線程,咱們使用線程管理器QueueRunner建立一系列的新線程進行入隊操做,讓主線程繼續使用數據,即訓練網絡和讀取數據是異步的,主線程在訓練網絡,另外一個線程在將數據從硬盤讀入內存。
咱們建立一個含有隊列的圖:
import tensorflow as tf q=tf.FIFOQueue(1000,"float") counter=tf.Variable(0.0)#計數器 increment_op=tf.assign_add(counter,tf.constant(1.0))#給計數器加一 enqueue_op=q.enqueue(counter)#計數器值加入隊列 #建立一個隊列管理器QueueRunner,用這兩個操做向隊列q中添加元素。目前咱們只使用一個線程: qr=tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueue_op]*1) #啓動一個會話,從隊列管理器qr中建立線程 with tf.Session() as sess: sess.run(tf.global_variables_initializer()) enqueue_threads=qr.create_threads(sess,start=True)#啓動入隊線程 for i in range(10): print(sess.run(q.dequeue()))
結果
1.0 1.0 2.0 3.0 5.0 6.0 7.0 8.0 9.0 10.0 ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled ERROR:tensorflow:Exception in QueueRunner: Session has been closed. Exception in thread Thread-22: Traceback (most recent call last): File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run enqueue_callable() File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1245, in _single_tensor_run fetch_list_as_strings, [], status, None) File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__ next(self.gen) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status pywrap_tensorflow.TF_GetCode(status)) tensorflow.python.framework.errors_impl.CancelledError: Run call was cancelled Exception in thread Thread-23: Traceback (most recent call last): File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Anaconda3\envs\tensorflow-gpu\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\training\queue_runner_impl.py", line 238, in _run enqueue_callable() File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\client\session.py", line 1235, in _single_operation_run target_list_as_strings, status, None) File "C:\Anaconda3\envs\tensorflow-gpu\lib\contextlib.py", line 66, in __exit__ next(self.gen) File "C:\Anaconda3\envs\tensorflow-gpu\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status pywrap_tensorflow.TF_GetCode(status)) tensorflow.python.framework.errors_impl.CancelledError: Session has been closed.
能輸出結果,但最後會異常:
ERROR:tensorflow:Exception in QueueRunner: Run call was cancelled
ERROR:tensorflow:Exception in QueueRunner: Session has been closed.
咱們知道,使用with tf.Session的話,會話執行結束會自動關閉,至關於main函數已經結束,
固也就有 Session has been closed.的錯誤。
import tensorflow as tf # 建立一個含有隊列的圖 q = tf.FIFOQueue(1000,"float") # 建立一個長度爲1000的隊列 counter = tf.Variable(0.0) # 計數器 increment_op = tf.assign_add(counter,tf.constant(1.0)) # 操做:給計數器加1 enqueque_op = q.enqueue(counter) # 操做:計數器值加入隊列 # 建立一個隊列管理器 QueueRunner,用這兩個操做向隊列 q 中添加元素,啓動一個線程。 qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 啓動一個會話,從隊列管理器qr中建立線程 # 主線程 sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) # 主線程 for i in range(10): print(sess.run(q.dequeue()))
使用Session就不會自動關閉,也就沒有了上面例子中的異常了,雖然沒有了異常,但也和咱們
設想的會打印順序的1,2,3,4,5…不同,並且像第一個例子中還會重複打印1.0,這是爲何呢?
這個本質是+1操做和入隊操做是異步的,也就是說若是加1操做執行了不少次以後,才執行一次入隊的話,就會出現入隊不是按咱們預想的順序那樣;反過來,當我執行幾回入隊以後,才執行一次加1操做就會出現一個數重複入隊的狀況。
那該怎麼解決這個問題呢!下面爲幾種解決的方法
# 方法1 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 把兩個操做變成列表中的一個元素 # 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法2 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) # 原 enqueque_op = q.enqueue(counter) # 把加一操做變成入隊操做的依賴 with tf.control_dependencies([increment_op]): enqueque_op = q.enqueue(counter) # 因爲將加1變成了入隊的依賴,因此入隊操做只須要傳入enqueque_op就好了 qr = tf.train.QueueRunner(q,enqueue_ops=[enqueque_op]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法3 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 把兩個操做變成空操做的依賴 with tf.control_dependencies([increment_op,enqueque_op]): void_op = tf.no_op() # 因爲將兩個操做變成了空操做的依賴,因此入隊操做只須要傳入void_op就好了 qr = tf.train.QueueRunner(q,enqueue_ops=[void_op]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
# 方法4 import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) # 原 :qr = tf.train.QueueRunner(q,enqueue_ops=[increment_op,enqueque_op]*1) # 用tf.group()把兩個操做組合起來 qr = tf.train.QueueRunner(q,enqueue_ops=[tf.group(increment_op,enqueque_op)]*1) sess = tf.Session() sess.run(tf.global_variables_initializer()) enqueue_threads = qr.create_threads(sess,start=True) for i in range(10): print(sess.run(q.dequeue()))
1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0
QueueRunner 有一個問題就是:入隊線程自顧自地執行,在須要的出隊操做完成以後,程序無法結束。這樣就要使用 tf.train.Coordinator 來實現線程間的同步,終止其餘線程。
import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) # 主線程 sess = tf.Session() sess.run(tf.global_variables_initializer()) #coordinator:協調器,協調線程間的關係能夠被當作一種信號量,起同步做用 coord = tf.train.Coordinator() # 啓動入隊線程,協調器是線程的參數 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) # 主線程 for i in range(0,10): print(sess.run(q.dequeue())) coord.request_stop() # 通知其餘線程關閉 # join操做等待其餘線程結束,其餘全部的線程關閉後,這個函數才能返回 coord.join(enqueue_threads)
在關閉隊列線程後,再執行出隊操做,就會拋出 tf.errors.OutOfRange 錯誤。這種狀況就須要
使用 tf.errors.OutOfRangeError 來捕捉錯誤,終止循環:
import tensorflow as tf q = tf.FIFOQueue(1000,"float") counter = tf.Variable(0.0) increment_op = tf.assign_add(counter,tf.constant(1.0)) enqueque_op = q.enqueue(counter) qr = tf.train.QueueRunner(q,enqueue_ops=[[increment_op,enqueque_op]]*1) # 主線程 sess = tf.Session() sess.run(tf.global_variables_initializer()) #coordinator:協調器,協調線程間的關係能夠被當作一種信號量,起同步做用 coord = tf.train.Coordinator() # 啓動入隊線程,協調器是線程的參數 enqueue_threads = qr.create_threads(sess,coord=coord,start=True) coord.request_stop() # 通知其餘線程關閉 # 主線程 for i in range(0,10): try: print("i : ",i) print(sess.run(q.dequeue())) except tf.errors.OutOfRangeError: print('finish') break # join操做等待其餘線程結束,其餘全部的線程關閉後,這個函數才能返回 coord.join(enqueue_threads)
i : 0 1.0 i : 1 finish
說明:從打印出來的信息咱們能夠看出,將請求線程關閉放置在出隊的前面,也就是說我尚未出隊以前就請求將線程關閉了,但關閉線程須要必定的時間,因此後來在遍歷出隊是仍是能夠執行的線程關閉後,若是不拋異常的話就像上個例子那樣會報錯,因此這裏執行了異常,並打印出了「finish