tensorflow(二)----線程隊列與io操做

1、隊列和線程 

一、隊列:

  1)、tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 建立一個以先進先出的順序對元素進行排隊的隊列python

    參數:app

      capacity:整數。可能存儲在此隊列中的元素數量的上限dom

      dtypes:DType對象列表。長度dtypes必須等於每一個隊列元 素中的張量數,dtype的類型形狀,決定了後面進隊列元素形狀異步

    方法:函數

      q.dequeue()獲取隊列的數據ui

      q.enqueue(值)將一個數據添加進隊列編碼

      q.enqueue_many(列表或者元組)將多個數據添加進隊列spa

      q.size() 返回隊列的大小線程

  2)、tf.RandomShuffleQueue() 隨機出的隊列設計

二、隊列管理器

  tf.train.QueueRunner(queue, enqueue_ops=None)

  參數:

    queue:A Queue

    enqueue_ops:添加線程的隊列操做列表,[]*2,指定兩個線程

    create_threads(sess, coord=None,start=False) 建立線程來運行給定會話的入隊操做

    start:布爾值,若是True啓動線程;若是爲False調用者 必須調用start()啓動線程

    coord:線程協調器  用於線程的管理

三、線程協調器

  tf.train.Coordinator() 線程協調員,實現一個簡單的機制來協調一 組線程的終止

  方法:    返回的是線程協調實例

    request_stop()  請求中止

    join(threads=None, stop_grace_period_secs=120) 等待線程終止

 

結合隊列、隊列管理器和線程協調器實現異步的小例:

import tensorflow as tf

# 1.建立隊列
Q = tf.FIFOQueue(2000, tf.float32)

# 2.添加數據進隊列  
# 2.1建立一個數據(變量)
var = tf.Variable(0.0, tf.float32)
# 2.2數據自增
plus = tf.assign_add(var, 1)
# 2.3將數據添加進隊列
en_q = Q.enqueue(plus)

# 3.建立隊列管理器
qr = tf.train.QueueRunner(Q, enqueue_ops=[en_q] * 2)

# 4.變量初始化
init = tf.global_variables_initializer()

# 5.建立會話
with tf.Session() as sess:
    # 6.運行初始化
    sess.run(init)

    # 7.建立線程協調器
    coord = tf.train.Coordinator()

    # 8.開啓子線程
    threads = qr.create_threads(sess, coord=coord, start=True)

    # 9.主線程 從隊列中取數據
    for i in range(200):
        print(sess.run(Q.dequeue()))
        
    # 10.線程回收
    coord.request_stop()
    coord.join(threads)

 

2、文件讀取

一、文件讀取流程

二、文件讀取API

  1)文件隊列

  tf.train.string_input_producer(string_tensor, ,shuffle=True) 將輸出字符串(例如文件名)輸入到管道隊列

  參數:   

    string_tensor 含有文件名的1階張量

    num_epochs:過幾遍數據,默認無限過數據

    返回:具備輸出字符串的隊列

  2)文件閱讀器(根據文件格式,選擇對應的文件閱讀器)

    csv文件:  class tf.TextLineReader       默認按行讀取      返回:讀取器實例

    二進制文件:  tf.FixedLengthRecordReader(record_bytes)           record_bytes:整型,指定每次讀取的字節數      返回:讀取器實例

    TfRecords文件:  tf.TFRecordReader     返回:讀取器實例

    以上3個閱讀器有一個相同的方法:

    read(file_queue):從隊列中指定數量內容 返回一個Tensors元組(key, value)  其中key是文件名字,value是默認的內容(行,字節)

  3)文件內容解碼器(因爲從文件中讀取的是字符串,須要函數去解析這些字符串到張量)

    ①tf.decode_csv(records,record_defaults=None,field_delim = None,name = None)   將CSV轉換爲張量,與tf.TextLineReader搭配使用

      參數:

        records:tensor型字符串,每一個字符串是csv中的記錄行

        field_delim:默認分割符」,」

        record_defaults:參數決定了所得張量的類型,並設置一個值在輸入字符串中缺乏使用默認值

    ②tf.decode_raw(bytes,out_type,little_endian = None,name = None)   將字節轉換爲一個數字向量表示,字節爲一字符串類型的張量,與函數tf.FixedLengthRecordReader搭配使用,二進制讀取爲uint8格式

  4)開啓線程操做

    tf.train.start_queue_runners(sess=None,coord=None) 收集全部圖中的隊列線程,並啓動線程 sess:所在的會話中 coord:線程協調器 return:返回全部線程隊列

  5)管道讀端批處理

    ①tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 讀取指定大小(個數)的張量

     參數:

      tensors:能夠是包含張量的列表

      batch_size:從隊列中讀取的批處理大小

      num_threads:進入隊列的線程數

      capacity:整數,隊列中元素的最大數量

      返回:tensors

    ②tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,     num_threads=1,)  亂序讀取指定大小(個數)的張量

      參數:

        min_after_dequeue:留下隊列裏的張量個數,可以保持隨機打亂

三、文件讀取案例

