第十二節,TensorFlow讀取數據的幾種方法以及隊列的使用

TensorFlow程序讀取數據一共有3種方法:html

  • 供給數據(Feeding): 在TensorFlow程序運行的每一步, 讓Python代碼來供給數據。
  • 從文件讀取數據: 在TensorFlow圖的起始, 讓一個輸入管道從文件中讀取數據。
  • 預加載數據: 在TensorFlow圖中定義常量或變量來保存全部數據(僅適用於數據量比較小的狀況)。

一 預加載數據

import tensorflow as tf
x1 = tf.constant([2,3,4])
x2 = tf.constant([4,0,1])

y = tf.add(x1,x2)

with tf.Session() as sess:
    print(sess.run(y))

在這裏使用x1,x2保存具體的值,即將數據直接內嵌到圖中,再將圖傳入會話中執行,當數據量較大時,圖的輸出會遇到效率問題。python

二 供給數據

import tensorflow as tf
x1 = tf.placeholder(tf.int32)
x2 = tf.placeholder(tf.int32)
#用python產生數據
v1 = [2,3,4]
v2 = [4,0,1]

y = tf.add(x1,x2)

with tf.Session() as sess:
    print(sess.run(y,feed_dict={x1:v1,x2:v2}))

在這裏x1,x2只是佔位符,沒有具體的值,那麼運行的時候去哪取值呢?這時候就要用到sess.run()的feed_dict參數,將python產生的數據傳入,並計算y。git

以上兩種方法都很方便,可是遇到大型數據的時候就會很吃力,即便是Feed_dict,中間環節的增長也是不小的開銷,由於數據量大的時候,TensorFlow程序運行的每一步,咱們都須要使用python代碼去從文件中讀取數據,並對讀取到的文件數據進行解碼。最優的方案就是在圖中定義好文件讀取的方法,讓TF本身從文件中讀取數據,並解碼成可用的樣本集。github

三 TensorFlow中的隊列機制

從文件中讀取數據的方法有不少,好比能夠在一個文本里面寫入圖片數據的路徑和標籤,而後用tensorflow的read_file()讀入圖片;也能夠將圖片和標籤的值直接存放在CSV或者txt文件。正則表達式

咱們會在後面陸續介紹如下幾種讀取文件的方式:緩存

  • 從字典結構的數據文件讀取
  • 從bin文件讀取
  • 從CSV(TXT)讀取
  • 從原圖讀取
  • TFRecord格式文件的讀取

在講解文件的讀取以前,咱們須要先了解一下TensorFlow中的隊列機制,後面也會詳細介紹。網絡

TensorFlow提供了一個隊列機制,經過多線程將讀取數據與計算數據分開。由於在處理海量數據集的訓練時,沒法把數據集一次所有載入到內存中,須要一邊從硬盤中讀取,一邊進行訓練,爲了加快訓練速度,咱們能夠採用多個線程讀取數據,一個線程消耗數據。session

下面簡要介紹一下,TensorFlow裏與Queue有關的概念和用法。詳細內容點擊原文數據結構

其實概念只有三個:多線程

  • Queue是TF隊列和緩存機制的實現
  • QueueRunner是TF中對操做Queue的線程的封裝
  • Coordinator是TF中用來協調線程運行的工具

雖然它們常常同時出現,但這三樣東西在TensorFlow裏面是能夠單獨使用的,不妨先分開來看待。

1.Queue

據實現的方式不一樣,分紅具體的幾種類型,例如:

  • tf.FIFOQueue :按入列順序出列的隊列
  • tf.RandomShuffleQueue :隨機順序出列的隊列
  • tf.PaddingFIFOQueue :以固定長度批量出列的隊列
  • tf.PriorityQueue :帶優先級出列的隊列
  • ... ...

這些類型的Queue除了自身的性質不太同樣外,建立、使用的方法基本是相同的。

建立函數的參數:

tf.FIFOQueue(capacity, dtypes, shapes=None, names=None,
               shared_name=None, name="fifo_queue")
Queue主要包含 入列(enqueue)出列(dequeue)兩個操做。隊列自己也是圖中的一個節點。其餘節點(enqueue, dequeue)能夠修改隊列節點中的內容。enqueue操做返回計算圖中的一個Operation節點,dequeue操做返回一個Tensor值。Tensor在建立時一樣只是一個定義(或稱爲「聲明」),須要放在Session中運行才能得到真正的數值。下面是一個單獨使用Queue的例子:
#建立的圖:一個先入先出隊列,以及初始化,出隊,+1,入隊操做  
q = tf.FIFOQueue(3, "float")  
init = q.enqueue_many(([0.1, 0.2, 0.3],))  
x = q.dequeue()  
y = x + 1  
q_inc = q.enqueue([y])  
  
#開啓一個session,session是會話,會話的潛在含義是狀態保持,各類tensor的狀態保持  
with tf.Session() as sess:  
    sess.run(init)  
  
    for i in range(2):  
            sess.run(q_inc)    
    quelen =  sess.run(q.size())  
    
    for i in range(quelen):  
            print (sess.run(q.dequeue())) 

2. QueueRunner

以前的例子中,入隊操做都在主線程中進行,Session中能夠多個線程一塊兒運行。 在數據輸入的應用場景中,入隊操做從硬盤上讀取,入隊操做是從硬盤中讀取輸入,放到內存當中,速度較慢。 使用QueueRunner能夠建立一系列新的線程進行入隊操做,讓主線程繼續使用數據。若是在訓練神經網絡的場景中,就是訓練網絡和讀取數據是異步的,主線程在訓練網絡,另外一個線程在將數據從硬盤讀入內存。

'''
QueueRunner()的使用
'''
q = tf.FIFOQueue(10, "float")  
counter = tf.Variable(0.0)  #計數器
# 給計數器加一
increment_op = tf.assign_add(counter, 1.0)
# 將計數器加入隊列
enqueue_op = q.enqueue(counter)

# 建立QueueRunner
# 用多個線程向隊列添加數據
# 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)


#主線程  
with tf.Session() as sess:  
    sess.run(tf.initialize_all_variables())  
    #啓動入隊線程  
    enqueue_threads = qr.create_threads(sess, start=True)  
    #主線程  
    for i in range(10):              
        print (sess.run(q.dequeue()))  

能正確輸出結果,可是最後會報錯,ERROR:tensorflow:Exception in QueueRunner: Session has been closed.也就是說,當循環結束後,該Session就會自動關閉,至關於main函數已經結束了。

'''
QueueRunner()的使用
'''
q = tf.FIFOQueue(10, "float")  
counter = tf.Variable(0.0)  #計數器
# 給計數器加一
increment_op = tf.assign_add(counter, 1.0)
# 將計數器加入隊列
enqueue_op = q.enqueue(counter)

# 建立QueueRunner
# 用多個線程向隊列添加數據
# 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)

'''

#主線程  
with tf.Session() as sess:  
    sess.run(tf.initialize_all_variables())  
    #啓動入隊線程  
    enqueue_threads = qr.create_threads(sess, start=True)  
    #主線程  
    for i in range(10):              
        print (sess.run(q.dequeue()))  

'''


  
# 主線程  
sess = tf.Session()  
sess.run(tf.initialize_all_variables())  
  
