tensorflow學習筆記——多線程輸入數據處理框架

  以前咱們學習使用TensorFlow對圖像數據進行預處理的方法。雖然使用這些圖像數據預處理的方法能夠減小無關因素對圖像識別模型效果的影響,但這些複雜的預處理過程也會減慢整個訓練過程。爲了不圖像預處理成爲神經網絡模型訓練效率的瓶頸,TensorFlow提供了一套多線程處理輸入數據的框架。python

  下面總結了一個經典的輸入數據處理的流程:git

   下面咱們首先學習TensorFlow中隊列的概念。在TensorFlow中,隊列不只是一種數據結構,它更提供了多線程機制。隊列也是TensorFlow多線程輸入數據處理框架的基礎。而後再學習上面的流程。最後這個流程將處理好的單個訓練數據整理成訓練數據 batch,這些batch就能夠做爲神經網絡的輸入。正則表達式

準備知識:多線程的簡單介紹

  在傳統操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程。線程顧名思義,就是一條流水線工做的過程(流水線的工做須要電源,電源就至關於CPU),而一條流水線必須屬於一個車間,一個車間就是一個進程,車間負責把資源整合到一塊兒,是一個資源單位,而一個車間內至少有一條流水線。因此,進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是CPU上的執行單位。算法

  多線程(即多個控制線程)的概念就是:在一個進程中存在多個線程,多個線程共享該進程的地址空間,至關於一個車間內有多條流水線,都共用一個車間的資源。好比成都地鐵和西安地鐵是不一樣的進程,而成都地鐵3號線是一個線程,成都地鐵全部的線程共享成都全部的資源,好比成都全部的乘客能夠被全部線拉。網絡

  開啓多線程的方式:數據結構

import time
import random
from threading import Thread


def study(name):
    print("%s is learning" % name)
    time.sleep(random.randint(1, 3))
    print("%s is playing " % name)


if __name__ == '__main__':
    t = Thread(target=study, args=('james', ))
    t.start()
    print("主線程開始運行")

'''
結果展現:
james is learning
主線程開始運行
james is playing 
'''

    t.start() 將開啓進程的信號發給操做系統後,操做系統要申請內存空間,讓好拷貝父進程地址空間到子進程,開銷遠大於線程。多線程

1,隊列與多線程

  在TensorFlow中,隊列和變量相似,都是計算圖上有狀態的節點。其餘的計算節點能夠修改他們的狀態。對於變量,能夠經過賦值操做修改變量的取值。對於隊列,修改隊列狀態的操做主要有Enqueue,EnqueueMany和Dequeue。下面程序展現瞭如何使用這些函數來操做一個隊列。框架

#_*_coding:utf-8_*_
import tensorflow as tf

# 建立一個先進先出的隊列,指定隊列中最多能夠保存兩個元素,並指定類型爲整數
q = tf.FIFOQueue(2, 'int32')
# 使用enqueue_many 函數來初始化隊列中的元素。
# 和變量初始化相似,在使用隊列以前須要明確的調用這個初始化過程
init = q.enqueue_many(([0, 10], ))
# 使用Dequeue 函數將隊列中的第一個元素出隊列。這個元素的值將被存在變量x中
x = q.dequeue()
# 將獲得的值加1
y = x + 1
# 將加 1 後的值在從新加入隊列
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # 運行初始化隊列的操做
    init.run()
    for _ in range(6):
        #運行q_inc 將執行數據出隊列,出隊的元素 +1 ,從新加入隊列的整個過程
        v, _ = sess.run([x, q_inc])
        # 打印出隊元素的取值
        print('%s'%v)

'''
隊列開始有[0, 10] 兩個元素,第一個出隊的爲0, 加1以後爲[10, 1]
第二次出隊的爲10, 加1以後入隊的爲11, 獲得的隊列爲[1, 11]
以此類推,最後獲得的輸出爲:
0
10
1
11
2
'''

  TensorFlow中提供了FIFOQueue 和 RandomShuffleQueue 兩種隊列。在上面的程序中,已經展現瞭如何使用FIFOQueue,它的實現的一個先進先出隊列。 RandomShuffleQueue 會將隊列中的元素打亂,每次出隊操做獲得的是從當前隊列全部元素中隨機選擇的一個。在訓練審計網絡時但願每次使用的訓練數據儘可能隨機。 RandomShuffleQueue 就提供了這樣的功能。dom

  在TensorFlow中,隊列不只僅是一種數據結構,仍是異步計算張量取值的一個重要機制。好比多個線程能夠同時向一個隊列中寫元素,或者同時讀取一個隊列中的元素。在後面咱們會學習TensorFlow是如何利用隊列來實現多線程輸入數據處理的。異步

  TensorFlow提供了 tf.Coordinator 和 tf.QueueRunner 兩個類來完成多線程協同的功能。tf.Coordinator 主要用於協同多個線程一塊兒中止,並提供了 should_stop, request_stop 和 join 三個函數。在啓動線程以前,須要先聲明一個 tf.Coordinator 類,並將這個類傳入每個建立的線程中。啓動的線程須要一直查詢 tf.Coordinator 類中提供的 should_stop 函數,當這個函數的返回值爲 True時,則當前線程也須要退出。每個啓動的線程均可以經過調用 request_stop 函數來通知其餘線程退出。當某一個線程調用  request_stop 函數以後, should_stop 函數的返回值將被設置爲 TRUE,這樣其餘的線程就能夠同時終止了。下面程序展現瞭如何使用 tf.Coordinator。