import tensorflow as tf
import os


def csv_read(filelist):
    # 構建文件隊列
    Q = tf.train.string_input_producer(filelist)
    # 構建讀取器
    reader = tf.TextLineReader()
    # 讀取隊列
    key, value = reader.read(Q)
    # 構建解碼器
    x1, y = tf.decode_csv(value, record_defaults=[["None"], ["None"]])
    # 進行管道批處理
    x1_batch, y_batch = tf.train.batch([x1, y], batch_size=12, num_threads=1, capacity=12)
    # 開啓會話
    with tf.Session() as sess:
        # 建立線程協調器
        coord = tf.train.Coordinator()
        # 開啓線程
        threads = tf.train.start_queue_runners(sess, coord=coord)
        # 執行任務
        print(sess.run([x1_batch, y_batch]))
        # 線程回收
        coord.request_stop()
        coord.join(threads)


if __name__ == "__main__":
    filename = os.listdir("./data/")   #  文件目錄本身指定
    filelist = [os.path.join("./data/", file) for file in filename]
    csv_read(filelist)

 

3、圖片讀取與存儲

    1   圖像數字化三要素:長度,寬度,通道數(一通道 : 灰度值    三通道 : RGB)

    2   縮小圖片大小:

      tf.image.resize_images(images, size) 縮小圖片

      目的:

         一、增長圖片數據的統一性

         二、全部圖片轉換成指定大小

         三、縮小圖片數據量,防止增長開銷

    3  圖像讀取API

      1)圖像讀取器

        tf.WholeFileReader 將文件的所有內容做爲值輸出的讀取器

          return:讀取器實例 read(file_queue):輸出將是一個文件名(key)和該文件的內容 (值)

      2)圖像解碼器

        tf.image.decode_jpeg(contents) 將JPEG編碼的圖像解碼爲uint8張量

          return:uint8張量,3-D形狀[height, width, channels]

        tf.image.decode_png(contents) 將PNG編碼的圖像解碼爲uint8或uint16張量

          return:張量類型,3-D形狀[height, width, channels]

圖片讀取案的簡單demo:

import tensorflow as tf
import os

flags = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("data_home", "./data/dog/", "狗的圖片目錄")   # 文件路徑本身指定

def picread(filelist):
    # 構建文件名隊列
    file_q = tf.train.string_input_producer(filelist)
    # 構建讀取器
    reader = tf.WholeFileReader()
    # 讀取內容
    key, value = reader.read(file_q)
    print(value)
    # 構建解碼器
    image = tf.image.decode_jpeg(value)
    print(image)
    # 統一圖片大小   設置長寬
    resize_image = tf.image.resize_images(image, [256,256])
    print(resize_image)
    # 指定通道大小
    resize_image.set_shape([256,256,3])
    # 構建批量處理管道
    image_batch = tf.train.batch([resize_image], batch_size=100,num_threads=1, capacity=100)

    return image_batch

if __name__ == "__main__":
    filename = os.listdir(flags.data_home)
    filelist = [os.path.join(flags.data_home, file) for file in filename]
    image_batch = picread(filelist)

    with tf.Session() as sess:
        # 構建線程協調器
        coord = tf.train.Coordinator()
        # 開啓線程
        threads = tf.train.start_queue_runners(sess,coord=coord)
        # 訓練數據
        print(sess.run(image_batch))
        # 回收線程
        coord.request_stop()
        coord.join(threads)

 

4、TFRecords分析、存取

  1 概念

   TFRecords是Tensorflow設計的一種內置文件格式,是一種二進制文件, 它能更好的利用內存,更方便複製和移動 (將二進制數據和標籤(訓練的類別標籤)數據存儲在同一個文件中)

  2 TFRecords文件分析

    1)文件格式:*.tfrecords

    2)寫入文件內容:Example協議塊

  3 TFRecords存儲

    1)創建TFRecord存儲器

       tf.python_io.TFRecordWriter(path) 寫入tfrecords文件

      參數: 

        path: TFRecords文件的路徑

      return:無,  執行寫文件操做

      方法:

        write(record):向文件中寫入一個字符串記錄     # 一個序列化的Example,Example.SerializeToString()

        close():關閉文件寫入器

    2)構造每一個樣本的Example協議塊

    tf.train.Example(features=None) 寫入tfrecords文件

      參數:

        features:tf.train.Features類型的特徵實例

        return:example格式協議塊

    tf.train.Features(feature=None) 構建每一個樣本的信息鍵值對

      參數:

         feature:字典數據,key爲要保存的名字

        value爲tf.train.Feature實例

        return:Features類型

    tf.train.Feature(**options)

      參數:

        **options:例如 bytes_list=tf.train. BytesList(value=[Bytes])

               int64_list=tf.train. Int64List(value=[Value])

               float_list = tf.train. FloatList(value=[value])

  4  TFRecords讀取方法

     1)構建文件隊列

        tf.train.string_input_producer(string_tensor, ,shuffle=True)  

     2)構建文件讀取器,讀取隊列的數據

        tf.TFRecordReader     返回:讀取器實例

        read(file_queue)

     3)解析TFRecords的example協議內存塊

        ①tf.parse_single_example(serialized,features=None,name=None) 解析一個單一的Example原型

          參數:

            serialized:標量字符串Tensor,一個序列化的Example

            features:dict字典數據,鍵爲讀取的名字,值爲FixedLenFeature

            return:一個鍵值對組成的字典,鍵爲讀取的名字

        ②tf.FixedLenFeature(shape,dtype)

          參數:

            shape:輸入數據的形狀,通常不指定,爲空列表

             dtype:輸入數據類型,與存儲進文件的類型要一致 類型只能是float32,int64,string

     4)解碼

        tf.decode_raw(bytes,out_type,little_endian = None,name = None)   將字節轉換爲一個數字向量表示,字節爲一字符串類型的張量,與函數tf.FixedLengthRecordReader搭配使用,二進制讀取爲uint8格式

 

