TensorFlow程序讀取數據一共有3種方法:html
import tensorflow as tf x1 = tf.constant([2,3,4]) x2 = tf.constant([4,0,1]) y = tf.add(x1,x2) with tf.Session() as sess: print(sess.run(y))
在這裏使用x1,x2保存具體的值,即將數據直接內嵌到圖中,再將圖傳入會話中執行,當數據量較大時,圖的輸出會遇到效率問題。python
import tensorflow as tf x1 = tf.placeholder(tf.int32) x2 = tf.placeholder(tf.int32) #用python產生數據 v1 = [2,3,4] v2 = [4,0,1] y = tf.add(x1,x2) with tf.Session() as sess: print(sess.run(y,feed_dict={x1:v1,x2:v2}))
在這裏x1,x2只是佔位符,沒有具體的值,那麼運行的時候去哪取值呢?這時候就要用到sess.run()的feed_dict參數,將python產生的數據傳入,並計算y。git
以上兩種方法都很方便,可是遇到大型數據的時候就會很吃力,即便是Feed_dict,中間環節的增長也是不小的開銷,由於數據量大的時候,TensorFlow程序運行的每一步,咱們都須要使用python代碼去從文件中讀取數據,並對讀取到的文件數據進行解碼。最優的方案就是在圖中定義好文件讀取的方法,讓TF本身從文件中讀取數據,並解碼成可用的樣本集。github
從文件中讀取數據的方法有不少,好比能夠在一個文本里面寫入圖片數據的路徑和標籤,而後用tensorflow的read_file()讀入圖片;也能夠將圖片和標籤的值直接存放在CSV或者txt文件。正則表達式
咱們會在後面陸續介紹如下幾種讀取文件的方式:緩存
在講解文件的讀取以前,咱們須要先了解一下TensorFlow中的隊列機制,後面也會詳細介紹。網絡
TensorFlow提供了一個隊列機制,經過多線程將讀取數據與計算數據分開。由於在處理海量數據集的訓練時,沒法把數據集一次所有載入到內存中,須要一邊從硬盤中讀取,一邊進行訓練,爲了加快訓練速度,咱們能夠採用多個線程讀取數據,一個線程消耗數據。session
下面簡要介紹一下,TensorFlow裏與Queue有關的概念和用法。詳細內容點擊原文。數據結構
其實概念只有三個:多線程
Queue
是TF隊列和緩存機制的實現QueueRunner
是TF中對操做Queue的線程的封裝Coordinator
是TF中用來協調線程運行的工具雖然它們常常同時出現,但這三樣東西在TensorFlow裏面是能夠單獨使用的,不妨先分開來看待。
據實現的方式不一樣,分紅具體的幾種類型,例如:
這些類型的Queue除了自身的性質不太同樣外,建立、使用的方法基本是相同的。
建立函數的參數:
tf.FIFOQueue(capacity, dtypes, shapes=None, names=None,
shared_name=None, name="fifo_queue")
#建立的圖:一個先入先出隊列,以及初始化,出隊,+1,入隊操做 q = tf.FIFOQueue(3, "float") init = q.enqueue_many(([0.1, 0.2, 0.3],)) x = q.dequeue() y = x + 1 q_inc = q.enqueue([y]) #開啓一個session,session是會話,會話的潛在含義是狀態保持,各類tensor的狀態保持 with tf.Session() as sess: sess.run(init) for i in range(2): sess.run(q_inc) quelen = sess.run(q.size()) for i in range(quelen): print (sess.run(q.dequeue()))
以前的例子中,入隊操做都在主線程中進行,Session中能夠多個線程一塊兒運行。 在數據輸入的應用場景中,入隊操做從硬盤上讀取,入隊操做是從硬盤中讀取輸入,放到內存當中,速度較慢。 使用QueueRunner
能夠建立一系列新的線程進行入隊操做,讓主線程繼續使用數據。若是在訓練神經網絡的場景中,就是訓練網絡和讀取數據是異步的,主線程在訓練網絡,另外一個線程在將數據從硬盤讀入內存。
''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 建立QueueRunner # 用多個線程向隊列添加數據 # 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啓動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue()))
能正確輸出結果,可是最後會報錯,ERROR:tensorflow:Exception in QueueRunner: Session has been closed.也就是說,當循環結束後,該Session就會自動關閉,至關於main函數已經結束了。
''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 建立QueueRunner # 用多個線程向隊列添加數據 # 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) ''' #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啓動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue())) ''' # 主線程 sess = tf.Session() sess.run(tf.initialize_all_variables()) # 啓動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) # 主線程 for i in range(0, 10): print(sess.run(q.dequeue()))
不使用with tf.Session,那麼Session就不會自動關閉。
並非咱們設想的1,2,3,4,本質緣由是增長計數的進程會不停的後臺運行,執行入隊的進程會先執行10次(由於隊列長度只有10),而後主線程開始消費數據,當一部分數據消費被後,入隊的進程又會開始執行。最終主線程消費完10個數據後中止,但其餘線程繼續運行,程序不會結束。
經驗:由於tensorflow是在圖上進行計算,要驅動一張圖進行計算,必需要送入數據,若是說數據沒有送進去,那麼sess.run(),就沒法執行,tf也不會主動報錯,提示沒有數據送進去,其實tf也不能主動報錯,由於tf的訓練過程和讀取數據的過程實際上是異步的。tf會一直掛起,等待數據準備好。現象就是tf的程序不報錯,可是一直不動,跟掛起相似。
''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 建立QueueRunner # 用多個線程向隊列添加數據 # 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啓動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue()))
上圖將生成數據的線程註釋掉,程序就會卡在sess.run(q.dequeue()),等待數據的到來QueueRunner是用來啓動入隊線程用的。
Coordinator是個用來保存線程組運行狀態的協調器對象,它和TensorFlow的Queue沒有必然關係,是能夠單獨和Python線程使用的。例如:
''' Coordinator ''' import threading, time # 子線程函數 def loop(coord, id): t = 0 while not coord.should_stop(): print(id) time.sleep(1) t += 1 # 只有1號線程調用request_stop方法 if (t >= 2 and id == 0): coord.request_stop() # 主線程 coord = tf.train.Coordinator() # 使用Python API建立10個線程 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)] # 啓動全部線程,並等待線程結束 for t in threads: t.start() coord.join(threads)
將這個程序運行起來,會發現全部的子線程執行完兩個週期後都會中止,主線程會等待全部子線程都中止後結束,從而使整個程序結束。因而可知,只要有任何一個線程調用了Coordinator的request_stop
方法,全部的線程均可以經過should_stop
方法感知並中止當前線程。
將QueueRunner和Coordinator一塊兒使用,實際上就是封裝了這個判斷操做,從而使任何一個出現異常時,可以正常結束整個程序,同時主線程也能夠直接調用request_stop
方法來中止全部子線程的執行。
簡要 介紹完了TensorFlow中隊列機制後,咱們再來看一下如何從文件中讀取數據。
(1)在介紹字典結構的數據文件的讀取以前,咱們先來介紹怎麼建立字典結構的數據文件。
具體代碼咱們查看以下文章:圖片存儲爲cifar的Python數據格式
若是針對圖片比較多的狀況,咱們不太可能把全部圖像都寫入個文件,咱們能夠分批把圖像寫入幾個文件中。
(2)cifar10數據有三種版本,分別是MATLAB,Python和bin版本 數據下載連接: http://www.cs.toronto.edu/~kriz/cifar.html
其中Python版本的數據便是以字典結構存儲的數據 。
針對字典結構的數據文件讀取,我在AlexNet那節中有詳細介紹,主要就是經過pickle模塊對文件進行反序列化,獲取咱們所須要的數據。
在官網的cifar的例子中就是從bin文件中讀取的。bin文件須要以必定的size格式存儲,好比每一個樣本的值佔多少字節,label佔多少字節,且這對於每一個樣本都是固定的,而後一個挨着一個存儲。這樣就可使用tf.FixedLengthRecordReader 類來每次讀取固定長度的字節,正好對應一個樣本存儲的字節(包括label)。而且用tf.decode_raw進行解析。
(1)製做bin file
如何將本身的圖片存爲bin file,能夠看看下面這篇博客,這篇博客使用C++和opencv將圖片存爲二進制文件: http://blog.csdn.net/code_better/article/details/53289759
(2)從bin file讀入
在後面會詳細講解如何從二進制記錄文件中讀取數據,並以cifar10_input.py做爲案例。
def read_cifar10(filename_queue): """Reads and parses examples from CIFAR10 data files. Recommendation: if you want N-way read parallelism, call this function N times. This will give you N independent Readers reading different files & positions within those files, which will give better mixing of examples. Args: filename_queue: A queue of strings with the filenames to read from. Returns: An object representing a single example, with the following fields: height: number of rows in the result (32) width: number of columns in the result (32) depth: number of color channels in the result (3) key: a scalar string Tensor describing the filename & record number for this example. label: an int32 Tensor with the label in the range 0..9. uint8image: a [height, width, depth] uint8 Tensor with the image data """ class CIFAR10Record(object): pass result = CIFAR10Record() # Dimensions of the images in the CIFAR-10 dataset. # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the # input format. label_bytes = 1 # 2 for CIFAR-100 result.height = 32 result.width = 32 result.depth = 3 image_bytes = result.height * result.width * result.depth # Every record consists of a label followed by the image, with a # fixed number of bytes for each. record_bytes = label_bytes + image_bytes # Read a record, getting filenames from the filename_queue. No # header or footer in the CIFAR-10 format, so we leave header_bytes # and footer_bytes at their default of 0. reader = tf.FixedLengthRecordReader(record_bytes=record_bytes) result.key, value = reader.read(filename_queue) # Convert from a string to a vector of uint8 that is record_bytes long. record_bytes = tf.decode_raw(value, tf.uint8) # The first bytes represent the label, which we convert from uint8->int32. result.label = tf.cast( tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32) # The remaining bytes after the label represent the image, which we reshape # from [depth * height * width] to [depth, height, width]. depth_major = tf.reshape( tf.strided_slice(record_bytes, [label_bytes], [label_bytes + image_bytes]), [result.depth, result.height, result.width]) # Convert from [depth, height, width] to [height, width, depth]. result.uint8image = tf.transpose(depth_major, [1, 2, 0]) return result
這段代碼若是看不懂,能夠先跳過。
有的時候在數據量不是很大的時候,能夠從CSV或者TXT文件進行讀取。
(1)製做CSV(TXT)數據文本
CSV (TXT)通常是一行存一個樣本(包括樣本值和label),用逗號隔開。用python的普通文本寫入便可。
(2)讀取的時候tf.TextLineReader 類來每次讀取一行,並使用tf.decode_csv來對每一行進行解析。
這裏主要介紹一下:
tf.decode_csv(records, record_defaults, field_delim=None, name=None)
col= tf.decode_csv(records, record_defaults=[ [ ]*100 ], field_delim=' ', name=None)
返回的col是長度爲100的list。
須要注意的是,當數據量比較大的時候,存成CSV或TXT文件要比BIN文件大的多,所以在TF中讀取的速度也會慢不少。所以儘可能不要讀取大的CSV的方式來輸入。
在後面咱們會詳細講解如何從CSV文件中讀取數據,並有具體的案例。
(1)製做數據路徑文件
一行一例,每例包括該樣本的地址和label,用逗號分割開,用python普通文件寫入便可
(2)讀取
不少狀況下咱們的圖片訓練集就是原始圖片自己,並無像cifar dataset那樣存成bin等格式。所以咱們須要根據一個train_list列表,去挨個讀取圖片。這裏我用到的方法是首先將train_list.txt中的image list(也就是每一行有圖片的路徑和label組成)讀入隊列中,那麼對每次dequeue的內容中能夠提取當前圖片的路徑和label。
filename = os.path.join(data_dir, trainfilename) with open(filename) as fid: content = fid.read() content = content.split('\n') content = content[:-1] valuequeue = tf.train.string_input_producer(content,shuffle=True) value = valuequeue.dequeue() dir, labels = tf.decode_csv(records=value, record_defaults=[["string"], [""]], field_delim=" ") labels = tf.string_to_number(labels, tf.int32) imagecontent = tf.read_file(dir) image = tf.image.decode_png(imagecontent, channels=3, dtype=tf.uint8) image = tf.cast(image, tf.float32) #將圖片統一爲32*32大小的 image = tf.image.resize_images(image,[32,32]) image = tf.reshape(image,[result.depth, result.height, result.width]) # Convert from [depth, height, width] to [height, width, depth]. result.uint8image = tf.transpose(image, [1, 2, 0])
不過這個方法對電腦輸入輸出要求比較高,若是機械硬盤有壞道,就會報Input/Output error,出現這種狀況,要修復機械硬盤壞道。
TFrecord是Tensorflow推薦的數據集格式,與Tensorflow框架緊密結合。在TensorFlow中提供了一系列接口能夠訪問TFRecord格式。後面會詳細介紹如何將原始圖片文件轉換爲TFRecord格式,而後在運行中經過多線程的方式來讀取。
在TensorFlow中用Queue的經典模式有兩種,都是配合了QueueRunner和Coordinator一塊兒使用的。
這裏先介紹第一種方法,顯式的建立QueueRunner,而後調用它的create_threads
方法啓動線程。例以下面這段代碼:
''' 配合使用 ''' import numpy as np # 1000個4維輸入向量,每一個數取值爲1-10之間的隨機數 data = 10 * np.random.randn(1000, 4) + 1 # 1000個隨機的目標值,值爲0或1 target = np.random.randint(0, 2, size=1000) # 建立Queue,隊列中每一項包含一個輸入數據和相應的目標值 queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []]) # 批量入列數據(這是一個Operation) enqueue_op = queue.enqueue_many([data, target]) # 出列數據(這是一個Tensor定義) data_sample, label_sample = queue.dequeue() # 建立包含4個線程的QueueRunner qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) with tf.Session() as sess: # 建立Coordinator coord = tf.train.Coordinator() # 啓動QueueRunner管理的線程 enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # 主線程,消費100個數據 for step in range(100): if coord.should_stop(): break data_batch, label_batch = sess.run([data_sample, label_sample]) # 主線程計算完成,中止全部採集數據的進程 coord.request_stop() coord.join(enqueue_threads)
這一小節咱們會使用QueueRunner和Coordinator來實現bin文件,以及csv文件、TFRecord格式文件的讀取,不過這裏咱們採用隱式建立線程的方法。在講解具體代碼以前,咱們須要先來說解關於TensorFlow中的文件隊列機制和內存隊列機制。
首先須要思考的一個問題是,什麼是數據讀取?以圖像數據爲例,讀取數據的過程能夠用下圖來表示:
假設咱們的硬盤中有一個圖片數據集0001.jpg,0002.jpg,0003.jpg……咱們只須要把它們讀取到內存中,而後提供給GPU或是CPU進行計算就能夠了。這聽起來很容易,但事實遠沒有那麼簡單。事實上,咱們必需要把數據先讀入後才能進行計算,假設讀入用時0.1s,計算用時0.9s,那麼就意味着每過1s,GPU都會有0.1s無事可作,這就大大下降了運算的效率。
如何解決這個問題?方法就是將讀入數據和計算分別放在兩個線程中,將數據讀入內存的一個隊列,以下圖所示:
讀取線程源源不斷地將文件系統中的圖片讀入到一個內存的隊列中,而負責計算的是另外一個線程,計算須要數據時,直接從內存隊列中取就能夠了。這樣就能夠解決GPU由於IO而空閒的問題!
而在tensorflow中,爲了方便管理,在內存隊列前又添加了一層所謂的「文件名隊列」。
爲何要添加這一層文件名隊列?咱們首先得了解機器學習中的一個概念:epoch。對於一個數據集來說,運行一個epoch就是將這個數據集中的圖片所有計算一遍。如一個數據集中有三張圖片A.jpg、B.jpg、C.jpg,那麼跑一個epoch就是指對A、B、C三張圖片都計算了一遍。兩個epoch就是指先對A、B、C各計算一遍,而後再所有計算一遍,也就是說每張圖片都計算了兩遍。
tensorflow使用文件名隊列+內存隊列雙隊列的形式讀入文件,能夠很好地管理epoch。下面咱們用圖片的形式來講明這個機制的運行方式。以下圖,仍是以數據集A.jpg, B.jpg, C.jpg爲例,假定咱們要跑一個epoch,那麼咱們就在文件名隊列中把A、B、C各放入一次,並在以後標註隊列結束。
程序運行後,內存隊列首先讀入A(此時A從文件名隊列中出隊):
再依次讀入B和C:
此時,若是再嘗試讀入,系統因爲檢測到了「結束」,就會自動拋出一個異常(OutOfRange)。外部捕捉到這個異常後就能夠結束程序了。這就是tensorflow中讀取數據的基本機制。若是咱們要跑2個epoch而不是1個epoch,那隻要在文件名隊列中將A、B、C依次放入兩次再標記結束就能夠了。
典型的文件數據讀取會包含下面這些步驟:
可使用字符串張量(好比["file0", "file1"]
, [("file%d" % i) for i in range(2)]
, [("file%d" % i) for i in range(2)]
) 或者tf.train.match_filenames_once
()函數來產生文件名列表。
filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i) for i in xrange(1, 6)]
對於文件名隊列,咱們使用tf.train.string_input_producer()函數。這個函數須要傳入一個文件名list,系統會自動將它轉爲一個先入先出的文件名隊列, 文件閱讀器會須要它來讀取數據。
# 同時打開多個文件,顯示建立Queue,同時隱含了QueueRunner的建立 filename_queue = tf.train.string_input_producer(filenames)
tf.train.string_input_producer還有兩個重要的參數,一個是num_epochs,它就是咱們上文中提到的epoch數。另一個就是shuffle,shuffle是指在一個epoch內文件的順序是否被打亂。若設置shuffle=False,以下圖,每一個epoch內,數據仍是按照A、B、C的順序進入文件名隊列,這個順序不會改變:
若是設置shuffle=True,那麼在一個epoch內,數據的先後順序就會被打亂,以下圖所示:
在tensorflow中,內存隊列不須要咱們本身創建,咱們只須要使用reader對象從文件名隊列中讀取數據就能夠了。
根據你的文件格式, 選擇對應的文件閱讀器, 而後將文件名隊列提供給閱讀器的read()
方法。閱讀器的read()
方法會輸出一個key來表徵輸入的文件和其中的紀錄(對於調試很是有用),同時獲得一個字符串標量, 這個字符串標量能夠被一個或多個解析器,或者轉換操做將其解碼爲張量而且構形成爲樣本。
CSV 文件讀取
從CSV文件中讀取數據, 須要使用TextLineReader()
和decode_csv()
操做, 以下面的例子所示:咱們須要從iris_0.csv和iris_1.csv文件讀取數據,iris_0.csv和iris_1.csv文件數據是從iris數據集中選取的部分數據,內容以下:總共有21條記錄。
# 同時打開多個文件(文件格式必須同樣),隱式示建立Queue,同時隱含了QueueRunner的建立 filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"]) reader = tf.TextLineReader() # Tensorflow的Reader對象能夠直接接受一個Queue做爲輸入 每次的執行都會從文件中讀取一行內容 key, value = reader.read(filename_queue) # 若是某一列爲空,指定默認值,同時指定了默認列的類型 record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]] # 操做會解析讀取的一行內容並將其轉爲張量列表 col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults) features = [col1, col2, col3, col4] #獲取一行數據 #row = tf.decode_csv(value, record_defaults=record_defaults) with tf.Session() as sess: coord = tf.train.Coordinator() # 啓動計算圖中全部的隊列線程 調用來將文件名填充到隊列,不然操做會被阻塞到文件名隊列中有值爲止。 threads = tf.train.start_queue_runners(coord=coord) # 主線程,消費50個數據 for _ in range(50): example, label = sess.run([features, col5]) print('Step {0} {1} {2}'.format(_,example,label)) # 主線程計算完成,中止全部採集數據的進程 coord.request_stop()
# 指定等待某個線程結束 coord.join(threads)readdecode_csvtf.train.start_queue_runnersread
tf.train.string_input_produecer()
會將一個隱含的QueueRunner添加到全局圖中(相似的操做還有tf.train.shuffle_batch()
等)。因爲沒有顯式地返回QueueRunner()調用create_threads()啓動線程,這裏使用了tf.train.start_queue_runners()
方法直接啓動tf.GraphKeys.QUEUE_RUNNERS
集合中的全部隊列線程。初學者會常常在代碼中看到tf.train.start_queue_runners()這個函數,但每每很難理解它的用處,在這裏,有了上面的鋪墊後,咱們就能夠解釋這個函數的做用了。在咱們使用tf.train.string_input_producer建立文件名隊列後,整個系統其實仍是處於「停滯狀態」的,也就是說,咱們文件名並無真正被加入到隊列中(以下圖所示)。此時若是咱們開始計算,由於內存隊列中什麼也沒有,計算單元就會一直等待,致使整個系統被阻塞。
而使用tf.train.start_queue_runners()以後,纔會啓動填充隊列的線程,這時系統就再也不「停滯」。此後計算單元就能夠拿到數據並進行計算,整個程序也就跑起來了,這就是函數tf.train.start_queue_runners的用處。
每次read()
的執行都會從文件中讀取一行內容, decode_csv()
操做會解析這一行內容並將其轉爲張量列表。若是輸入的參數有缺失,record_default
參數能夠根據張量的類型來設置默認值。
從二進制bin文件中讀取固定長度紀錄, 可使用tf.FixedLengthRecordReader
的tf.decode_raw
操做。decode_raw
操做能夠將一個字符串轉換爲一個uint8的張量。
舉例來講,the CIFAR-10 dataset的文件格式定義是:每條記錄的長度都是固定的,一個字節的標籤,後面是3072字節的圖像數據。uint8的張量的標準操做就能夠從中獲取圖像片而且根據須要進行重組。 例子代碼能夠在tensorflow/models/image/cifar10/cifar10_input.py
找到。
def read_cifar10(filename_queue): """Reads and parses examples from CIFAR10 data files. Recommendation: if you want N-way read parallelism, call this function N times. This will give you N independent Readers reading different files & positions within those files, which will give better mixing of examples. Args: filename_queue: A queue of strings with the filenames to read from. Returns: An object representing a single example, with the following fields: height: number of rows in the result (32) width: number of columns in the result (32) depth: number of color channels in the result (3) key: a scalar string Tensor describing the filename & record number for this example. label: an int32 Tensor with the label in the range 0..9. uint8image: a [height, width, depth] uint8 Tensor with the image data """ class CIFAR10Record(object): pass result = CIFAR10Record() # Dimensions of the images in the CIFAR-10 dataset. # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the # input format. label_bytes = 1 # 2 for CIFAR-100 result.height = 32 result.width = 32 result.depth = 3 image_bytes = result.height * result.width * result.depth # Every record consists of a label followed by the image, with a # fixed number of bytes for each. record_bytes = label_bytes + image_bytes # Read a record, getting filenames from the filename_queue. No # header or footer in the CIFAR-10 format, so we leave header_bytes # and footer_bytes at their default of 0. reader = tf.FixedLengthRecordReader(record_bytes=record_bytes) result.key, value = reader.read(filename_queue) # Convert from a string to a vector of uint8 that is record_bytes long. record_bytes = tf.decode_raw(value, tf.uint8) # The first bytes represent the label, which we convert from uint8->int32. result.label = tf.cast( tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32) # The remaining bytes after the label represent the image, which we reshape # from [depth * height * width] to [depth, height, width]. depth_major = tf.reshape( tf.strided_slice(record_bytes, [label_bytes], [label_bytes + image_bytes]), [result.depth, result.height, result.width]) # Convert from [depth, height, width] to [height, width, depth]. result.uint8image = tf.transpose(depth_major, [1, 2, 0]) return result
read_cifar10()函數須要傳入一個文件名隊列,這個函數主要作了如下事情:
read()
的執行都會從文件中讀取一行內容,decode_raw()
操做會解析這一行內容並將其轉爲張量。你能夠對輸入的樣本進行任意的預處理, 這些預處理不依賴於訓練參數,好比上面read_cifar10()的函數,獲取一張圖像數據張量後,咱們能夠對圖像進行處理,好比裁切,水平翻轉,以及對圖片進行歸一化處理等等。咱們能夠在tensorflow/models/image/cifar10/cifar10.py
找到數據歸一化, 提取隨機數據片,增長噪聲或失真等等預處理的例子。
在數據輸入管道的末端, 咱們須要有內存隊列來執行輸入樣本的批量讀取。咱們使用tf.train.shuffle_batch()
函數來對內存隊列中的樣本進行亂序處理。
咱們用一個具體的例子來演示一下tf.train.shuffle_batch()函數的使用。如圖,假設咱們在當前文件夾中已經有A.、B.、C三個子文件夾。並在每一個文件夾下下面放置對應的圖片。
針對這些文件咱們須要作下面幾步處理:
''' shuffle_batch()的使用 ''' import os import cv2 def write_binary(): ''' 將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中 ''' #獲取當前工做目錄 cwd = os.getcwd() #當前目錄下的子目錄 一共有12張圖片 classes=['A','B','C'] #建立對象 用於向記錄文件寫入記錄 writer = tf.python_io.TFRecordWriter('data.tfrecord') #遍歷每個子文件夾 for index, name in enumerate(classes): class_path = os.path.join(cwd,name) #遍歷子目錄下的每個文件 for img_name in os.listdir(class_path): #每個圖片全路徑 img_path = os.path.join(class_path , img_name) #讀取圖像 img = cv2.imread(img_path) #縮放 img1 = cv2.resize(img,(250,250)) #將圖片轉化爲原生bytes img_raw = img1.tobytes() #將數據整理成 TFRecord 須要的數據結構 example = tf.train.Example(features=tf.train.Features(feature={ 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])), "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))})) #序列化 serialized = example.SerializeToString() #寫入文件 writer.write(serialized) writer.close() def read_and_decode(filename): ''' 讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 args: filename:TFRecord格式文件路徑 ''' #建立文件隊列,不限讀取的數量 filename_queue = tf.train.string_input_producer([filename],shuffle=False) #建立一個文件讀取器 從隊列文件中讀取數據 reader = tf.TFRecordReader() #reader從 TFRecord 讀取內容並保存到 serialized_example中 _, serialized_example = reader.read(filename_queue) # 讀取serialized_example的格式 features = tf.parse_single_example( serialized_example, features={ 'label': tf.FixedLenFeature([], tf.int64), 'img_raw': tf.FixedLenFeature([], tf.string) } ) # 解析從 serialized_example 讀取到的內容 img=tf.decode_raw(features['img_raw'],tf.uint8) img = tf.reshape(img, [250, 250, 3]) label = tf.cast(features['label'], tf.int32) return img,label #將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中 write_binary() #讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 img,label = read_and_decode('data.tfrecord') ''' 讀取批量數據 這裏設置batch_size=12,即一次從內存隊列中隨機讀取12張圖片,這讀取到的圖片可能有重複的, 這主要是由於設置內存隊列最小元素個數爲100,最大元素個數爲2000,而咱們總共只有12張圖片,因此隊列中有許多重複的圖片 ''' img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #建立一個協調器,管理線程 coord = tf.train.Coordinator() #啓動QueueRunner, 此時文件名纔開始進隊。 threads=tf.train.start_queue_runners(sess=sess,coord=coord) img, label = sess.run([img_batch, label_batch]) for i in range(12): cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i]) #終止線程 coord.request_stop() coord.join(threads)
運行後:
咱們在讀取TFRecord文件時,一次讀取12張圖片,這主要是由於咱們設置batch_size=12,而且咱們能夠看到讀取到的12張圖片是隨機,這裏出現了重複的。
咱們再用一個具體的例子感覺tensorflow中的數據讀取。如圖,假設咱們在當前文件夾中已經有A.jpg、B.jpg、C.jpg三張圖片,咱們但願讀取這三張圖片5個epoch而且把讀取的結果從新存到read文件夾中。
''' 測試 ''' tf.reset_default_graph() # 新建一個Session with tf.Session() as sess: # 咱們要讀三幅圖片A.jpg, B.jpg, C.jpg filename = ['A.jpg', 'B.jpg', 'C.jpg'] # string_input_producer會產生一個文件名隊列 filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5) # reader從文件名隊列中讀數據。對應的方法是reader.read reader = tf.WholeFileReader() key, value = reader.read(filename_queue) # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化 tf.local_variables_initializer().run() # 使用start_queue_runners以後,纔會開始填充隊列 threads = tf.train.start_queue_runners(sess=sess) for i in range(15): # 獲取圖片數據並保存 image_data = sess.run(value) with open('read/test_%d.jpg' % (i+1), 'wb') as f: f.write(image_data)
咱們這裏使用filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)創建了一個會跑5個epoch的文件名隊列。並使用reader讀取,reader每次讀取一張圖片並保存。
運行代碼後,咱們獲得就能夠看到read文件夾中的圖片,正好是按順序的5個epoch:
若是咱們設置filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=5)中的shuffle=True,那麼在每一個epoch內圖像就會被打亂,如圖所示:
咱們這裏只是用三張圖片舉例,實際應用中一個數據集確定不止3張圖片,不過涉及到的原理都是共通的。
完整代碼:
# -*- coding: utf-8 -*- """ Created on Wed May 2 20:39:25 2018 @author: zy """ import tensorflow as tf ''' TensorFlow中隊列的使用 ''' ''' 下面是一個單獨使用Queue的例子: ''' #建立的圖:一個先入先出隊列,以及初始化,出隊,+1,入隊操做 q = tf.FIFOQueue(3, "float") init = q.enqueue_many(([0.1, 0.2, 0.3],)) x = q.dequeue() y = x + 1 q_inc = q.enqueue([y]) #開啓一個session,session是會話,會話的潛在含義是狀態保持,各類tensor的狀態保持 with tf.Session() as sess: sess.run(init) for i in range(2): sess.run(q_inc) quelen = sess.run(q.size()) for i in range(quelen): print (sess.run(q.dequeue())) ''' QueueRunner()的使用 ''' q = tf.FIFOQueue(10, "float") counter = tf.Variable(0.0) #計數器 # 給計數器加一 increment_op = tf.assign_add(counter, 1.0) # 將計數器加入隊列 enqueue_op = q.enqueue(counter) # 建立QueueRunner # 用多個線程向隊列添加數據 # 這裏實際建立了4個線程,兩個增長計數,兩個執行入隊 qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2) ''' #主線程 with tf.Session() as sess: sess.run(tf.initialize_all_variables()) #啓動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) #主線程 for i in range(10): print (sess.run(q.dequeue())) ''' # 主線程 sess = tf.Session() sess.run(tf.initialize_all_variables()) # 啓動入隊線程 enqueue_threads = qr.create_threads(sess, start=True) # 主線程 for i in range(0, 10): print(sess.run(q.dequeue())) ''' Coordinator ''' import threading, time # 子線程函數 def loop(coord, id): t = 0 while not coord.should_stop(): print(id) time.sleep(1) t += 1 # 只有1號線程調用request_stop方法 if (t >= 2 and id == 0): coord.request_stop() # 主線程 coord = tf.train.Coordinator() # 使用Python API建立10個線程 threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)] # 啓動全部線程,並等待線程結束 for t in threads: t.start() coord.join(threads) ''' QueueRunner和Coordinator結合方式一 ''' ''' import numpy as np # 1000個4維輸入向量,每一個數取值爲1-10之間的隨機數 data = 10 * np.random.randn(1000, 4) + 1 # 1000個隨機的目標值,值爲0或1 target = np.random.randint(0, 2, size=1000) # 建立Queue,隊列中每一項包含一個輸入數據和相應的目標值 queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []]) # 批量入列數據(這是一個Operation) enqueue_op = queue.enqueue_many([data, target]) # 出列數據(這是一個Tensor定義) data_sample, label_sample = queue.dequeue() # 建立包含4個線程的QueueRunner qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) with tf.Session() as sess: # 建立Coordinator coord = tf.train.Coordinator() # 啓動QueueRunner管理的線程 enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # 主線程,消費100個數據 for step in range(100): if coord.should_stop(): break data_batch, label_batch = sess.run([data_sample, label_sample]) # 主線程計算完成,中止全部採集數據的進程 coord.request_stop() coord.join(enqueue_threads) ''' ''' QueueRunner和Coordinator結合的數據讀取機制 讀取CSV文件 ''' tf.reset_default_graph() # 同時打開多個文件(文件格式必須同樣),隱式建立Queue,同時隱含了QueueRunner的建立 filename_queue = tf.train.string_input_producer(["iris_0.csv","iris_1.csv"]) reader = tf.TextLineReader() # Tensorflow的Reader對象能夠直接接受一個Queue做爲輸入 每次read的執行都會從文件中讀取一行內容 key, value = reader.read(filename_queue) # 若是某一列爲空,指定默認值,同時指定了默認列的類型 record_defaults = [[0.0], [0.0], [0.0], [0.0], [0]] # decode_csv 操做會解析讀取的一行內容並將其轉爲張量列表 col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults=record_defaults) features = [col1, col2, col3, col4] #獲取一行數據 #row = tf.decode_csv(value, record_defaults=record_defaults) with tf.Session() as sess: coord = tf.train.Coordinator() # 啓動計算圖中全部的隊列線程 調用tf.train.start_queue_runners來將文件名填充到隊列,不然read操做會被阻塞到文件名隊列中有值爲止。 threads = tf.train.start_queue_runners(coord=coord) # 主線程,消費50個數據 for _ in range(50): example, label = sess.run([features, col5]) print('Step {0} {1} {2}'.format(_,example,label)) # 主線程計算完成,中止全部採集數據的進程 coord.request_stop() #指定等待某個線程結束 coord.join(threads) ''' shuffle_batch()的使用 ''' import os import cv2 def write_binary(): ''' 將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中 ''' #獲取當前工做目錄 cwd = os.getcwd() #當前目錄下的子目錄 一共有12張圖片 classes=['A','B','C'] #建立對象 用於向記錄文件寫入記錄 writer = tf.python_io.TFRecordWriter('data.tfrecord') #遍歷每個子文件夾 for index, name in enumerate(classes): class_path = os.path.join(cwd,name) #遍歷子目錄下的每個文件 for img_name in os.listdir(class_path): #每個圖片全路徑 img_path = os.path.join(class_path , img_name) #讀取圖像 img = cv2.imread(img_path) #縮放 img1 = cv2.resize(img,(250,250)) #將圖片轉化爲原生bytes img_raw = img1.tobytes() #將數據整理成 TFRecord 須要的數據結構 example = tf.train.Example(features=tf.train.Features(feature={ 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])), "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[index]))})) #序列化 serialized = example.SerializeToString() #寫入文件 writer.write(serialized) writer.close() def read_and_decode(filename): ''' 讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 args: filename:TFRecord格式文件路徑 ''' #建立文件隊列,不限讀取的數量 filename_queue = tf.train.string_input_producer([filename],shuffle=False) #建立一個文件讀取器 從隊列文件中讀取數據 reader = tf.TFRecordReader() #reader從 TFRecord 讀取內容並保存到 serialized_example中 _, serialized_example = reader.read(filename_queue) # 讀取serialized_example的格式 features = tf.parse_single_example( serialized_example, features={ 'label': tf.FixedLenFeature([], tf.int64), 'img_raw': tf.FixedLenFeature([], tf.string) } ) # 解析從 serialized_example 讀取到的內容 img=tf.decode_raw(features['img_raw'],tf.uint8) img = tf.reshape(img, [250, 250, 3]) label = tf.cast(features['label'], tf.int32) return img,label tf.reset_default_graph() #將默認路徑下的全部圖片存爲TFRecord格式 保存到文件data.tfrecord中 write_binary() #讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 img,label = read_and_decode('data.tfrecord') ''' 讀取批量數據 這裏設置batch_size=12,即一次從內存隊列中隨機讀取12張圖片,這讀取到的圖片可能有重複的, 這主要是由於設置內存隊列最小元素個數爲100,最大元素個數爲2000,而咱們總共只有12張圖片,因此隊列中有許多重複的圖片 ''' img_batch, label_batch = tf.train.shuffle_batch([img,label], batch_size=12, capacity=2000, min_after_dequeue=100, num_threads=2) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #建立一個協調器,管理線程 coord = tf.train.Coordinator() #啓動QueueRunner, 此時文件名纔開始進隊。 threads=tf.train.start_queue_runners(sess=sess,coord=coord) img, label = sess.run([img_batch, label_batch]) for i in range(12): cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i]) #終止線程 coord.request_stop() coord.join(threads) ''' 測試 ''' tf.reset_default_graph() # 新建一個Session with tf.Session() as sess: # 咱們要讀三幅圖片A.jpg, B.jpg, C.jpg filename = ['A.jpg', 'B.jpg', 'C.jpg'] # string_input_producer會產生一個文件名隊列 filename_queue = tf.train.string_input_producer(filename, shuffle=True, num_epochs=5) # reader從文件名隊列中讀數據。對應的方法是reader.read reader = tf.WholeFileReader() key, value = reader.read(filename_queue) # tf.train.string_input_producer定義了一個epoch變量,要對它進行初始化 tf.local_variables_initializer().run() # 使用start_queue_runners以後,纔會開始填充隊列 threads = tf.train.start_queue_runners(sess=sess) for i in range(15): # 獲取圖片數據並保存 image_data = sess.run(value) with open('read/test_%d.jpg' % (i+1), 'wb') as f: f.write(image_data)
假如咱們如今須要對貓和狗的圖片進行分類,咱們已經收集了許多貓和狗的圖片,首先咱們須要創建一個文件夾命名爲data,在該文件夾下面建立兩個子文件夾train,test分別用於保存測試集和訓練集圖片,而後還須要在每一個文件夾下面建立兩個文件夾,分別命名cat和dog,用來存放對應類別的圖片。
有了這些圖片以後,咱們想每次讀取指定batch_size大小得數據樣本,而且這些樣本是打亂的。
代碼以下:
# -*- coding: utf-8 -*- """ Created on Thu May 10 16:48:17 2018 @author: zy """ ''' 如何將給定的數據原始圖片以及標籤保存成TFRecord格式的文件 並使用隊列每次讀取batch_size大小樣本集 ''' import tensorflow as tf import os import cv2 import random import numpy as np #隨機種子,使得每次運行結果同樣 random.seed(0) def get_files(dirpath): ''' 獲取文件相對路徑和標籤(非one_hot) 返回一個元組 args: dirpath:數據所在的目錄 記作父目錄 假設有10類數據,則父目錄下有10個子目錄,每一個子目錄存放着對應的圖片 ''' #保存讀取到的的文件和標籤 image_list = [] label_list = [] #遍歷子目錄 classes = [x for x in os.listdir(dirpath) if os.path.isdir(dirpath)] #遍歷每個子文件夾 for index, name in enumerate(classes): #子文件夾路徑 class_path = os.path.join(dirpath,name) #遍歷子目錄下的每個文件 for img_name in os.listdir(class_path): #每個圖片全路徑 img_path = os.path.join(class_path , img_name) #追加 image_list.append(img_path) label_list.append(index) #保存打亂後的文件和標籤 images = [] labels = [] # 打亂文件順序 連續打亂兩次 indices = list(range(len(image_list))) random.shuffle(indices) for i in indices: images.append(image_list[i]) labels.append(label_list[i]) random.shuffle([images,labels]) print('樣本長度爲:',len(images)) #print(images[0:10],labels[0:10]) return images, labels ''' 生成數據的格式 先生成 TFRecord 格式的樣例數據,Example 的結構以下,表示第1個文件中的第1個數據 { 'i':0, 'j':0 } ''' def WriteTFRecord(dirpath,dstpath='.',train_data=True,IMAGE_HEIGHT=227,IMAGE_WIDTH=227): ''' 把指定目錄下的數據寫入同一個TFRecord格式文件中 args: dirpath:數據所在的目錄 記作父目錄 假設有10類數據,則父目錄下有10個子目錄,每一個子目錄存放着對應的圖片 dstpath:保存TFRecord文件的目錄 train_data:表示傳入的是否是訓練集文件所在路徑 IMAGE_HEIGHT:保存的圖片數據高度 IMAGE_WIDTH:保存的圖片數據寬度 ''' if not os.path.isdir(dstpath): os.mkdir(dstpath) #獲取全部數據文件路徑,以及對應標籤 image_list, label_list = get_files(dirpath) #把海量數據寫入多個TFrecord文件 length_per_shard = 10000 #每一個記錄文件的樣本長度 num_shards = int(np.ceil(len(image_list) / length_per_shard)) print('記錄文件個數:',num_shards) ''' 當全部數類別圖片都在一個文件夾下面時,能夠將數據寫入不一樣的文件 可是若是同一類別的圖片放在相同的文件下,就不能夠將數據寫入不一樣的文件 這主要是由於後者保存的TFRecord文件中都是同一類別,而隊列取數據時,是從一個文件讀取完,纔會讀取另外一個文件, 這樣會致使一次讀取的batch_size圖像都是同一類別,對訓練不利 所以咱們必須想個辦法讓一個TFRecord格式的文件包含各類類別的圖片,而且順序是打亂的 ''' #依次寫入每個TFRecord文件 for index in range(num_shards): #按0000n-of-0000m的後綴區分文件。n表明當前文件標號,沒表明文件總數 if train_data: filename = os.path.join(dstpath,'train_data.tfrecord-%.5d-of-%.5d'%(index,num_shards)) else: filename = os.path.join(dstpath,'test_data.tfrecord-%.5d-of-%.5d'%(index,num_shards)) print(filename) #建立對象 用於向記錄文件寫入記錄 writer = tf.python_io.TFRecordWriter(filename) #起始索引 idx_start = index*length_per_shard #結束索引 idx_end = np.min([(index+1)*length_per_shard - 1,len(image_list)]) #遍歷子目錄下的每個文件 for img_path,label in zip(image_list[idx_start:idx_end], label_list[idx_start:idx_end]): #讀取圖像 img = cv2.imread(img_path) ''' 在這裏能夠對圖片進行處理,也能夠擴大數據集,或者歸一化輸入等待,不過我在這裏不對原始圖片進行其它處理,只是把圖片大小設置爲固定的 ''' #縮放 img = cv2.resize(img,(IMAGE_HEIGHT,IMAGE_WIDTH)) #將圖片轉化爲原生bytes image = img.tobytes() #將數據整理成 TFRecord 須要的數據結構 example = tf.train.Example(features=tf.train.Features(feature={ 'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])), "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))})) #序列化 serialized = example.SerializeToString() #寫入文件 writer.write(serialized) writer.close() def read_and_decode(filename,num_epochs = None,IMAGE_HEIGHT=227,IMAGE_WIDTH=227): ''' 讀取TFRecord格式格式文件,返回讀取到的一張圖像以及對應的標籤 args: filename:TFRecord格式文件路徑 list列表 num_epochs:每一個數據集文件迭代輪數 IMAGE_HEIGHT:保存的圖片數據高度 IMAGE_WIDTH:保存的圖片數據寬度 ''' ''' 建立文件隊列,經過設置 shuffle 參數爲 True,將文件的入隊順序打亂,因此出隊順序是隨機的。隨機打亂文件順序和入隊操做 會跑在一個單獨的線程上,不會影響出隊的速度. 當輸入隊列中的全部文件都處理完後,它會將文件列表中的文件從新加入隊列。能夠經過設置 num_epochs 參數來限制加載初始 文件列表的最大輪數 ''' filename_queue = tf.train.string_input_producer(filename,shuffle=False,num_epochs = num_epochs) #建立一個文件讀取器 從隊列文件中讀取數據 reader = tf.TFRecordReader() #reader從 TFRecord 讀取內容並保存到 serialized_example中 _, serialized_example = reader.read(filename_queue) # 讀取serialized_example的格式 features = tf.parse_single_example( serialized_example, features={ 'image': tf.FixedLenFeature([], tf.string), 'label': tf.FixedLenFeature([], tf.int64) } ) # 解析從 serialized_example 讀取到的內容 img = tf.decode_raw(features['image'],tf.uint8) img = tf.reshape(img, [IMAGE_HEIGHT, IMAGE_WIDTH, 3]) ''' 在這裏能夠對讀取到的圖片數據進行預處理,好比歸一化輸入,PCA處理等,可是不能夠增長數據 ''' label = tf.cast(features['label'], tf.int32) return img,label def input_data(filenames,num_epochs=None,batch_size=256, capacity=4096, min_after_dequeue=1024, num_threads=10): ''' 讀取小批量batch_size數據 args: filenames:TFRecord文件路徑組成的list num_epochs:每一個數據集文件迭代輪數 batch_size:小批量數據大小 capacity:內存隊列元素最大個數 min_after_dequeue:內存隊列元素最小個數 num_threads:線城數 ''' ''' 讀取批量數據 這裏設置batch_size,即一次從內存隊列中隨機讀取batch_size張圖片,這裏設置內存隊列最小元素個數爲1024,最大元素個數爲4096 shuffle_batch 函數會將數據順序打亂 bacth 函數不會將數據順序打亂 ''' img,label = read_and_decode(filenames,num_epochs) images_batch, labels_batch = tf.train.shuffle_batch([img,label], batch_size=batch_size, capacity=capacity, min_after_dequeue=batch_size*5, num_threads=num_threads) return images_batch,labels_batch def file_match(s,root='.'): ''' 尋找指定目錄下(不包含子目錄)中的文件名含有指定字符串的文件,並打印出其相對路徑 args: s:要匹配的字符串 root : 指定要搜索的目錄 return:返回符合條件的文件列表 ''' #用來保存目錄 dirs=[] #用來保存匹配字符串的文件 matchs=[] for current_name in os.listdir(root): add_root_name = os.path.join(root,current_name) if os.path.isdir(add_root_name): dirs.append(add_root_name) elif os.path.isfile(add_root_name) and s in add_root_name: matchs.append(add_root_name) ''' #這裏用來遞歸搜索子目錄的 for dir in dirs: file_match(s,dir) ''' return matchs ''' 測試 ''' if __name__ == '__main__': #訓練集數據所在的目錄 dirpath = './data/train' training_step = 1 ''' 判斷訓練測試集TFRecord格式文件是否存在,不存在則生成 若是存在,直接讀取 ''' # 獲取當前目錄下包含指定字符串的文件列表 files = file_match('train_data.tfrecord') #判斷數據集是否存在 if len(files) == 0: print('開始讀圖片文件並寫入TFRecord格式文件中.........') #將指定路徑下全部圖片存爲TFRecord格式 保存到文件data.tfrecord中 WriteTFRecord(dirpath) print('寫入完畢!\n') #正則表達式匹配 files = tf.train.match_filenames_once('./train_data.tfrecord') #讀取TFRecord格式格式文件,返回讀取到的batch_size圖像以及對應的標籤 images_batch, labels_batch = input_data(files) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) #建立一個協調器,管理線程 coord = tf.train.Coordinator() #啓動QueueRunner, 此時文件名纔開始進隊 threads = tf.train.start_queue_runners(sess=sess,coord=coord) print('開始訓練!\n') for step in range(training_step): img, label = sess.run([images_batch, labels_batch]) print('setp :',step) for i in range(256): cv2.imwrite('%d_%d_p.jpg'%(i,label[i]),img[i]) #終止線程 coord.request_stop() coord.join(threads)
程序運行後,會生成三個TFRecord文件,以下:
而且咱們生成了了一個小批量樣本大小的樣本圖片:
參考文章
[1]Tensorflow讀取數據的4種方式(8)---《深度學習》