#_*_coding:utf-8_*_
import tensorflow as tf
import numpy as np
import threading
import time

# 線程中運行的程序,這個程序每隔1秒判斷是否中止並打印本身的ID
def MyLoop(coord, worker_id):
    # 使用 tf.Coordinator 類提供的協同工具判斷當前是否須要中止
    while not coord.should_stop():
        # 隨機中止全部的線程
        if np.random.rand() < 0.1:
            print("Stopping from id: %d\n" % worker_id)
            # 調用 coord.request_stop() 函數來通知其餘線程中止
            coord.request_stop()
        else:
            # 打印當前線程的 ID
            print("Working on id: %d\n" % worker_id)
        # 暫停1 s
        time.sleep(1)

# 聲明一個  tf.train.Coordinator 類來協同多個線程
coord = tf.train.Coordinator()

# 聲明建立 5 個線程
threads = [
    threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)
]

# 啓動全部的線程
for t in threads:
    t.start()

# 等待全部線程退出
coord.join(threads)
'''
Working on id: 0
Working on id: 1
Working on id: 2
Working on id: 3
Working on id: 4

Working on id: 0
Working on id: 1
Working on id: 3
Working on id: 2
Working on id: 4

Working on id: 0
Working on id: 2
Working on id: 1
Working on id: 3
Working on id: 4

Working on id: 2
Working on id: 1
Working on id: 0
Working on id: 3
Working on id: 4

Working on id: 3
Working on id: 0
Working on id: 1
Working on id: 2
Working on id: 4
Working on id: 1
Stopping from id: 0
'''

  當全部線程啓動以後,每一個線程會打印各自的ID,因而前面4行打印出了他們的ID。而後在暫停1秒以後,全部的線程又開始第二遍打印ID。在這個時候有一個線程推出的條件達到,因而調用了coord.request_stop 函數來中止全部其餘的線程。然而在打印Stoping_from_id:4以後,能夠看到有線程仍然在輸出。這是由於這些線程已經執行完 coord.should_stop 的判斷,因而仍然會繼續輸出本身的ID。但在下一輪判斷是否須要中止時將推出線程。因而在打印一次ID以後就不會再有輸出了。

  tf.QueueRunner 主要用於啓動多個線程來操做同一個隊列,啓動的這些線程能夠經過上面介紹的 tf.Coordinator 類來統一管理,下面代碼展現瞭如何使用 tf.QueueRunner 和 tf.Coordinator 來管理多線程隊列操做。

#_*_coding:utf-8_*_
import tensorflow as tf

# 聲明一個先進先出的隊列,隊列中最多100個元素,類型爲實數
queue = tf.FIFOQueue(100, 'float')
# 定義隊列的入隊操做
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 使用 tf.train.QueueRunner 來建立多個線程運行隊列的入隊操做
# tf.train.QueueRunner 的第一個參數給出了被操做的隊列
# [enqueue_op] * 5 表示了須要啓動5個線程,每一個線程運行的是equeue_op操做
qr = tf.train.QueueRunner(queue, [enqueue_op]*5)

# 將定義過的 QueueRunner 加入 TensorFlow計算圖上指定的集合
# tf.train.add_queue_runner 函數沒有指定集合
# 則加入默認集合 tf.GraphKeys.QUEUE_RUNNERS
# 下面的函數就是講剛剛定義的qr加入默認的tf.GraphKeys.QUEUE_RUNNERS集合
tf.train.add_queue_runner(qr)
# 定義出隊操做
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用 tf.train.coordinator 來協同啓動的線程
    coord = tf.train.Coordinator()
    # 使用tf.train.QueueRunner時,須要明確調用 tf.train.start_queue_runnsers來啓動全部線程
    # 不然由於沒有線程運行入隊操做,當調用出隊操做時,程序會一直等待入隊操做被運行。
    # tf.train.start_queue_runners 函數會默認啓動 tf.GraphKeys.QUEUE_RUNNERS集合
    # 所說的 tf.train.add_queue_runner 函數和 tf.train.start_queue_runners 函數會指定同一個集合
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 獲取隊列中的取值
    for _ in range(3):
        print(sess.run(out_tensor)[0])

    # s使用 tf.train.Coordinator 來中止全部的線程
    coord.request_stop()
    coord.join(threads)

