TensorFlow是一種符號編程框架(與theano相似),先構建數據流圖再輸入數據進行模型訓練。Tensorflow支持不少種樣例輸入的方式。最容易的是使用placeholder,但這須要手動傳遞numpy.array類型的數據。第二種方法就是使用二進制文件和輸入隊列的組合形式。這種方式不只節省了代碼量,避免了進行data augmentation和讀文件操做,能夠處理不一樣類型的數據, 並且也再也不須要人爲地劃分開「預處理」和「模型計算」。在使用TensorFlow進行異步計算時,隊列是一種強大的機制。php
隊列使用概述html
正如TensorFlow中的其餘組件同樣,隊列就是TensorFlow圖中的節點。這是一種有狀態的節點,就像變量同樣:其餘節點能夠修改它的內容。具體來講,其餘節點能夠把新元素插入到隊列後端(rear),也能夠把隊列前端(front)的元素刪除。隊列,如FIFOQueue和RandomShuffleQueue(A queue implementation that dequeues elements in a random order.)等對象,在TensorFlow的tensor異步計算時都很是重要。例如,一個典型的輸入結構是使用一個RandomShuffleQueue來做爲模型訓練的輸入,多個線程準備訓練樣本,而且把這些樣本壓入隊列,一個訓練線程執行一個訓練操做,此操做會從隊列中移除最小批次的樣本(mini-batches),這種結構具備許多優勢。前端
TensorFlow的Session對象是能夠支持多線程的,所以多個線程能夠很方便地使用同一個會話(Session)而且並行地執行操做。然而,在Python程序實現這樣的並行運算卻並不容易。全部線程都必須能被同步終止,異常必須能被正確捕獲並報告,會話終止的時候, 隊列必須能被正確地關閉。所幸TensorFlow提供了兩個類來幫助多線程的實現:tf.Coordinator和 tf.QueueRunner。從設計上這兩個類必須被一塊兒使用。Coordinator類能夠用來同時中止多個工做線程而且向那個在等待全部工做線程終止的程序報告異常。QueueRunner類用來協調多個工做線程同時將多個tensor壓入同一個隊列中。java
(1)讀二進制文件數據到隊列中python
同不少其餘的深度學習框架同樣,TensorFlow有它本身的二進制格式。它使用了a mixture of its Records 格式和protobuf。Protobuf是一種序列化數據結構的方式,給出了關於數據的一些描述。TFRecords是tensorflow的默認數據格式,一個record就是一個包含了序列化tf.train.Example 協議緩存對象的二進制文件,可使用python建立這種格式,而後即可以使用tensorflow提供的函數來輸入給機器學習模型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import
tensorflow as tf
def read_and_decode_single_example(filename_queue):
# 定義一個空的類對象,相似於c語言裏面的結構體定義
class
Image(self):
pass
image = Image()
image.height =
32
image.width =
32
image.depth =
3
label_bytes =
1
Bytes_to_read = label_bytes+image.heigth*image.width*
3
# A Reader that outputs fixed-length records from a file
reader = tf.FixedLengthRecordReader(record_bytes=Bytes_to_read)
# Returns the next record (key, value) pair produced by a reader, key 和value都是字符串類型的tensor
# Will dequeue a work unit from queue
if
necessary (e.g. when the
# Reader needs to start reading from a
new
file since it has
# finished with the previous file).
image.key, value_str = reader.read(filename_queue)
# Reinterpret the bytes of a string as a vector of numbers,每個數值佔用一個字節,在[
0
,
255
]區間內,所以out_type要取uint8類型
value = tf.decode_raw(bytes=value_str, out_type=tf.uint8)
# Extracts a slice from a tensor, value中包含了label和feature,故要對向量類型tensor進行
'parse'
操做
image.label = tf.slice(input_=value, begin=[
0
], size=[
1
])
value = value.slice(input_=value, begin=[
1
], size=[-
1
]).reshape((image.depth, image.height, image.width))
transposed_value = tf.transpose(value, perm=[
2
,
0
,
1
])
image.mat = transposed_value
return
image
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
filenames =[os.path.join(data_dir,
'test_batch.bin'
)]
# Output strings (e.g. filenames) to a queue
for
an input pipeline
filename_queue = tf.train.string_input_producer(string_tensor=filenames)
# returns symbolic label and image
img_obj = read_and_decode_single_example(
"filename_queue"
)
Label = img_obj.label
Image = img_obj.mat
sess = tf.Session()
# 初始化tensorflow圖中的全部狀態,如待讀取的下一個記錄tfrecord的位置,variables等
init = tf.initialize_all_variables()
sess.run(init)
tf.train.start_queue_runners(sess=sess)
# grab examples back.
# first example from file
label_val_1, image_val_1 = sess.run([label, image])
# second example from file
label_val_2, image_val_2 = sess.run([label, image])
|
可想而知,在你運行任何訓練步驟以前,咱們要告知tensorflow去啓動這些線程,不然這些隊列會由於等待數據入隊而被堵塞,致使數據流圖將一直處於掛起狀態。咱們能夠調用tf.train.start_queue_runners(sess=sess)來啓動全部的QueueRunners。這個調用並非符號化的操做,它會啓動輸入管道的線程,填充樣本到隊列中,以便出隊操做能夠從隊列中拿到樣本。另外,必需要先運行初始化操做再建立這些線程。若是這些隊列未被初始化,tensorflow會拋出錯誤。web
(2)從二進制文件中讀取mini-batchs編程
在訓練機器學習模型時,使用單個樣例更新參數屬於「online learning」,然而在線下環境下,咱們一般採用基於mini-batchs 隨機梯度降低法(SGD),可是在tensorflow中如何利用queuerunners返回訓練塊數據呢?請參見下面的程序:
1
2
3
4
5
|
image_batch, label_batch = tf.train.shuffle_batch(tensor_list=[image, label]],
batch_size=batch_size,
num_threads=
24
,
min_after_dequeue=min_samples_in_queue,
capacity=min_samples_in_queue+
3
*batch_size)
|
讀取batch數據須要使用新的隊列queues和QueueRunners(大體流程圖以下)。Shuffle_batch構建了一個RandomShuffleQueue,並不斷地把單個的(image,labels)對送入隊列中,這個入隊操做是經過QueueRunners啓動另外的線程來完成的。這個RandomShuffleQueue會順序地壓樣例到隊列中,直到隊列中的樣例個數達到了batch_size+min_after_dequeue個。它而後從隊列中選擇batch_size個隨機的元素進行返回。事實上,shuffle_batch返回的值就是RandomShuffleQueue.dequeue_many()的結果。有了這個batches變量,就能夠開始訓練機器學習模型了。
函數 tf.train.shuffle_batch(tensor_list, batch_size, capacity, min_after_dequeue, num_threads=1, seed=None, enqueue_many=False, shapes=None, shared_name=None, name=None)的使用說明:
做用:Creates batches by randomly shuffling tensors.(從隊列中隨機篩選多個樣例返回給image_batch和label_batch);
參數說明:
tensor_list: The list of tensors to enqueue.(待入隊的tensor list);
batch_size: The new batch size pulled from the queue;
capacity: An integer. The maximum number of elements in the queue(隊列長度);
min_after_dequeue: Minimum number elements in the queue after a dequeue, used to ensure a level of mixing of elements.(隨機取樣的樣本整體最小值,用於保證所取mini-batch的隨機性);
num_threads: The number of threads enqueuing `tensor_list`.(session會話支持多線程,這裏能夠設置多線程加速樣本的讀取)
seed: Seed for the random shuffling within the queue.
enqueue_many: Whether each tensor in `tensor_list` is a single example.(爲False時表示tensor_list是一個樣例,壓入時佔用隊列中的一個元素;爲True時表示tensor_list中的每個元素都是一個樣例,壓入時佔用隊列中的一個元素位置,能夠看做爲一個batch);
shapes: (Optional) The shapes for each example. Defaults to the inferred shapes for `tensor_list`.
shared_name: (Optional) If set, this queue will be shared under the given name across multiple sessions.
name: (Optional) A name for the operations.後端