# 啓動入隊線程  
enqueue_threads = qr.create_threads(sess, start=True) 
  
# 主線程  
for i in range(0, 10):  
    print(sess.run(q.dequeue()))  

不使用with tf.Session,那麼Session就不會自動關閉。

並非咱們設想的1,2,3,4,本質緣由是增長計數的進程會不停的後臺運行,執行入隊的進程會先執行10次(由於隊列長度只有10),而後主線程開始消費數據,當一部分數據消費被後,入隊的進程又會開始執行。最終主線程消費完10個數據後中止,但其餘線程繼續運行,程序不會結束。

經驗:由於tensorflow是在圖上進行計算,要驅動一張圖進行計算,必需要送入數據,若是說數據沒有送進去,那麼sess.run(),就沒法執行,tf也不會主動報錯,提示沒有數據送進去,其實tf也不能主動報錯,由於tf的訓練過程和讀取數據的過程實際上是異步的。tf會一直掛起,等待數據準備好。現象就是tf的程序不報錯,可是一直不動,跟掛起相似。 

'''
QueueRunner()的使用
'''
q = tf.FIFOQueue(10, "float")  
counter = tf.Variable(0.0)  #計數器
# 給計數器加一
increment_op = tf.assign_add(counter, 1.0)
# 將計數器加入隊列
enqueue_op = q.enqueue(counter)

# 建立QueueRunner
# 用多個線程向隊列添加數據
# 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)


#主線程  
with tf.Session() as sess:  
    sess.run(tf.initialize_all_variables())  
    #啓動入隊線程  
    enqueue_threads = qr.create_threads(sess, start=True)  
    #主線程  
    for i in range(10):              
        print (sess.run(q.dequeue()))  

上圖將生成數據的線程註釋掉,程序就會卡在sess.run(q.dequeue()),等待數據的到來QueueRunner是用來啓動入隊線程用的。

3.Coordinator

Coordinator是個用來保存線程組運行狀態的協調器對象,它和TensorFlow的Queue沒有必然關係,是能夠單獨和Python線程使用的。例如:

'''
Coordinator
'''
import threading, time

# 子線程函數
def loop(coord, id):
    t = 0
    while not coord.should_stop():
        print(id)
        time.sleep(1)
        t += 1
        # 只有1號線程調用request_stop方法
        if (t >= 2 and id == 0):
            coord.request_stop()

# 主線程
coord = tf.train.Coordinator()
# 使用Python API建立10個線程
threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]

# 啓動全部線程,並等待線程結束
for t in threads: t.start()
coord.join(threads)

將這個程序運行起來,會發現全部的子線程執行完兩個週期後都會中止,主線程會等待全部子線程都中止後結束,從而使整個程序結束。因而可知,只要有任何一個線程調用了Coordinator的request_stop方法,全部的線程均可以經過should_stop方法感知並中止當前線程。

將QueueRunner和Coordinator一塊兒使用,實際上就是封裝了這個判斷操做,從而使任何一個出現異常時,可以正常結束整個程序,同時主線程也能夠直接調用request_stop方法來中止全部子線程的執行。

簡要 介紹完了TensorFlow中隊列機制後,咱們再來看一下如何從文件中讀取數據。

四 從文件中讀取數據

1.從字典結構的數據文件讀取(python數據格式)

(1)在介紹字典結構的數據文件的讀取以前,咱們先來介紹怎麼建立字典結構的數據文件。

  • 先要準備好圖片文件,咱們使用Open CV3進行圖像讀取。
  • 把cv2.imread()讀取到的圖像進行裁切,扭曲,等處理。
  • 使用numpy纔對數據進行處理,好比維度合併。
  • 把處理好的每一張圖像的數據和標籤分別存放在對應的list(或者ndarray)中。
  • 建立一個字典,包含兩個元素‘data’和'labels',並分別賦值爲上面的list。
  • 使用pickle模塊對字典進行序列化,並保存到文件中。

具體代碼咱們查看以下文章:圖片存儲爲cifar的Python數據格式

若是針對圖片比較多的狀況,咱們不太可能把全部圖像都寫入個文件,咱們能夠分批把圖像寫入幾個文件中。

(2)cifar10數據有三種版本,分別是MATLAB,Python和bin版本 數據下載連接: http://www.cs.toronto.edu/~kriz/cifar.html 

其中Python版本的數據便是以字典結構存儲的數據 。

針對字典結構的數據文件讀取,我在AlexNet那節中有詳細介紹,主要就是經過pickle模塊對文件進行反序列化,獲取咱們所須要的數據。

2.從bin文件讀取

在官網的cifar的例子中就是從bin文件中讀取的。bin文件須要以必定的size格式存儲,好比每一個樣本的值佔多少字節,label佔多少字節,且這對於每一個樣本都是固定的,而後一個挨着一個存儲。這樣就可使用tf.FixedLengthRecordReader 類來每次讀取固定長度的字節,正好對應一個樣本存儲的字節(包括label)。而且用tf.decode_raw進行解析。

(1)製做bin file 

如何將本身的圖片存爲bin file,能夠看看下面這篇博客,這篇博客使用C++和opencv將圖片存爲二進制文件: http://blog.csdn.net/code_better/article/details/53289759 

(2)從bin file讀入 
在後面會詳細講解如何從二進制記錄文件中讀取數據,並以cifar10_input.py做爲案例。

def read_cifar10(filename_queue):
  """Reads and parses examples from CIFAR10 data files.

  Recommendation: if you want N-way read parallelism, call this function
  N times.  This will give you N independent Readers reading different
  files & positions within those files, which will give better mixing of
  examples.

  Args:
    filename_queue: A queue of strings with the filenames to read from.

  Returns:
    An object representing a single example, with the following fields:
      height: number of rows in the result (32)
      width: number of columns in the result (32)
      depth: number of color channels in the result (3)
      key: a scalar string Tensor describing the filename & record number
        for this example.
      label: an int32 Tensor with the label in the range 0..9.
      uint8image: a [height, width, depth] uint8 Tensor with the image data
  """

  class CIFAR10Record(object):
    pass
  result = CIFAR10Record()

  # Dimensions of the images in the CIFAR-10 dataset.
  # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
  # input format.
  label_bytes = 1  # 2 for CIFAR-100
  result.height = 32
  result.width = 32
  result.depth = 3
  image_bytes = result.height * result.width * result.depth
  # Every record consists of a label followed by the image, with a
  # fixed number of bytes for each.
  record_bytes = label_bytes + image_bytes

  # Read a record, getting filenames from the filename_queue.  No
  # header or footer in the CIFAR-10 format, so we leave header_bytes
  # and footer_bytes at their default of 0.
  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

  # Convert from a string to a vector of uint8 that is record_bytes long.
  record_bytes = tf.decode_raw(value, tf.uint8)

  # The first bytes represent the label, which we convert from uint8->int32.
  result.label = tf.cast(
      tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)

  # The remaining bytes after the label represent the image, which we reshape
  # from [depth * height * width] to [depth, height, width].
  depth_major = tf.reshape(
      tf.strided_slice(record_bytes, [label_bytes],
                       [label_bytes + image_bytes]),
      [result.depth, result.height, result.width])
  # Convert from [depth, height, width] to [height, width, depth].
  result.uint8image = tf.transpose(depth_major, [1, 2, 0])

  return result