'''
-0.88587755
-0.6659831
-2.9722364
'''

  

輸入文件隊列

  下面將學習如何使用TensorFlow中的隊列管理輸入文件列表。這裏假設全部的輸入數據都已經整理成了TFRecord 格式。雖然一個 TFRecord 文件中能夠存儲多個訓練樣例,可是當訓練數據量較大時,能夠將數據分紅多個 TFRecord 文件來提升處理效率。 TensorFlow 提供了 tf.train.match_filenames_once 函數來獲取符合一個正則表達式的全部文件,獲得的文件列表能夠經過 tf.train.string_input_producer 函數進行有效的管理。

  tf.train.string_input_producer 函數會使用初始化時提供的文件列表建立一個輸入隊列,輸入對壘中原始的元素爲文件列表中的全部文件。如上面的代碼所示,建立好的輸入隊列能夠做爲文件讀取函數的參數。每次調用文件讀取函數時,該函數會先判斷當前是否已有打開的文件可讀,若是沒有或者打開的文件以及讀完,這個函數會從輸入隊列中出隊一個文件並從這個文件中讀取數據。

  經過設置 shuffle 參數,tf.train.string_input_producer 函數支持隨機打亂文件列表中文件出隊的順序。當 shuffle 參數爲 TRUE時,文件在加入隊列以前會被打亂順序,因此出隊的順序也是隨機的。隨機打亂文件順序以及加入輸入隊列的過程會泡在一個單獨的線程上,這樣不會影響獲取文件的速度。tf.train.string_input_producer 函數生成的輸入隊列能夠同時被多個文件讀取線程操做,並且輸入隊列會將隊列中的文件均勻的分給不一樣的線程,不出現有些文件被處理過屢次而有些文件尚未被處理過的狀況。

  當一個輸入隊列中的全部文件都被處理完後,它會將初始化時提供的文件列表中的文件所有從新加入隊列。tf.train.string_input_producer 函數能夠設置 num_epochs 參數來限制加載初始文件列表的最大輪數。當全部文件都已經被使用了設定的輪數後,若是繼續嘗試讀取新的文件,輸入隊列會報 OutOfRange 的錯誤。在測試神經網絡模型時,由於全部測試數據只須要使用一次,因此能夠將 num_epochs 參數設置爲1,這樣在計算完一輪以後程序將自動中止。在展現  tf.train.match_filenames_once 和 tf.train.string_input_producer 函數的使用方法以前,咱們能夠先給出一個簡單的程序來生成數據。

#_*_coding:utf-8_*_
import tensorflow as tf

# 建立TFReocrd文件的幫助函數
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 模擬海量數據狀況降低數據寫入不一樣的文件,num_shards 定義了總共寫入多少文件
# instances_per_shard 定義了每一個文件中有多少個數據
num_shards = 2
instances_per_shard = 2
for i in range(num_shards):
    # 將數據分爲多個文件時,能夠將不一樣文件以相似0000n-of-0000m 的後綴區分
    # 其中m表示了數據總共被存在了多少個文件中,n表示當前文件的編號
    # 式樣的方式既方便了經過正則表達式獲取文件列表,又在文件名中加入了更多的信息
    filename = ('data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
    writer = tf.python_io.TFRecordWriter(filename)
    # 將數據封裝成Example結構並寫入 TFRecord 文件
    for j in range(instances_per_shard):
        # Example 結構僅包含當前樣例屬於第幾個文件以及時當前文件的第幾個樣本
        example = tf.train.Example(features=tf.train.Features(
            feature={
                'i': _int64_feature(i),
                'j': _int64_feature(j)
            }
        ))
        writer.write(example.SerializeToString())
    writer.close()

  程序運行以後,在指定的目錄下生產兩個文件,每個文件中存儲了兩個樣例,在生成了樣例數據以後,下面代碼展現了 tf.train.match_filenames_once 函數 和 tf.train.string_input_producer 函數的使用方法:

#_*_coding:utf-8_*_
import tensorflow as tf

# 使用tf.train.match_filenames_once 函數獲取文件列表
files = tf.train.match_filenames_once('path/data.tfrecords-*')
# print(files)

# 輸入隊列中的文件列表爲 tf.train.match_filenames_once 函數獲取的文件列表
# 這裏將 shuffle參數設置爲FALSE來避免隨機打亂讀文件的順序
# 可是通常在解決真實問題,會將shuffle參數設置爲TRUE
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# print(filename_queue)
# 讀取並解析一個樣本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

with tf.Session() as sess:
    # 雖然在本段程序中沒有聲明任何變量
    # 但在使用 tf.train.match_filenames_once 函數時須要初始化一些變量
    # init = tf.global_variables_initializer()
    # init = tf.initialize_all_variables()
    init = tf.local_variables_initializer()
    sess.run(init)
    # sess.run(files)
    # sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    print(sess.run(files))

    # 聲明 tf.train.Coordinator 類來協同不一樣線程,並啓動線程
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # 屢次執行獲取數據的操做
    for i in range(6):
        print(sess.run([features['i'], features['j']]))
    coord.request_stop()
    coord.join(threads)

  打印結果以下:

[b'path\\data.tfrecords-00000-of-00002'
 b'path\\data.tfrecords-00001-of-00002']
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]

  在不打亂文件列表的狀況下,會依次獨處樣例數據中的每個樣例。並且當全部樣例都被讀完以後,程序會自動從頭開始。若是限制 num_epochs=1,那麼程序會報錯。

