tensorflow 多線程隊列讀取

1隊列

隊列和變量相似,都是計算圖上有狀態的節點,能夠經過賦值操做修改變量的取值。對於隊列,隊列的操做主要有Enqueue、EnqueueMany和Dequeue。
如下代碼展現如何進行隊列的初始化 入隊 出隊數據結構

# coding utf-8
import tensorflow as tf

# 建立一個先進先出隊列,指定隊列中能夠保存兩個元素,並指定類型爲整數。
q = tf.FIFOQueue(2, 'int32')
# 使用enqueue_many函數來初始化隊列中的元素。和變量初始化相似,在使用隊列以前
# 須要明確調用這個初始化過程.
init = q.enqueue_many(([0, 10],))
# 使用Dequeue函數將隊列中的第一個元素出隊列。這個元素值,將被存在變量x中。
x = q.dequeue()
y = x + 1
# 將加1後的值在從新加入隊列中。
q_inc = q.enqueue_many([y])

with tf.Session() as sess:
    # 運行初始化隊列的操做。
    init.run()
    for _ in range(5):
        # 運行q_inc將執行數據出隊列、出隊的元素+1,、從新加入隊列的整個過程。
        v, _ = sess.run([x, q_inc])
        # 打印出隊元素的值。
        print(v)

2線程

在TensorFlow中,隊列不單單是一種數據結構,仍是異步計算張量取值的一個重要機制。好比多個線程能夠同時向一個隊列中寫元素,或者同時讀取一個隊列中的元素。多線程

TF提供了tf.Coordinator和tf.QueueRunner兩個類來完成多線程協同的功能。dom

2.1 tf.Coordinator

tf.Coordinator主要用於協同多個線程一塊兒中止,並提供了should_stop、request_stop和join三個函數。在啓動線程以前須要聲明一個tf.Coordinator類,並將這個類傳入每個建立的線程中。啓動的線程須要一直查詢tf.Coordinatorl類中提供的should_stop函數,當這個函數的返回值爲Truez時,則當前線程也須要退出。每個啓動的線程均可以經過調用request_stop函數來通知其餘線程退出。當某一個線程調用request_stop函數以後,should_stop函數的返回值將被設置爲True,這樣其餘線程就能夠同時終止。異步

tf.Coordinator演示代碼以下函數

# coding utf-8
import tensorflow as tf
import numpy as np
import threading
import time

# 線程中運行的程序,這個程序每隔1秒判斷是否須要中止並打印本身的ID。
def MyLoop(coord, worker_id):
    # 使用tf.Coordinator類提供的協同工具判斷當前線程是否須要中止並打印本身的ID
    while not coord.should_stop():
        # 隨機中止全部線程
        if np.random.rand() < 0.1:
            print('Stoping from id: %d\n' % worker_id)
            # 調用coord.request_stop()函數來通知其餘線程中止
            coord.request_stop()
        else:
            # 打印當前線程的ID
            print('Working on id: %d\n' % worker_id)
            # 暫停1秒
            time.sleep(1)

# 聲明一個tf.train.Coordinator類來協同多個線程
coord = tf.train.Coordinator()
# 聲明建立5個線程
threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)]
# 啓動全部的線程
for t in threads: t.start()
# 等待全部線程退出
coord.join(threads)

2.2 tf.QueueRunner

tf.QueueRunner主要用於啓動多個線程來操做同一個隊列,啓動的這些線程能夠經過上面介紹的tf.Coordinator類來統一管理。如下代碼展現如何使用tf.QueueRunner和tf.Coordinator來管理多線程隊列操做。工具

import tensorflow as tf
 # 聲明一個先進先出的隊列,隊列中最多100個元素,類型爲實數
queue = tf .FIFOQueue(100, 'float')
# 定義隊列的入隊操做
enqueue_op = queue.enqueue([tf.random_normal([1])])

# 使用 tf.train.QueueRunner來建立多個線程運行隊列的入隊操做
# tf.train.QueueRunner給出了被操做的隊列,[enqueue_op] * 5
# 表示了須要啓動5個線程,每一個線程中運行的是enqueue_op操做
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)
# 將定義過的QueueRunner加入TensorFlow計算圖上指定的集合
# tf.train.add_queue_runner函數沒有指定集合,
# 則加入默認集合tf.GraphKeys.QUEUE_RUNNERS。
# 下面的函數就是將剛剛定義的qr加入默認的tf.GraphKeys.QUEUE_RUNNERS結合
tf.train.add_queue_runner(qr)
# 定義出隊操做
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # 使用tf.train.Coordinator來協同啓動的線程
    coord = tf.train.Coordinator()
    # 使用tf.train.QueueRunner時,須要明確調用tf.train.start_queue_runners
    # 來啓動全部線程。不然由於沒有線程運行入隊操做,當調用出隊操做時,程序一直等待
    # 入隊操做被運行。tf.train.start_queue_runners函數會默認啓動
    # tf.GraphKeys.QUEUE_RUNNERS中全部QueueRunner.由於這個函數只支持啓動指定集合中的QueueRunner,
    # 因此通常來講tf.train.add_queue_runner函數和tf.train.start_queue_runners函數會指定同一個結合
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # 獲取隊列中的取值
    for _ in range(3): print(sess.run(out_tensor)[0])

    # 使用tf.train.Coordinator來中止全部線程
    coord.request_stop()
    coord.join(threads)
相關文章
相關標籤/搜索