這段代碼若是看不懂,能夠先跳過。

3.從CSV(TXT)文件讀取

有的時候在數據量不是很大的時候,能夠從CSV或者TXT文件進行讀取。 

(1)製做CSV(TXT)數據文本 
CSV (TXT)通常是一行存一個樣本(包括樣本值和label),用逗號隔開。用python的普通文本寫入便可。

(2)讀取的時候tf.TextLineReader 類來每次讀取一行,並使用tf.decode_csv來對每一行進行解析。 
這裏主要介紹一下:

 tf.decode_csv(records, record_defaults, field_delim=None, name=None) 
  • 首先records與第二種方法中相同,爲reader讀到的內容,這裏爲CSV (TXT)的一行。
  • 通常一行裏面的值會用逗號或者空格隔開,這裏第三個輸入參數就是指定用什麼來進行分割,默認爲逗號。
  • 第二個輸入參數是指定分割後每一個屬性的類型,好比分割後會有三列,那麼第二個參數就應該是[[‘int32’], [], [‘string’]], 可不指定類型(設爲空[])也能夠。若是分割後的屬性比較多,好比有100個,能夠用[ []*100 ]來表示 
col= tf.decode_csv(records, record_defaults=[ [ ]*100 ], field_delim=' ', name=None)

返回的col是長度爲100的list。 
須要注意的是,當數據量比較大的時候,存成CSV或TXT文件要比BIN文件大的多,所以在TF中讀取的速度也會慢不少。所以儘可能不要讀取大的CSV的方式來輸入。

在後面咱們會詳細講解如何從CSV文件中讀取數據,並有具體的案例。

4 從原圖中讀取

(1)製做數據路徑文件 
一行一例,每例包括該樣本的地址和label,用逗號分割開,用python普通文件寫入便可 
(2)讀取 
不少狀況下咱們的圖片訓練集就是原始圖片自己,並無像cifar dataset那樣存成bin等格式。所以咱們須要根據一個train_list列表,去挨個讀取圖片。這裏我用到的方法是首先將train_list.txt中的image list(也就是每一行有圖片的路徑和label組成)讀入隊列中,那麼對每次dequeue的內容中能夠提取當前圖片的路徑和label。

filename = os.path.join(data_dir, trainfilename)  
    with open(filename) as fid:  
        content = fid.read()  
    content = content.split('\n')  
    content = content[:-1]  
    valuequeue = tf.train.string_input_producer(content,shuffle=True)  
    value = valuequeue.dequeue()  
    dir, labels = tf.decode_csv(records=value, record_defaults=[["string"], [""]], field_delim=" ")  
    labels = tf.string_to_number(labels, tf.int32)  
    imagecontent = tf.read_file(dir)  
    image = tf.image.decode_png(imagecontent, channels=3, dtype=tf.uint8)  
    image = tf.cast(image, tf.float32)  
    #將圖片統一爲32*32大小的
    image = tf.image.resize_images(image,[32,32])
    image = tf.reshape(image,[result.depth, result.height, result.width])
    # Convert from [depth, height, width] to [height, width, depth].
    result.uint8image = tf.transpose(image, [1, 2, 0])

不過這個方法對電腦輸入輸出要求比較高,若是機械硬盤有壞道,就會報Input/Output error,出現這種狀況,要修復機械硬盤壞道。

5.從TFRecord文件讀取

TFrecord是Tensorflow推薦的數據集格式,與Tensorflow框架緊密結合。在TensorFlow中提供了一系列接口能夠訪問TFRecord格式。後面會詳細介紹如何將原始圖片文件轉換爲TFRecord格式,而後在運行中經過多線程的方式來讀取。

五 QueueRunner和Coordinator結合方式一

在TensorFlow中用Queue的經典模式有兩種,都是配合了QueueRunner和Coordinator一塊兒使用的。

這裏先介紹第一種方法,顯式的建立QueueRunner,而後調用它的create_threads方法啓動線程。例以下面這段代碼:

'''
配合使用
'''
import numpy as np
# 1000個4維輸入向量,每一個數取值爲1-10之間的隨機數
data = 10 * np.random.randn(1000, 4) + 1
# 1000個隨機的目標值,值爲0或1
target = np.random.randint(0, 2, size=1000)

# 建立Queue,隊列中每一項包含一個輸入數據和相應的目標值
queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])

# 批量入列數據(這是一個Operation)
enqueue_op = queue.enqueue_many([data, target])
# 出列數據(這是一個Tensor定義)
data_sample, label_sample = queue.dequeue()

# 建立包含4個線程的QueueRunner
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

with tf.Session() as sess:
    # 建立Coordinator
    coord = tf.train.Coordinator()
    # 啓動QueueRunner管理的線程
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    # 主線程,消費100個數據
    for step in range(100):
        if coord.should_stop():
            break
        data_batch, label_batch = sess.run([data_sample, label_sample])
    # 主線程計算完成,中止全部採集數據的進程
    coord.request_stop()
    coord.join(enqueue_threads)

六 QueueRunner和Coordinator結合方式二

這一小節咱們會使用QueueRunner和Coordinator來實現bin文件,以及csv文件、TFRecord格式文件的讀取,不過這裏咱們採用隱式建立線程的方法。在講解具體代碼以前,咱們須要先來說解關於TensorFlow中的文件隊列機制和內存隊列機制。

首先須要思考的一個問題是,什麼是數據讀取?以圖像數據爲例,讀取數據的過程能夠用下圖來表示:

假設咱們的硬盤中有一個圖片數據集0001.jpg,0002.jpg,0003.jpg……咱們只須要把它們讀取到內存中,而後提供給GPU或是CPU進行計算就能夠了。這聽起來很容易,但事實遠沒有那麼簡單。事實上,咱們必需要把數據先讀入後才能進行計算,假設讀入用時0.1s,計算用時0.9s,那麼就意味着每過1s,GPU都會有0.1s無事可作,這就大大下降了運算的效率。

如何解決這個問題?方法就是將讀入數據和計算分別放在兩個線程中,將數據讀入內存的一個隊列,以下圖所示:

 

讀取線程源源不斷地將文件系統中的圖片讀入到一個內存的隊列中,而負責計算的是另外一個線程,計算須要數據時,直接從內存隊列中取就能夠了。這樣就能夠解決GPU由於IO而空閒的問題!

而在tensorflow中,爲了方便管理,在內存隊列前又添加了一層所謂的「文件名隊列」。

爲何要添加這一層文件名隊列?咱們首先得了解機器學習中的一個概念:epoch。對於一個數據集來說,運行一個epoch就是將這個數據集中的圖片所有計算一遍。如一個數據集中有三張圖片A.jpg、B.jpg、C.jpg,那麼跑一個epoch就是指對A、B、C三張圖片都計算了一遍。兩個epoch就是指先對A、B、C各計算一遍,而後再所有計算一遍,也就是說每張圖片都計算了兩遍。