組合訓練數據(batching)

  在上面,咱們已經學習瞭如何從文件列表中讀取單個樣例,將這些單個樣例經過預處理方法進行處理,就能夠獲得提升給神經網絡輸入層的訓練數據了。在以前學習過,將多個輸入樣例組織成一個batch能夠提升模型訓練的效率。因此在獲得單個樣例的預處理結果以後,還須要將他們組織成batch,而後再提供給審計網絡的輸入層。TensorFlow提供了 tf.train.batch 和 tf.train.shuffle_batch 函數來將單個的樣例組織成 batch 的形式輸出。這兩個函數都會生成一個隊列,隊列的入隊操做時生成單個樣例的方法,而每次出隊獲得的時一個batch的樣例。他們惟一的區別自安因而否會將數據順序打亂。下面代碼展現了這兩個函數的使用方法。

   下面代碼展現了 tf.train.batch函數的用法:

#_*_coding:utf-8_*_
import tensorflow as tf

# 讀取解析獲得樣例,這裏假設Example結構中 i表示一個樣例的特徵向量
# 好比一張圖像的像素矩陣,而j表示該樣例對應的標籤


# 使用tf.train.match_filenames_once 函數獲取文件列表
files = tf.train.match_filenames_once('path/data.tfrecords-*')

# 輸入隊列中的文件列表爲 tf.train.match_filenames_once 函數獲取的文件列表
# 這裏將 shuffle參數設置爲FALSE來避免隨機打亂讀文件的順序
# 可是通常在解決真實問題,會將shuffle參數設置爲TRUE
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# print(filename_queue)
# 讀取並解析一個樣本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

example, label = features['i'], features['j']

# 一個 batch 中樣例的個數
batch_size = 2
# 組合樣例的隊列中最多能夠存儲的樣例個數。這個隊列若是太大,
# 那麼須要佔用不少內存資源,若是過小,那麼出隊操做可能會由於
# 沒有數據而被阻礙(block),從而致使訓練效率下降,通常來講
# 這個隊列的大小會和每個batch的大小相關,下面代碼給出了設置
# 隊列大小的一種方式。
capacity = 1000 + 3 * batch_size

# 使用 tf.train.batch 函數來組合樣例。[example, label] 參數給
# 出了須要組合的元素,通常 example 和 label分別表明訓練樣本和這個樣本
# 對應的正確標籤。batch_size 參數給出了每一個batch中樣例的個數。
# capacity 給出了隊列的最大容量。當隊列長度等於容量時,TensorFlow將暫停
# 入隊操做,而只是等待元素出隊。當元素個數小於容量時,TensorFlow將自動從新啓動入隊操做
example_batch, label_batch = tf.train.batch(
    [example, label], batch_size=batch_size, capacity=capacity
)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # 獲取並打印組合以後的樣例,在真實問題中,這個輸出通常會做爲神經網絡的輸入
    for i in range(3):
        cur_example_batch, cur_label_batch = sess.run(
            [example_batch, label_batch]
        )
        print(cur_example_batch, cur_label_batch)
    coord.request_stop()
    coord.join(threads)

'''
運行上面的程式會獲得下面的輸出:
[0 0] [0 1]
[1 1] [0 1]
[0 0] [0 1]
從這個輸出能夠看到 tf.train.batch函數能夠將單個的數據組織成3個一組的batch
在 example, lable 中讀取的數據依次爲:
example:0  label:0
example:0  label:1
example:1  label:1
example:0  label:1
example:0  label:0
example:0  label:1
    這是由於 tf.train.batch 函數不會隨機打亂順序,因此在組合以後獲得的數據
    組成了上面給出的輸出。
'''

  下面代碼展現了 tf.train.shuffle_batch 函數的使用方法:

import tensorflow as tf

#_*_coding:utf-8_*_
import tensorflow as tf