如下是從二進制文件中讀取數據,寫入tfrecords文件,再從tfrecords文件讀取的小案例:

import tensorflow as tf
import os

flags = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("data_home", "./data/cifar10/cifar-10-batches-bin/", "二進制文件目錄") tf.app.flags.DEFINE_string(
"data_tfrecords", "./data/temp/tfrecords", "tfrecords文件路徑") class cifarread(object): def __init__(self, filelist): self.filelist = filelist # 構建圖的一些數據 self.height = 32 self.width = 32 self.channel = 3 self.label_bytes = 1 self.image_bytes = self.height * self.width*self.channel self.bytes = self.label_bytes + self.image_bytes def read_decode(self): """ 讀取二進制文件 :return: image_batch, label_batch """ # 構建文件名隊列 file_q = tf.train.string_input_producer(self.filelist) # 構建閱讀器 reader = tf.FixedLengthRecordReader(record_bytes=self.bytes) # 讀取數據 key, value = reader.read(file_q) # 解碼 label_image = tf.decode_raw(value, tf.uint8) # 分割數據集 label = tf.cast(tf.slice(label_image, [0], [self.label_bytes]), tf.int32) image = tf.slice(label_image, [self.label_bytes], [self.image_bytes]) # 改變形狀 image_tensor = tf.reshape(image, [self.height, self.width, self.channel]) # 批量處理 image_batch, label_batch = tf.train.batch([image_tensor, label], batch_size=10, num_threads=1, capacity=10) return image_batch, label_batch def write2tfrecords(self, image_batch, label_batch): """ 將從二進制文件中讀取的內容寫入tfrecords文件 :param image_batch: :param label_batch: :return: """ # 構建一個tfrecords文件存儲器 writer = tf.python_io.TFRecordWriter(flags.data_tfrecords) # 對於每個樣本,都要構造example寫入 for i in range(10): # 取出特徵值,轉換成字符串 image_string = image_batch[i].eval().tostring() # 取出目標值 label_int = int(label_batch[i].eval()[0]) example = tf.train.Example(features=tf.train.Features(feature={ "image":tf.train.Feature(bytes_list = tf.train.BytesList(value=[image_string])), "label":tf.train.Feature(int64_list = tf.train.Int64List(value=[label_int])) })) # 寫入文件中,要先把協議序列化值以後才能存儲 writer.write(example.SerializeToString()) writer.close() return None def read_tfrecords(self): """ 從tfrecords文件讀取內容 :return: image_batch, label_batch """ # 構造文件隊列 file_q = tf.train.string_input_producer([flags.data_tfrecords]) # 構造閱讀器,讀取數據 reader = tf.TFRecordReader() # 一次只讀取一個樣本 key, value = reader.read(file_q) # 解析內容 解析example協議 feature = tf.parse_single_example(value, features={ "image":tf.FixedLenFeature([], tf.string), "label":tf.FixedLenFeature([], tf.int64) }) # 解碼 字符串須要解碼, 整形不用 image = tf.decode_raw(feature["image"], tf.uint8) # 設置圖片的形狀,以便批處理 image_reshape = tf.reshape(image, [self.height, self.width]) label = tf.cast(feature["label"], tf.int32) # 批處理 image_batch, label_batch = tf.train.batch([image_reshape, label],batch_size=10 ,num_threads=1, capacity=10) return image_batch, label_batch if __name__ == "__main__": filename = os.listdir(flags.data_home) filelist = [os.path.join(flags.data_home, file) for file in filename if file[-3:] == "bin"] cif = cifarread(filelist) # 讀取二進制文件 image_batch, label_batch = cif.read_decode() # 讀取tfrecords文件 # cif.read_tfrecords() with tf.Session() as sess: # 構建線程協調器 coord = tf.train.Coordinator() # 開啓線程 threads = tf.train.start_queue_runners(sess, coord=coord) # 執行任務 print(sess.run([image_batch, label_batch])) # 存儲tfrecords文件 # cif.write2tfrecords(image_batch, label_batch) # 回收線程 coord.request_stop() coord.join(threads)
相關文章
相關標籤/搜索