tensorflow使用文件名隊列+內存隊列雙隊列的形式讀入文件,能夠很好地管理epoch。下面咱們用圖片的形式來講明這個機制的運行方式。以下圖,仍是以數據集A.jpg, B.jpg, C.jpg爲例,假定咱們要跑一個epoch,那麼咱們就在文件名隊列中把A、B、C各放入一次,並在以後標註隊列結束。

程序運行後,內存隊列首先讀入A(此時A從文件名隊列中出隊):

 

再依次讀入B和C:

 

此時,若是再嘗試讀入,系統因爲檢測到了「結束」,就會自動拋出一個異常(OutOfRange)。外部捕捉到這個異常後就能夠結束程序了。這就是tensorflow中讀取數據的基本機制。若是咱們要跑2個epoch而不是1個epoch,那隻要在文件名隊列中將A、B、C依次放入兩次再標記結束就能夠了。

典型的文件數據讀取會包含下面這些步驟:

1.文件名列表

可使用字符串張量(好比["file0", "file1"][("file%d" % i) for i in range(2)], [("file%d" % i) for i in range(2)]) 或者tf.train.match_filenames_once ()函數來產生文件名列表。

filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i)
                 for i in xrange(1, 6)]

2.文件名隊列

對於文件名隊列,咱們使用tf.train.string_input_producer()函數。這個函數須要傳入一個文件名list,系統會自動將它轉爲一個先入先出的文件名隊列, 文件閱讀器會須要它來讀取數據。

# 同時打開多個文件,顯示建立Queue,同時隱含了QueueRunner的建立
filename_queue = tf.train.string_input_producer(filenames)

3.可配置的 文件名亂序(shuffling),可配置的最大訓練迭代數(epoch limit)

tf.train.string_input_producer還有兩個重要的參數,一個是num_epochs,它就是咱們上文中提到的epoch數。另一個就是shuffle,shuffle是指在一個epoch內文件的順序是否被打亂。若設置shuffle=False,以下圖,每一個epoch內,數據仍是按照A、B、C的順序進入文件名隊列,這個順序不會改變:

若是設置shuffle=True,那麼在一個epoch內,數據的先後順序就會被打亂,以下圖所示:

在tensorflow中,內存隊列不須要咱們本身創建,咱們只須要使用reader對象從文件名隊列中讀取數據就能夠了。

4.針對輸入文件格式的閱讀器

根據你的文件格式, 選擇對應的文件閱讀器, 而後將文件名隊列提供給閱讀器的read()方法。閱讀器的read()方法會輸出一個key來表徵輸入的文件和其中的紀錄(對於調試很是有用),同時獲得一個字符串標量, 這個字符串標量能夠被一個或多個解析器,或者轉換操做將其解碼爲張量而且構形成爲樣本。

CSV 文件讀取

從CSV文件中讀取數據, 須要使用TextLineReader()decode_csv() 操做, 以下面的例子所示:咱們須要從iris_0.csv和iris_1.csv文件讀取數據,iris_0.csv和iris_1.csv文件數據是從iris數據集中選取的部分數據,內容以下:總共有21條記錄。

# 同時打開多個文件(文件格式必須同樣),隱式示建立Queue,同時隱含了QueueRunner的建立
filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"])

reader = tf.TextLineReader()
# Tensorflow的Reader對象能夠直接接受一個Queue做爲輸入  每次的執行都會從文件中讀取一行內容
key, value = reader.read(filename_queue)

# 若是某一列爲空,指定默認值,同時指定了默認列的類型
record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]]
#  操做會解析讀取的一行內容並將其轉爲張量列表
col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults)
features = [col1, col2, col3, col4]

#獲取一行數據
#row = tf.decode_csv(value, record_defaults=record_defaults)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # 啓動計算圖中全部的隊列線程 調用來將文件名填充到隊列,不然操做會被阻塞到文件名隊列中有值爲止。
    threads = tf.train.start_queue_runners(coord=coord)
    
    # 主線程,消費50個數據    
    for _ in range(50):
        example, label = sess.run([features, col5])
        print('Step {0} {1} {2}'.format(_,example,label))
    # 主線程計算完成,中止全部採集數據的進程
    coord.request_stop()
   # 指定等待某個線程結束 coord.join(threads)
readdecode_csvtf.train.start_queue_runnersread
  • 在這個例子中,tf.train.string_input_produecer()會將一個隱含的QueueRunner添加到全局圖中(相似的操做還有tf.train.shuffle_batch()等)。因爲沒有顯式地返回QueueRunner()調用create_threads()啓動線程,這裏使用了tf.train.start_queue_runners()方法直接啓動tf.GraphKeys.QUEUE_RUNNERS集合中的全部隊列線程。初學者會常常在代碼中看到tf.train.start_queue_runners()這個函數,但每每很難理解它的用處,在這裏,有了上面的鋪墊後,咱們就能夠解釋這個函數的做用了。

在咱們使用tf.train.string_input_producer建立文件名隊列後,整個系統其實仍是處於「停滯狀態」的,也就是說,咱們文件名並無真正被加入到隊列中(以下圖所示)。此時若是咱們開始計算,由於內存隊列中什麼也沒有,計算單元就會一直等待,致使整個系統被阻塞。

而使用tf.train.start_queue_runners()以後,纔會啓動填充隊列的線程,這時系統就再也不「停滯」。此後計算單元就能夠拿到數據並進行計算,整個程序也就跑起來了,這就是函數tf.train.start_queue_runners的用處。

每次read()的執行都會從文件中讀取一行內容, decode_csv() 操做會解析這一行內容並將其轉爲張量列表。若是輸入的參數有缺失,record_default參數能夠根據張量的類型來設置默認值。

5.紀錄解析器(從bin文件讀入)

二進制bin文件中讀取固定長度紀錄, 可使用tf.FixedLengthRecordReadertf.decode_raw操做。decode_raw操做能夠將一個字符串轉換爲一個uint8的張量。

舉例來講,the CIFAR-10 dataset的文件格式定義是:每條記錄的長度都是固定的,一個字節的標籤,後面是3072字節的圖像數據。uint8的張量的標準操做就能夠從中獲取圖像片而且根據須要進行重組。 例子代碼能夠在tensorflow/models/image/cifar10/cifar10_input.py找到。