# 讀取解析獲得樣例,這裏假設Example結構中 i表示一個樣例的特徵向量
# 好比一張圖像的像素矩陣,而j表示該樣例對應的標籤


# 使用tf.train.match_filenames_once 函數獲取文件列表
files = tf.train.match_filenames_once('path/data.tfrecords-*')

# 輸入隊列中的文件列表爲 tf.train.match_filenames_once 函數獲取的文件列表
# 這裏將 shuffle參數設置爲FALSE來避免隨機打亂讀文件的順序
# 可是通常在解決真實問題,會將shuffle參數設置爲TRUE
filename_queue = tf.train.string_input_producer(files, shuffle=False)
# print(filename_queue)
# 讀取並解析一個樣本
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
    serialized_example,
    features={
        'i': tf.FixedLenFeature([], tf.int64),
        'j': tf.FixedLenFeature([], tf.int64),
    }
)

example, label = features['i'], features['j']

# 一個 batch 中樣例的個數
batch_size = 2
# 組合樣例的隊列中最多能夠存儲的樣例個數。這個隊列若是太大,
# 那麼須要佔用不少內存資源,若是過小,那麼出隊操做可能會由於
# 沒有數據而被阻礙(block),從而致使訓練效率下降,通常來講
# 這個隊列的大小會和每個batch的大小相關,下面代碼給出了設置
# 隊列大小的一種方式。
capacity = 1000 + 3 * batch_size

# 使用 tf.train.shuffle_batch 函數來組合樣例。[example, label] 參數給
# 出了須要組合的元素,通常 example 和 label分別表明訓練樣本和這個樣本
# 對應的正確標籤。batch_size 參數給出了每一個batch中樣例的個數。
# capacity 給出了隊列的最大容量。min_after_dequeue參數是
# tf.train.shuffle_batch 特有的。min_after_dequeue參數限制了出隊時
# 隊列中元素的最小個數,當隊列中元素過小時,隨機打亂樣例的順序做用就不大了
# 因此 tf.train.shuffle_batch 函數提供了限制出隊時的最小元素的個數來保證
# 隨機打亂順序的做用。當出隊函數被調用可是隊列中元素不夠時,出隊操做將等待更多
# 的元素入隊纔會完成。若是min_after_dequeue參數被設定,capacity也應該來調整
example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=batch_size, capacity=capacity,
    min_after_dequeue=30
)

with tf.Session() as sess:
    tf.global_variables_initializer().run()
    tf.local_variables_initializer().run()
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # 獲取並打印組合以後的樣例,在真實問題中,這個輸出通常會做爲神經網絡的輸入
    for i in range(3):
        cur_example_batch, cur_label_batch = sess.run(
            [example_batch, label_batch]
        )
        print(cur_example_batch, cur_label_batch)
    coord.request_stop()
    coord.join(threads)

'''
運行上面的程式會獲得下面的輸出:
[0 1] [0 0]
[1 0] [0 0]
[1 0] [0 1]
從這個輸出能夠看到 tf.train.shuffle_batch函數已經將樣例順序打亂了
'''

  tf.train.batch 函數 和 tf.train.shuffle_batch 函數除了將單個訓練數據整理成輸入 batch,也提供了並行化處理輸入數據的方法。tf.train.batch 函數 和 tf.train.shuffle_batch 函數並行化的方式同樣,因此咱們執行應用更多的 tf.train.shuffle_batch 函數爲例。經過設置tf.train.shuffle_batch 函數中的 num_threads參數,能夠指定多個線程同時執行入隊操做。tf.train.shuffle_batch 函數的入隊操做就是數據讀取以及預處理的過程。當 num_threads 參數大於1時,多個線程會同時讀取一個文件中的不一樣樣例並進行預處理。若是須要多個線程處理不一樣文件中的樣例時,可使用tf.train.shuffle_batch_join 函數。此函數會從輸入文件隊列中獲取不一樣的文件分配給不一樣的線程。通常來講,輸入文件隊列時經過 tf.train.string_input_producer 函數生成的。這個函數會分均分配文件以保證不一樣文件中的數據會被儘可能平均地使用。

  tf.train.shuffle_batch 函數 和 tf.train.shuffle_batch_join 函數均可以完成多線程並行的方式來進行數據預處理,可是他們各有優劣。對於tf.train.shuffle_batch 函數,不一樣線程會讀取同一個文件。若是一個文件中的樣例比較類似(好比都屬於同一個類別),那麼神經網絡的訓練效果有可能會受到影響。因此在使用 tf.train.shuffle_batch 函數時,須要儘可能將同一個TFRecord 文件中的樣例隨機打亂。而是用 tf.train.shuffle_batch_join 函數時,不一樣線程會讀取不一樣文件。若是讀取數據的線程數比總文件數還大,那麼多個線程可能會讀取同一個文件中相近部分的數據。而卻多個線程讀取多個文件可能致使過多的硬盤尋址,從而使得讀取的效率下降。不一樣的並行化方式各有所長。具體採用哪種方法須要根據具體狀況來肯定。

