隊列和變量相似,都是計算圖上有狀態的節點,能夠經過賦值操做修改變量的取值。對於隊列,隊列的操做主要有Enqueue、EnqueueMany和Dequeue。
如下代碼展現如何進行隊列的初始化 入隊 出隊數據結構
# coding utf-8 import tensorflow as tf # 建立一個先進先出隊列,指定隊列中能夠保存兩個元素,並指定類型爲整數。 q = tf.FIFOQueue(2, 'int32') # 使用enqueue_many函數來初始化隊列中的元素。和變量初始化相似,在使用隊列以前 # 須要明確調用這個初始化過程. init = q.enqueue_many(([0, 10],)) # 使用Dequeue函數將隊列中的第一個元素出隊列。這個元素值,將被存在變量x中。 x = q.dequeue() y = x + 1 # 將加1後的值在從新加入隊列中。 q_inc = q.enqueue_many([y]) with tf.Session() as sess: # 運行初始化隊列的操做。 init.run() for _ in range(5): # 運行q_inc將執行數據出隊列、出隊的元素+1,、從新加入隊列的整個過程。 v, _ = sess.run([x, q_inc]) # 打印出隊元素的值。 print(v)
在TensorFlow中,隊列不單單是一種數據結構,仍是異步計算張量取值的一個重要機制。好比多個線程能夠同時向一個隊列中寫元素,或者同時讀取一個隊列中的元素。多線程
TF提供了tf.Coordinator和tf.QueueRunner兩個類來完成多線程協同的功能。dom
tf.Coordinator主要用於協同多個線程一塊兒中止,並提供了should_stop、request_stop和join三個函數。在啓動線程以前須要聲明一個tf.Coordinator類,並將這個類傳入每個建立的線程中。啓動的線程須要一直查詢tf.Coordinatorl類中提供的should_stop函數,當這個函數的返回值爲Truez時,則當前線程也須要退出。每個啓動的線程均可以經過調用request_stop函數來通知其餘線程退出。當某一個線程調用request_stop函數以後,should_stop函數的返回值將被設置爲True,這樣其餘線程就能夠同時終止。異步
tf.Coordinator演示代碼以下函數
# coding utf-8 import tensorflow as tf import numpy as np import threading import time # 線程中運行的程序,這個程序每隔1秒判斷是否須要中止並打印本身的ID。 def MyLoop(coord, worker_id): # 使用tf.Coordinator類提供的協同工具判斷當前線程是否須要中止並打印本身的ID while not coord.should_stop(): # 隨機中止全部線程 if np.random.rand() < 0.1: print('Stoping from id: %d\n' % worker_id) # 調用coord.request_stop()函數來通知其餘線程中止 coord.request_stop() else: # 打印當前線程的ID print('Working on id: %d\n' % worker_id) # 暫停1秒 time.sleep(1) # 聲明一個tf.train.Coordinator類來協同多個線程 coord = tf.train.Coordinator() # 聲明建立5個線程 threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)] # 啓動全部的線程 for t in threads: t.start() # 等待全部線程退出 coord.join(threads)
tf.QueueRunner主要用於啓動多個線程來操做同一個隊列,啓動的這些線程能夠經過上面介紹的tf.Coordinator類來統一管理。如下代碼展現如何使用tf.QueueRunner和tf.Coordinator來管理多線程隊列操做。工具
import tensorflow as tf # 聲明一個先進先出的隊列,隊列中最多100個元素,類型爲實數 queue = tf .FIFOQueue(100, 'float') # 定義隊列的入隊操做 enqueue_op = queue.enqueue([tf.random_normal([1])]) # 使用 tf.train.QueueRunner來建立多個線程運行隊列的入隊操做 # tf.train.QueueRunner給出了被操做的隊列,[enqueue_op] * 5 # 表示了須要啓動5個線程,每一個線程中運行的是enqueue_op操做 qr = tf.train.QueueRunner(queue, [enqueue_op] * 5) # 將定義過的QueueRunner加入TensorFlow計算圖上指定的集合 # tf.train.add_queue_runner函數沒有指定集合, # 則加入默認集合tf.GraphKeys.QUEUE_RUNNERS。 # 下面的函數就是將剛剛定義的qr加入默認的tf.GraphKeys.QUEUE_RUNNERS結合 tf.train.add_queue_runner(qr) # 定義出隊操做 out_tensor = queue.dequeue() with tf.Session() as sess: # 使用tf.train.Coordinator來協同啓動的線程 coord = tf.train.Coordinator() # 使用tf.train.QueueRunner時,須要明確調用tf.train.start_queue_runners # 來啓動全部線程。不然由於沒有線程運行入隊操做,當調用出隊操做時,程序一直等待 # 入隊操做被運行。tf.train.start_queue_runners函數會默認啓動 # tf.GraphKeys.QUEUE_RUNNERS中全部QueueRunner.由於這個函數只支持啓動指定集合中的QueueRunner, # 因此通常來講tf.train.add_queue_runner函數和tf.train.start_queue_runners函數會指定同一個結合 threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 獲取隊列中的取值 for _ in range(3): print(sess.run(out_tensor)[0]) # 使用tf.train.Coordinator來中止全部線程 coord.request_stop() coord.join(threads)