def read_cifar10(filename_queue):
  """Reads and parses examples from CIFAR10 data files.

  Recommendation: if you want N-way read parallelism, call this function
  N times.  This will give you N independent Readers reading different
  files & positions within those files, which will give better mixing of
  examples.

  Args:
    filename_queue: A queue of strings with the filenames to read from.

  Returns:
    An object representing a single example, with the following fields:
      height: number of rows in the result (32)
      width: number of columns in the result (32)
      depth: number of color channels in the result (3)
      key: a scalar string Tensor describing the filename & record number
        for this example.
      label: an int32 Tensor with the label in the range 0..9.
      uint8image: a [height, width, depth] uint8 Tensor with the image data
  """

  class CIFAR10Record(object):
    pass
  result = CIFAR10Record()

  # Dimensions of the images in the CIFAR-10 dataset.
  # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
  # input format.
  label_bytes = 1  # 2 for CIFAR-100
  result.height = 32
  result.width = 32
  result.depth = 3
  image_bytes = result.height * result.width * result.depth
  # Every record consists of a label followed by the image, with a
  # fixed number of bytes for each.
  record_bytes = label_bytes + image_bytes

  # Read a record, getting filenames from the filename_queue.  No
  # header or footer in the CIFAR-10 format, so we leave header_bytes
  # and footer_bytes at their default of 0.
  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

  # Convert from a string to a vector of uint8 that is record_bytes long.
  record_bytes = tf.decode_raw(value, tf.uint8)

  # The first bytes represent the label, which we convert from uint8->int32.
  result.label = tf.cast(
      tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)

  # The remaining bytes after the label represent the image, which we reshape
  # from [depth * height * width] to [depth, height, width].
  depth_major = tf.reshape(
      tf.strided_slice(record_bytes, [label_bytes],
                       [label_bytes + image_bytes]),
      [result.depth, result.height, result.width])
  # Convert from [depth, height, width] to [height, width, depth].
  result.uint8image = tf.transpose(depth_major, [1, 2, 0])

  return result

read_cifar10()函數須要傳入一個文件名隊列,這個函數主要作了如下事情:

  • 計算每一個記錄(樣本)包含多少字節。一張圖像所佔字節數 + 圖像標籤所佔字節數。
  • 每執行一次read()的執行都會從文件中讀取一行內容,decode_raw() 操做會解析這一行內容並將其轉爲張量。
  • 提取第一個字節,即標籤,並把類型從uint8->int32
  • 提取剩下的字節,即圖像。
  • 把圖像數據轉換爲[height,width,depth]形狀。
  • 返回一個對象resulit。包含兩個屬性(都是張量),result.uint8image包含一張形狀爲[height,width,depth]的圖像,result.label存儲該圖像對應的標籤。

6.可配置的預處理器

你能夠對輸入的樣本進行任意的預處理, 這些預處理不依賴於訓練參數,好比上面read_cifar10()的函數,獲取一張圖像數據張量後,咱們能夠對圖像進行處理,好比裁切,水平翻轉,以及對圖片進行歸一化處理等等。咱們能夠在tensorflow/models/image/cifar10/cifar10.py找到數據歸一化, 提取隨機數據片,增長噪聲或失真等等預處理的例子。

7.批處理(TFRecord格式文件讀寫)

在數據輸入管道的末端, 咱們須要有內存隊列來執行輸入樣本的批量讀取。咱們使用tf.train.shuffle_batch() 函數來對內存隊列中的樣本進行亂序處理。 

 咱們用一個具體的例子來演示一下tf.train.shuffle_batch()函數的使用。如圖,假設咱們在當前文件夾中已經有A.、B.、C三個子文件夾。並在每一個文件夾下下面放置對應的圖片。

針對這些文件咱們須要作下面幾步處理:

  • 讀取全部圖片文件,並存爲TFRecord格式文件。
  • 咱們讀取記錄文中的數據。使用tf.TFRecordReader類建立一個文件讀取器,每執行一次read()方法會讀取一個樣本。
  • 使用tf.train.shuffle_batch()函數每次讀取batch_size張圖像數據。
'''
shuffle_batch()的使用
'''
import os
import cv2

def write_binary():  
    '''
    將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中
    '''
    #獲取當前工做目錄
    cwd = os.getcwd()
    
    #當前目錄下的子目錄  一共有12張圖片
    classes=['A','B','C']
    
    #建立對象 用於向記錄文件寫入記錄
    writer = tf.python_io.TFRecordWriter('data.tfrecord')  
    
    #遍歷每個子文件夾
    for index, name in enumerate(classes):
        class_path = os.path.join(cwd,name)
        #遍歷子目錄下的每個文件
        for img_name in os.listdir(class_path):
            #每個圖片全路徑
            img_path = os.path.join(class_path , img_name)
            #讀取圖像
            img = cv2.imread(img_path)
            #縮放
            img1 = cv2.resize(img,(250,250))
            #將圖片轉化爲原生bytes
            img_raw = img1.tobytes()         
            #將數據整理成 TFRecord 須要的數據結構 
            example = tf.train.Example(features=tf.train.Features(feature={
                    'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
                    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))}))  

            #序列化  
            serialized = example.SerializeToString()  
            #寫入文件  
            writer.write(serialized)  
    writer.close()  

def read_and_decode(filename):  
    '''
    讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 
    
    args:
        filename:TFRecord格式文件路徑
              
    '''
    #建立文件隊列,不限讀取的數量  
    filename_queue = tf.train.string_input_producer([filename],shuffle=False)  
    #建立一個文件讀取器 從隊列文件中讀取數據
    reader = tf.TFRecordReader()  
    
    #reader從 TFRecord 讀取內容並保存到 serialized_example中 
    _, serialized_example = reader.read(filename_queue)  

    # 讀取serialized_example的格式 
    features = tf.parse_single_example(     
        serialized_example,  
        features={  
            'label': tf.FixedLenFeature([], tf.int64),  
            'img_raw': tf.FixedLenFeature([], tf.string)      
        }  
    )  
    
    # 解析從 serialized_example 讀取到的內容  
    img=tf.decode_raw(features['img_raw'],tf.uint8)
    img = tf.reshape(img, [250, 250, 3])
    label = tf.cast(features['label'], tf.int32)
    return img,label  


#將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中
write_binary()  
    
#讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 
img,label = read_and_decode('data.tfrecord')  

'''
讀取批量數據  這裏設置batch_size=12,即一次從內存隊列中隨機讀取12張圖片,這讀取到的圖片可能有重複的,
這主要是由於設置內存隊列最小元素個數爲100,最大元素個數爲2000,而咱們總共只有12張圖片,因此隊列中有許多重複的圖片
'''
img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2)  


with tf.Session() as sess:
    
    sess.run(tf.global_variables_initializer())  
    #建立一個協調器,管理線程
    coord = tf.train.Coordinator()  
    
    #啓動QueueRunner, 此時文件名纔開始進隊。
    threads=tf.train.start_queue_runners(sess=sess,coord=coord)  

    img, label = sess.run([img_batch, label_batch])  
    for i in range(12):
        cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i])
    #終止線程
    coord.request_stop()
    coord.join(threads)
    

運行後:

咱們在讀取TFRecord文件時,一次讀取12張圖片,這主要是由於咱們設置batch_size=12,而且咱們能夠看到讀取到的12張圖片是隨機,這裏出現了重複的。

8.實驗

咱們再用一個具體的例子感覺tensorflow中的數據讀取。如圖,假設咱們在當前文件夾中已經有A.jpg、B.jpg、C.jpg三張圖片,咱們但願讀取這三張圖片5個epoch而且把讀取的結果從新存到read文件夾中。

'''
測試
'''
tf.reset_default_graph()
# 新建一個Session
with tf.Session() as sess:
    # 咱們要讀三幅圖片A.jpg, B.jpg, C.jpg
    filename = ['A.jpg', 'B.jpg', 'C.jpg']
    # string_input_producer會產生一個文件名隊列
    filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5)
    # reader從文件名隊列中讀數據。對應的方法是reader.read
    reader = tf.WholeFileReader()
    key, value = reader.read(filename_queue)
    # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化
    tf.local_variables_initializer().run()
    # 使用start_queue_runners以後,纔會開始填充隊列
    threads = tf.train.start_queue_runners(sess=sess)
    for i in range(15):
        # 獲取圖片數據並保存
        image_data = sess.run(value)
        with open('read/test_%d.jpg' % (i+1), 'wb') as f:
            f.write(image_data)