輸入數據處理框架

  前面已經學習了開始給出的流程圖中的全部步驟,下面將這些步驟串成一個完成的TensorFlow來處理輸入數據,下面代碼給出了這個步驟:

#_*_coding:utf-8_*_
import tensorflow as tf

# 建立文件隊列,並經過文件列表建立輸入文件隊列
# 須要統一全部原始數據的格式並將他們存儲到TFRecord文件中
# 下面給出的文件列表應該包含全部提供訓練數據的TFRecord文件
files = tf.train.match_filenames_once('path/output.tfrecords')
filename_queue = tf.train.string_input_producer([files])

# 解析TFRecord文件中的數據,這裏假設image中存儲的時圖像的原始數據
# label爲該樣例所對應的標籤。height,width 和 channels 給出了圖片的維度
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
# 用FixedLenFeature 將讀入的Example解析成 tensor
features = tf.parse_single_example(
    serialized_example,
    features={
        'image_raw': tf.FixedLenFeature([], tf.string),
        'pixels': tf.FixedLenFeature([], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64)
    }
)
# 從原始圖像數據解析出像素矩陣,並根據圖像尺寸還原圖像
decoded_images = tf.decode_raw(features['image_raw'], tf.uint8)
labels = tf.cast(features['label'], tf.int32)
pixels = tf.cast(features['pixels'], tf.int32)

retyped_images = tf.cast(decoded_images, tf.float32)
images = tf.reshape(retyped_images, [784])


# 將處理後的圖像和標籤數據經過 tf.train.shuffle_batch 整理成
# 神經網絡訓練訓練時須要的batch
# 將文件以100個爲一組打包
min_after_dequeue = 10000
batch_size = 100
capacity = min_after_dequeue + 3 * batch_size

image_batch, label_batch = tf.train.shuffle_batch([images, labels],
                                                  batch_size=batch_size,
                                                  capacity=capacity,
                                                  min_after_dequeue=min_after_dequeue)

# 訓練模型 計算審計網絡的前向傳播結果
def inference(input_tensor, weights1, biases1, weights2, biases2):
    # 引入激活函數讓每一層去線性化 tf.nn.relu()
    layer1 = tf.nn.relu(tf.matmul(input_tensor, weights1) + biases1)
    return tf.matmul(layer1, weights2) + biases2

# 模型相關的參數
INPUT_NODE = 784
OUTPUT_NODE = 10
LAYER1_NODE = 500
REGULARAZTION_RATE = 0.0001
TREINING_STEPS = 5000

# 生成隱藏層的參數
weights1 = tf.Variable(tf.truncated_normal([INPUT_NODE, LAYER1_NODE], stddev=0.1))
biases1 = tf.Variable(tf.constant(0.1, shape=[LAYER1_NODE]))

# 生成輸出層的參數
weights2 = tf.Variable(tf.truncated_normal([LAYER1_NODE, OUTPUT_NODE], stddev=0.1))
biases2 = tf.Variable(tf.constant(0.1, shape=[OUTPUT_NODE]))

y = inference(image_batch, weights1, biases1, weights2, biases2)

# 計算交叉熵及其平均值(對於分類問題,一般將交叉熵與softmax迴歸一塊兒使用
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y,
                                                               labels=label_batch)
cross_entropy_mean = tf.reduce_mean(cross_entropy)

# 損失函數的計算
regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
# 計算模型的正則化損失,通常只計算神經網絡邊上的權重的正則化損失,而不是用偏置項
regularization = regularizer(weights1) + regularizer(weights2)
# 總損失等於交叉熵損失和正則化損失的和
loss = cross_entropy_mean + regularization

# 優化損失函數
# 通常優化器的目的是優化權重W和誤差 biases,最小化損失函數的結果
train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 初始化會話,並開始訓練過程
with tf.Session() as sess:
    # 因爲使用了Coordinator,必須對local 和 global 變量進行初始化
    sess.run(tf.local_variables_initializer())
    sess.run(tf.global_variables_initializer())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 循環的訓練神經網絡
    for i in range(TREINING_STEPS):
        if i %1000 == 0:
            print("After %d training step(s), loss is %g " % (i, sess.run(loss)))

        sess.run(train_step)
    coord.request_stop()
    coord.join(threads)

  下面代碼是生成TFRecord文件的(數據是MNIST數據)代碼:

#_*_coding:utf-8_*_
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np

# 生成整數型的屬性
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# 生成字符串型的屬性
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

mnist = input_data.read_data_sets(
    'data', dtype=tf.uint8, one_hot=True
)
images = mnist.train.images
# 訓練數據所對應的正確答案,能夠做爲一個屬性保存在TFRecord中
labels = mnist.train.labels
# 訓練數據的圖像分辨率,這能夠做爲Example中的一個屬性
pixels = images.shape[1]
num_examples = mnist.train.num_examples

# 輸出TFRecord 文件的地址
filename = 'path/output.tfrecords'
# 建立一個writer來寫TFRecord 文件
writer = tf.python_io.TFRecordWriter(filename)
for index in range(num_examples):
    # 將圖像矩陣轉化爲一個字符串
    image_raw = images[index].tostring()
    # 將一個樣例轉化爲 Example Protocol Buffer,並將全部的信息寫入這個數據結構
    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                'pixels': _int64_feature(pixels),
                'labels': _int64_feature(np.argmax(labels[index])),
                'image_raw': _bytes_feature(image_raw)
            }
        ))
    # 將一個Example寫入 TFRecord文件
    writer.write(example.SerializeToString())
writer.close()

  上面代碼給出了從輸入數據處理的整個流程。(可是程序可能會報錯,咱們這裏主要學習思路)。從下圖中能夠看出,輸入數據處理的第一步是爲獲取存儲訓練數據的文件列表。下圖的文件列表爲{A, B, C}.經過 tf.train.string_input_producer 函數能夠選擇性地將文件列表中文件的順序打亂,並加入輸入隊列。由於是否打亂文件的順序是可選的,因此在圖中是虛線的。tf.train.string_input_producer 函數會生成並維護一個輸入文件隊列,不一樣線程中的文件讀取函數能夠共享這個輸入文件隊列。在讀取樣例數據以後,須要將圖像進行預處理。圖像預處理的過程也會經過tf.train.shuffle_batch 提供的機制並行地跑在多個線程中。輸入數據處理流程的最後經過 tf.train.shuffle_batch 函數將處理好的單個樣例整理成 batch 提供給神經網絡的輸入層。經過這種方式,能夠有效地提升數據預處理的效率,避免數據預處理成爲神經網絡模型性能過程當中的性能瓶頸。

 TensorFlow 數據讀取機制主要是兩種方法:

  • (1)使用文件隊列方法,如使用 slice_input_producer 和 string_input_producer;這種方法既能夠將數據轉存爲 TFRecord數據格式,也能夠直接讀取文件圖片數據,固然轉存爲 TFRecord 數據格式進行讀取會更高效點。而這二者之間的區別就是前者是輸入 tensor_list ,所以能夠將多個list組合成一個 tensorlist 做爲輸入;然後者只能是一個 string_tensor了。
  • (2)使用TensorFlow 1.4版本後出現的 tf.data.DataSet 的數據讀取機制(pipeline機制),這是TensorFlow強烈推薦的方式,是一種更高效的讀取方式。使用 tf.data.Dataset 模塊的pipeline機制,能夠實現 CPU 多線程處理輸入的數據,如讀取圖片和圖片的一些預處理,這樣 GPU就能夠專一於訓練過程,而CPU去準備數據。

  舉例以下:

image_dir ='path/to/image_dir/*.jpg'
image_list = glob.glob(image_dir)
label_list=...
image_list = tf.convert_to_tensor(image_list, dtype=tf.string)

# 能夠將image_list,label_list多個list組合成一個tensor_list
image_que, label_que = tf.train.slice_input_producer([image_list,label_list], num_epochs=1)

# 只能時string_tensor,因此不能組合多個list
image = tf.train.string_input_producer(image_list, num_epochs=1)

  

tf.train.slice_input_produce() 函數的用法

  這個函數的做用就是從輸入的 tensor_list 按要求抽取一個 tensor 放入文件名隊列,下面學習各個參數:

tf.slice_input_producer(tensor_list, num_epochs=None, shuffle=True,
                         seed=None,capacity=32, shared_name=None, name=None)

  說明:

  • tensor_list 這個就是輸入,格式爲tensor的列表;通常爲[data, label],即由特徵和標籤組成的數據集
  • num_epochs 這個是你抽取batch的次數,若是沒有給定值,那麼將會抽取無數次batch(這會致使你訓練過程停不下來),若是給定值,那麼在到達次數以後就會報OutOfRange的錯誤
  • shuffle 是否隨機打亂,若是爲False,batch是按順序抽取;若是爲True,batch是隨機抽取
  • seed 隨機種子
  • capcity 隊列容量的大小,爲整數
  • name 名稱

  舉個例子:咱們的數據data的 shape是(4000,10),label的shape是(4000, 2),運行下面這行代碼:

input_queue = tf.train.slice_input_producer([data, label], 
                                   num_epochs=1, shuffle=True, capacity=32 )

  結果確定是返回值包含兩組數據的 list,每一個list的shape和輸入的data和label的shape對應。

 