咱們這裏使用filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)創建了一個會跑5個epoch的文件名隊列。並使用reader讀取,reader每次讀取一張圖片並保存。

運行代碼後,咱們獲得就能夠看到read文件夾中的圖片,正好是按順序的5個epoch:

若是咱們設置filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)中的shuffle=True,那麼在每一個epoch內圖像就會被打亂,如圖所示:

咱們這裏只是用三張圖片舉例,實際應用中一個數據集確定不止3張圖片,不過涉及到的原理都是共通的。

完整代碼:

# -*- coding: utf-8 -*-
"""
Created on Wed May  2 20:39:25 2018

@author: zy
"""

import tensorflow as tf

'''
TensorFlow中隊列的使用
'''

'''
下面是一個單獨使用Queue的例子:
'''
#建立的圖:一個先入先出隊列,以及初始化,出隊,+1,入隊操做  
q = tf.FIFOQueue(3, "float")  
init = q.enqueue_many(([0.1, 0.2, 0.3],))  
x = q.dequeue()  
y = x + 1  
q_inc = q.enqueue([y])  
  
#開啓一個session,session是會話,會話的潛在含義是狀態保持,各類tensor的狀態保持  
with tf.Session() as sess:  
    sess.run(init)  
  
    for i in range(2):  
            sess.run(q_inc)    
    quelen =  sess.run(q.size())  
    
    for i in range(quelen):  
            print (sess.run(q.dequeue()))  
            
            
        
'''
QueueRunner()的使用
'''
q = tf.FIFOQueue(10, "float")  
counter = tf.Variable(0.0)  #計數器
# 給計數器加一
increment_op = tf.assign_add(counter, 1.0)
# 將計數器加入隊列
enqueue_op = q.enqueue(counter)

# 建立QueueRunner
# 用多個線程向隊列添加數據
# 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)

'''

#主線程  
with tf.Session() as sess:  
    sess.run(tf.initialize_all_variables())  
    #啓動入隊線程  
    enqueue_threads = qr.create_threads(sess, start=True)  
    #主線程  
    for i in range(10):              
        print (sess.run(q.dequeue()))  

'''


  
# 主線程  
sess = tf.Session()  
sess.run(tf.initialize_all_variables())  
  
# 啓動入隊線程  
enqueue_threads = qr.create_threads(sess, start=True) 
  
# 主線程  
for i in range(0, 10):  
    print(sess.run(q.dequeue()))  
    
    
    
    
    
'''
Coordinator
'''
import threading, time

# 子線程函數
def loop(coord, id):
    t = 0
    while not coord.should_stop():
        print(id)
        time.sleep(1)
        t += 1
        # 只有1號線程調用request_stop方法
        if (t >= 2 and id == 0):
            coord.request_stop()

# 主線程
coord = tf.train.Coordinator()
# 使用Python API建立10個線程
threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]

# 啓動全部線程,並等待線程結束
for t in threads: t.start()
coord.join(threads)


'''
 QueueRunner和Coordinator結合方式一
'''
'''
import numpy as np
# 1000個4維輸入向量,每一個數取值爲1-10之間的隨機數
data = 10 * np.random.randn(1000, 4) + 1
# 1000個隨機的目標值,值爲0或1
target = np.random.randint(0, 2, size=1000)


# 建立Queue,隊列中每一項包含一個輸入數據和相應的目標值
queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])

# 批量入列數據(這是一個Operation)
enqueue_op = queue.enqueue_many([data, target])
# 出列數據(這是一個Tensor定義)
data_sample, label_sample = queue.dequeue()

# 建立包含4個線程的QueueRunner
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

with tf.Session() as sess:
    # 建立Coordinator
    coord = tf.train.Coordinator()
    # 啓動QueueRunner管理的線程
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    # 主線程,消費100個數據
    for step in range(100):
        if coord.should_stop():
            break
        data_batch, label_batch = sess.run([data_sample, label_sample])
    # 主線程計算完成,中止全部採集數據的進程
    coord.request_stop()
    coord.join(enqueue_threads)
'''

'''
QueueRunner和Coordinator結合的數據讀取機制 讀取CSV文件
'''
tf.reset_default_graph()
# 同時打開多個文件(文件格式必須同樣),隱式建立Queue,同時隱含了QueueRunner的建立
filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"])

reader = tf.TextLineReader()
# Tensorflow的Reader對象能夠直接接受一個Queue做爲輸入  每次read的執行都會從文件中讀取一行內容
key, value = reader.read(filename_queue)

# 若是某一列爲空,指定默認值,同時指定了默認列的類型
record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]]
# decode_csv 操做會解析讀取的一行內容並將其轉爲張量列表
col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults)
features = [col1, col2, col3, col4]

#獲取一行數據
#row = tf.decode_csv(value, record_defaults=record_defaults)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # 啓動計算圖中全部的隊列線程 調用tf.train.start_queue_runners來將文件名填充到隊列,不然read操做會被阻塞到文件名隊列中有值爲止。
    threads = tf.train.start_queue_runners(coord=coord)
    
    # 主線程,消費50個數據    
    for _ in range(50):
        example, label = sess.run([features, col5])
        print('Step {0} {1} {2}'.format(_,example,label))
    # 主線程計算完成,中止全部採集數據的進程
    coord.request_stop()
    #指定等待某個線程結束
    coord.join(threads)



'''
shuffle_batch()的使用
'''
import os
import cv2


def write_binary():  
    '''
    將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中
    '''
    #獲取當前工做目錄
    cwd = os.getcwd()
    
    #當前目錄下的子目錄  一共有12張圖片
    classes=['A','B','C']
    
    #建立對象 用於向記錄文件寫入記錄
    writer = tf.python_io.TFRecordWriter('data.tfrecord')  
    
    #遍歷每個子文件夾
    for index, name in enumerate(classes):
        class_path = os.path.join(cwd,name)
        #遍歷子目錄下的每個文件
        for img_name in os.listdir(class_path):
            #每個圖片全路徑
            img_path = os.path.join(class_path , img_name)
            #讀取圖像
            img = cv2.imread(img_path)
            #縮放
            img1 = cv2.resize(img,(250,250))
            #將圖片轉化爲原生bytes
            img_raw = img1.tobytes()         
            #將數據整理成 TFRecord 須要的數據結構 
            example = tf.train.Example(features=tf.train.Features(feature={
                    'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
                    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))}))  

            #序列化  
            serialized = example.SerializeToString()  
            #寫入文件  
            writer.write(serialized)  
    writer.close()  