batch_size 的設置與影響

1,batch_size 的含義

  batch_size 能夠理解爲批處理參數,它的極限值爲訓練集樣本總數,當數據量比較小時,能夠將batch_size 值設置爲全數據集(Full batch cearning)。實際上,在深度學習中所涉及到的數據都是比較多的,通常都採用小批量數據處理原則。

2,關於小批量訓練網絡的優缺點

小批量訓練網絡的優勢:

  • 相對海量的的數據集和內存容量,小批量處理須要更少的內存就能夠訓練網絡。
  • 一般小批量訓練網絡速度更快,例如咱們將一個大樣本分紅11小樣本(每一個樣本100個數據),採用小批量訓練網絡時,每次傳播後更新權重,就傳播了11批,在每批次後咱們均更新了網絡的(權重)參數;若是在傳播過程當中使用了一個大樣本,咱們只會對訓練網絡的權重參數進行1次更新。
  • 全數據集肯定的方向可以更好地表明樣本整體,從而可以更準確地朝着極值所在的方向;可是不一樣權值的梯度值差異較大,所以選取一個全局的學習率很困難。

小批量訓練網絡的缺點:

  • 批次越小,梯度的估值就越不許確,在下圖中,咱們能夠看到,與完整批次漸變(藍色)方向相比,小批量漸變(綠色)的方向波動更大。
  • 極端特例batch_size = 1,也成爲在線學習(online learning);線性神經元在均方偏差代價函數的錯誤面是一個拋物面,橫截面是橢圓,對於多層神經元、非線性網絡,在局部依然近似是拋物面,使用online learning,每次修正方向以各自樣本的梯度方向修正,這就形成了波動較大,難以達到收斂效果。

3,爲何須要 batch_size 的參數

  Batch 的選擇,首先決定的時降低的方向。若是數據集比較小,徹底能夠採用全數據集(Full  Batch Learning)的形式,這樣作有以下好處:

  • 全數據集肯定的方向可以更好的表明樣本整體,從而更準確地朝着極值所在的方向
  • 因爲不一樣權值的梯度差異較大,所以選取一個全局的學習率很困難

  Full  Batch Learning 可使用 Rprop 只基於梯度符號而且針對性單獨更新各權值。可是對於很是大的數據集,上述兩個好處變成了兩個壞處:

  • 隨着數據集的海量增長和內存限制,一次載入全部數據不現實
  • 以Rprop的方式迭代,會因爲各個 batch之間的採樣差別性,各次梯度修正值相互抵消,沒法修正。這纔有了後來的RMSprop的妥協方案。

4,選擇適中的 batch_size

  可不能夠選擇一個適中的Batch_size 值呢?固然能夠,就是批梯度降低法(Mini-batches Learning)。由於若是數據集足夠充分,那麼用一半(甚至少得多)的數據訓練算出來的梯度與用所有數據訓練出來的梯度是幾乎同樣的。

在合理的範圍內,增大Batch_size 有什麼好處?

  1. 內存利用率提升了,大矩陣乘法的並行化效率提升
  2. 跑完一次epoch(全數據集)所須要的迭代次數減小,對於相同數據量的處理速度進一步加快。
  3. 在必定範圍內,通常來講Batch_Size 越大,其肯定的降低方向越準,引發訓練震盪越小。

盲目增大Batch_size 有什麼壞處?

內存利用率提升了,可是內存容量可能撐不住了

跑完一次epoch(全數據集)所須要的迭代次數減小,要想達到相同的精度,其所花費的時間大大的增長了,從而對參數的修正也就顯得更加緩慢。

Batch_size 增大到必定程度,其肯定的降低方向已經基本再也不變化。

5,調節Batch_Size 對訓練效果影響到底如何?

  這裏有一個LeNet 在MNIST 數據集上的效果。MNIST 是一個手寫體標準庫。

  運行結果如上圖所示,其中絕對時間作了標準化處理。運行結果與上文分析相印證:

  1. Batch_Size 過小,算法在200 epochs 內不收斂。
  2. 隨着Batch_Size 增大,處理相同數據量的速度越快。
  3. 隨着Batch_Size 增大,達到相同精度所須要的epoch 數量愈來愈多
  4. 因爲上述兩種因素的矛盾,Batch_Size 增大到某個時候,達到時間上的最優
  5. 因爲最終收斂精度會陷入不一樣的局部極值,所以Batch_Size 增大到某些時候,達到最終收斂精度上的最優

 

   此文是本身的學習筆記總結,學習於《TensorFlow深度學習框架》,俗話說,好記性不如爛筆頭,寫寫老是好的,因此若侵權,請聯繫我,謝謝。

相關文章
相關標籤/搜索