def read_and_decode(filename):  
    '''
    讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 
    
    args:
        filename:TFRecord格式文件路徑
              
    '''
    #建立文件隊列,不限讀取的數量  
    filename_queue = tf.train.string_input_producer([filename],shuffle=False)  
    #建立一個文件讀取器 從隊列文件中讀取數據
    reader = tf.TFRecordReader()  
    
    #reader從 TFRecord 讀取內容並保存到 serialized_example中 
    _, serialized_example = reader.read(filename_queue)  

    # 讀取serialized_example的格式 
    features = tf.parse_single_example(     
        serialized_example,  
        features={  
            'label': tf.FixedLenFeature([], tf.int64),  
            'img_raw': tf.FixedLenFeature([], tf.string)      
        }  
    )  
    
    # 解析從 serialized_example 讀取到的內容  
    img=tf.decode_raw(features['img_raw'],tf.uint8)
    img = tf.reshape(img, [250, 250, 3])
    label = tf.cast(features['label'], tf.int32)
    return img,label  

tf.reset_default_graph()
#將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中
write_binary()  
    
#讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 
img,label = read_and_decode('data.tfrecord')  

'''
讀取批量數據  這裏設置batch_size=12,即一次從內存隊列中隨機讀取12張圖片,這讀取到的圖片可能有重複的,
這主要是由於設置內存隊列最小元素個數爲100,最大元素個數爲2000,而咱們總共只有12張圖片,因此隊列中有許多重複的圖片
'''
img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2)  


with tf.Session() as sess:
    
    sess.run(tf.global_variables_initializer())  
    #建立一個協調器,管理線程
    coord = tf.train.Coordinator()  
    
    #啓動QueueRunner, 此時文件名纔開始進隊。
    threads=tf.train.start_queue_runners(sess=sess,coord=coord)  

    img, label = sess.run([img_batch, label_batch])  
    for i in range(12):
        cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i])
    #終止線程
    coord.request_stop()
    coord.join(threads)
    
    
    
    
'''
測試
'''
tf.reset_default_graph()
# 新建一個Session
with tf.Session() as sess:
    # 咱們要讀三幅圖片A.jpg, B.jpg, C.jpg
    filename = ['A.jpg', 'B.jpg', 'C.jpg']
    # string_input_producer會產生一個文件名隊列
    filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5)
    # reader從文件名隊列中讀數據。對應的方法是reader.read
    reader = tf.WholeFileReader()
    key, value = reader.read(filename_queue)
    # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化
    tf.local_variables_initializer().run()
    # 使用start_queue_runners以後,纔會開始填充隊列
    threads = tf.train.start_queue_runners(sess=sess)
    for i in range(15):
        # 獲取圖片數據並保存
        image_data = sess.run(value)
        with open('read/test_%d.jpg' % (i+1), 'wb') as f:
            f.write(image_data)
View Code

七 讀取原始圖片轉換爲小批量大小的樣本數據

假如咱們如今須要對貓和狗的圖片進行分類,咱們已經收集了許多貓和狗的圖片,首先咱們須要創建一個文件夾命名爲data,在該文件夾下面建立兩個子文件夾train,test分別用於保存測試集和訓練集圖片,而後還須要在每一個文件夾下面建立兩個文件夾,分別命名cat和dog,用來存放對應類別的圖片。

有了這些圖片以後,咱們想每次讀取指定batch_size大小得數據樣本,而且這些樣本是打亂的。

  • 咱們把這些文件保存爲TFRecord格式文件
  • 從TFRecord文件中讀取batch_size樣本集。

代碼以下:

# -*- coding: utf-8 -*-
"""
Created on Thu May 10 16:48:17 2018

@author: zy
"""

'''
如何將給定的數據原始圖片以及標籤保存成TFRecord格式的文件
並使用隊列每次讀取batch_size大小樣本集
'''
import tensorflow as tf
import os
import cv2
import random
import numpy as np


#隨機種子,使得每次運行結果同樣
random.seed(0)

def get_files(dirpath):
    '''
    獲取文件相對路徑和標籤(非one_hot)  返回一個元組
    
    args:
          dirpath:數據所在的目錄 記作父目錄
                  假設有10類數據,則父目錄下有10個子目錄,每一個子目錄存放着對應的圖片                           
    '''
    #保存讀取到的的文件和標籤
    image_list = []
    label_list = []
    
    #遍歷子目錄    
    classes = [x for x  in os.listdir(dirpath) if os.path.isdir(dirpath)]
    
     #遍歷每個子文件夾
    for index, name in enumerate(classes):
        #子文件夾路徑
        class_path = os.path.join(dirpath,name)
        #遍歷子目錄下的每個文件            
        for img_name in os.listdir(class_path):
            #每個圖片全路徑
            img_path = os.path.join(class_path , img_name)
            #追加
            image_list.append(img_path)
            label_list.append(index)

    #保存打亂後的文件和標籤
    images = []
    labels = []
    # 打亂文件順序 連續打亂兩次
    indices = list(range(len(image_list)))
    random.shuffle(indices)
    for i in indices:
        images.append(image_list[i])
        labels.append(label_list[i])    
    random.shuffle([images,labels])    
    
    print('樣本長度爲:',len(images))
    #print(images[0:10],labels[0:10])
    return images, labels

'''
生成數據的格式

    先生成 TFRecord 格式的樣例數據,Example 的結構以下,表示第1個文件中的第1個數據
    {
     'i':0,
     'j':0
     }
'''

def WriteTFRecord(dirpath,dstpath='.',train_data=True,IMAGE_HEIGHT=227,IMAGE_WIDTH=227):
    '''
    把指定目錄下的數據寫入同一個TFRecord格式文件中 
    
    args:
        dirpath:數據所在的目錄 記作父目錄
                 假設有10類數據,則父目錄下有10個子目錄,每一個子目錄存放着對應的圖片             
        dstpath:保存TFRecord文件的目錄
        train_data:表示傳入的是否是訓練集文件所在路徑
        IMAGE_HEIGHT:保存的圖片數據高度
        IMAGE_WIDTH:保存的圖片數據寬度
    '''
    if not os.path.isdir(dstpath):
        os.mkdir(dstpath)
        
    #獲取全部數據文件路徑,以及對應標籤
    image_list, label_list =  get_files(dirpath)
    
    #把海量數據寫入多個TFrecord文件
    length_per_shard = 10000                                  #每一個記錄文件的樣本長度
    num_shards = int(np.ceil(len(image_list) / length_per_shard))       
    
    print('記錄文件個數:',num_shards)                                
    
    '''
    當全部數類別圖片都在一個文件夾下面時,能夠將數據寫入不一樣的文件
    可是若是同一類別的圖片放在相同的文件下,就不能夠將數據寫入不一樣的文件
    這主要是由於後者保存的TFRecord文件中都是同一類別,而隊列取數據時,是從一個文件讀取完,纔會讀取另外一個文件,
    這樣會致使一次讀取的batch_size圖像都是同一類別,對訓練不利
    所以咱們必須想個辦法讓一個TFRecord格式的文件包含各類類別的圖片,而且順序是打亂的
    '''    
    #依次寫入每個TFRecord文件        
    for  index in  range(num_shards):                 
        #按0000n-of-0000m的後綴區分文件。n表明當前文件標號,沒表明文件總數
        if train_data:
            filename = os.path.join(dstpath,'train_data.tfrecord-%.5d-of-%.5d'%(index,num_shards))
        else:
            filename = os.path.join(dstpath,'test_data.tfrecord-%.5d-of-%.5d'%(index,num_shards))
        print(filename)    

        #建立對象 用於向記錄文件寫入記錄
        writer = tf.python_io.TFRecordWriter(filename)                          
                        
        #起始索引
        idx_start = index*length_per_shard
        #結束索引
        idx_end = np.min([(index+1)*length_per_shard - 1,len(image_list)])
        
        #遍歷子目錄下的每個文件            
        for img_path,label in zip(image_list[idx_start:idx_end], label_list[idx_start:idx_end]):
            #讀取圖像
            img = cv2.imread(img_path)
            
            '''
            在這裏能夠對圖片進行處理,也能夠擴大數據集,或者歸一化輸入等待,不過我在這裏不對原始圖片進行其它處理,只是把圖片大小設置爲固定的
            '''
            #縮放
            img = cv2.resize(img,(IMAGE_HEIGHT,IMAGE_WIDTH))
            
            
            
            #將圖片轉化爲原生bytes
            image = img.tobytes()         
            #將數據整理成 TFRecord 須要的數據結構 
            example = tf.train.Example(features=tf.train.Features(feature={
                'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))}))  

            #序列化  
            serialized = example.SerializeToString()  
            #寫入文件  
            writer.write(serialized)  
        writer.close()  
        
    
    
def read_and_decode(filename,num_epochs = None,IMAGE_HEIGHT=227,IMAGE_WIDTH=227):  
    '''
    讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 
    
    args:
        filename:TFRecord格式文件路徑 list列表
        num_epochs:每一個數據集文件迭代輪數
        IMAGE_HEIGHT:保存的圖片數據高度
        IMAGE_WIDTH:保存的圖片數據寬度
              
    '''
    '''
    建立文件隊列,經過設置 shuffle 參數爲 True,將文件的入隊順序打亂,因此出隊順序是隨機的。隨機打亂文件順序和入隊操做
    會跑在一個單獨的線程上,不會影響出隊的速度.
    當輸入隊列中的全部文件都處理完後,它會將文件列表中的文件從新加入隊列。能夠經過設置 num_epochs 參數來限制加載初始
    文件列表的最大輪數
    '''
    filename_queue = tf.train.string_input_producer(filename,shuffle=False,num_epochs = num_epochs)  
    #建立一個文件讀取器 從隊列文件中讀取數據
    reader = tf.TFRecordReader()  
    
    #reader從 TFRecord 讀取內容並保存到 serialized_example中 
    _, serialized_example = reader.read(filename_queue)  

    # 讀取serialized_example的格式 
    features = tf.parse_single_example(     
        serialized_example,  
        features={  
            'image': tf.FixedLenFeature([], tf.string),  
            'label': tf.FixedLenFeature([], tf.int64)             
        }  
    )  
    
    # 解析從 serialized_example 讀取到的內容  
    img = tf.decode_raw(features['image'],tf.uint8)
    img = tf.reshape(img, [IMAGE_HEIGHT, IMAGE_WIDTH, 3])

    '''
    在這裏能夠對讀取到的圖片數據進行預處理,好比歸一化輸入,PCA處理等,可是不能夠增長數據    
    '''    
    label = tf.cast(features['label'], tf.int32)
    return img,label  


def input_data(filenames,num_epochs=None,batch_size=256, capacity=4096, min_after_dequeue=1024, num_threads=10):
    '''
    讀取小批量batch_size數據
    
    args:
        filenames:TFRecord文件路徑組成的list
        num_epochs:每一個數據集文件迭代輪數
        batch_size:小批量數據大小
        capacity:內存隊列元素最大個數
        min_after_dequeue:內存隊列元素最小個數
        num_threads:線城數
    '''
    '''
    讀取批量數據  這裏設置batch_size,即一次從內存隊列中隨機讀取batch_size張圖片,這裏設置內存隊列最小元素個數爲1024,最大元素個數爲4096    
    shuffle_batch 函數會將數據順序打亂
    bacth 函數不會將數據順序打亂
    '''
    img,label = read_and_decode(filenames,num_epochs)
    images_batch, labels_batch = tf.train.shuffle_batch([img,label], batch_size=batch_size, capacity=capacity, min_after_dequeue=batch_size*5, num_threads=num_threads)
    return images_batch,labels_batch    




def  file_match(s,root='.'):
    '''
    尋找指定目錄下(不包含子目錄)中的文件名含有指定字符串的文件,並打印出其相對路徑
    
    args:
        s:要匹配的字符串
        root : 指定要搜索的目錄
        
    return:返回符合條件的文件列表
    '''
    #用來保存目錄
    dirs=[]
    #用來保存匹配字符串的文件
    matchs=[]
    for current_name in os.listdir(root):
        add_root_name = os.path.join(root,current_name)
        if os.path.isdir(add_root_name):
            dirs.append(add_root_name)
        elif os.path.isfile(add_root_name) and s in add_root_name:
            matchs.append(add_root_name)

    '''
    #這裏用來遞歸搜索子目錄的
    for dir in dirs:
        file_match(s,dir)
    '''
    return matchs

    

'''
測試
'''
if __name__ == '__main__':
    #訓練集數據所在的目錄
    dirpath = './data/train'
    
    training_step = 1
    
    '''    
    判斷訓練測試集TFRecord格式文件是否存在,不存在則生成
    若是存在,直接讀取        
    '''    
    # 獲取當前目錄下包含指定字符串的文件列表 
    files = file_match('train_data.tfrecord')
    #判斷數據集是否存在
    if len(files) == 0:    
        print('開始讀圖片文件並寫入TFRecord格式文件中.........')
        #將指定路徑下全部圖片存爲TFRecord格式 保存到文件data.tfrecord中
        WriteTFRecord(dirpath)  
        print('寫入完畢!\n')
        #正則表達式匹配
        files = tf.train.match_filenames_once('./train_data.tfrecord')            
    
    
    #讀取TFRecord格式格式文件,返回讀取到的batch_size圖像以及對應的標籤
    images_batch, labels_batch = input_data(files)        
    
    
    with tf.Session() as sess:    
        sess.run(tf.global_variables_initializer()) 
        
        #建立一個協調器,管理線程
        coord = tf.train.Coordinator()      
        #啓動QueueRunner, 此時文件名纔開始進隊
        threads = tf.train.start_queue_runners(sess=sess,coord=coord)      
        
        print('開始訓練!\n')
        for step in range(training_step):
            img, label = sess.run([images_batch, labels_batch])  
            print('setp :',step)
            for i in range(256):
                cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i])           
                
        #終止線程
        coord.request_stop()
        coord.join(threads)
       

程序運行後,會生成三個TFRecord文件,以下:

而且咱們生成了了一個小批量樣本大小的樣本圖片:

 參考文章

[1]Tensorflow讀取數據的4種方式(8)---《深度學習》

[2]數據讀取

[3]tensorflow學習——tfreader格式,隊列讀取數據tf.train.shuffle_batch()

[4]十圖詳解tensorflow數據讀取機制(附代碼)

[5]Tensorflow10下如何玩轉TFRecord?!比賽學習必備,有源碼!

相關文章
相關標籤/搜索