單線程、多線程之間、進程之間、協程之間不少時候須要協同完成工做,這個時候它們須要進行通信。或者說爲了解耦,廣泛採用Queue,生產消費模式。html
程序有時須要在列表的端點進行操做,比list更加優化的數據結構有Queue和deque。併發
deque通常用在定長隊列,多餘的數據會被丟棄,這個隊列是線程非安全的。app
from queue import Queue, deque # 大於會截取後面的一段 q = deque(iterable=[1,2,3,4], maxlen=5) # 參數iterable能夠是任何可迭代對象,maxlen表明定長 # 添加與取出 q.append(3) # 從尾部添加 q.pop() # 從尾部彈出一個 q.appendleft(4) # 從首部添加 q.popleft() # 從首部彈出 q.clear() # 清空隊列 q.extend([1, 3, 3]) # 將原來的隊列從右側擴展 q.extendleft() # 將原來的隊列從左側擴展 q.insert(2, 3) # 在索引爲2的位置插入3,若是隊列已達到最大,拋出異常 # 複製 q1 = q.copy() # 徹底符合一份隊列 # 統計 n = q.count(3) # 統計某個值的數目 x = q.index(3) # 查找某個值的位置 # 變換 q.reverse() # 將原來的q翻轉 q.remove(3) # 刪除隊列中的全部的3 q.rotate(2) # 向右旋轉兩步
Queue提供更加完整成熟的隊列操做,相對於deque來講偏重型,他是線程安全的隊列。async
from queue import Queue, deque q = Queue(maxsize=5) #maxsize<=0,隊列長度沒有限制,這個Queue是線程安全的,經過鎖機制保證 print(q.queue) # 一個deque隊列 print(q.mutex) # 隊列的線程鎖 print(q.not_empty) # 非空通知,用在多線程 print(q.not_full) # 非滿通知,用在多線程 print(q.all_tasks_done) # 完成的任務 print(q.maxsize) print(q.unfinished_tasks) # 隊列未完成的任務數量,即隊列目前的數目 # 數據存取 q.put(3, block=True, timeout=3) # 向隊列左邊添加數據,block爲True隊列滿了阻塞等待,block爲false則直接拋出異常 q.get(block=True, timeout=3) # 隊列取出數據,超時拋出異常,block爲false忽略timeout # q.get_nowait() # 當即獲取,沒有拋出異常 q.put_nowait(4) # 當即插入,已滿拋出異常 # 判斷 q.full() # 判斷當前隊列是否已滿,滿了返回True q.empty() # 判斷當前隊列是否爲空,空返回True # 統計 q.task_done() # 用來通知隊列任務完成 q.qsize() # 當前隊列的任務數量,不絕對可靠 q.join() # 阻塞直到全部的任務完成,即q.unfinished_tasks降爲0
from threading import Thread from queue import Queue, deque import time def get_from_queue(queue:Queue): while True: if not queue.empty(): print(queue.get_nowait()) queue.task_done() # 任務完成 def put_to_queue(queue:Queue): for i in range(100): if not queue.full(): queue.put_nowait(i) else: time.sleep(0.1) q = Queue(5) th1 = Thread(target=get_from_queue, args=(q,)) th2 = Thread(target=put_to_queue, args=(q,)) th1.start() th2.start()
multiprocessing的Queue對象能夠做爲進程間通信的第三者。
from multiprocessing import Queue, Process, Pool import time def get_from_queue(queue:Queue): while True: if not queue.empty(): print(queue.get_nowait()) def put_to_queue(queue:Queue): for i in range(100): if not queue.full(): queue.put_nowait(i) else: time.sleep(0.1) if __name__ == '__main__': q = Queue(9) # 這個Queue能夠在多個進程之間共享 p1 = Process(target=get_from_queue, args=(q,)) p2 = Process(target=put_to_queue, args=(q,)) p1.start() p2.start()
Queue對象的大部分方法和Queue.Queue的方法相同,用法也同樣,但有幾個特殊的方法:
q = Queue(9) # 這個Queue能夠在多個進程之間共享 # q.close() # 關閉隊列,再也不接收數據 # q.cancel_join_thread() # 取消阻塞等待 q.join_thread() # 線程阻塞等待
gevent.queue.Queue基於協程,Queue在多個協程間共享,Queue實現了迭代器協議,可使用for循環遍歷。
from gevent.queue import Queue import gevent import time def get_from_queue(queue:Queue, n): i = 0 print('start---get--{}'.format(n)) while True: print(str(queue.get()) + 'get' + str(n)) i += 1 if i == 100: break def put_to_queue(queue:Queue, n): i = 0 print('start---put--{}'.format(n)) while True: queue.put(i) print(str(i) + 'put' + str(n)) i += 1 if i == 100: break if __name__ == '__main__': q = Queue(9) # 這個Queue能夠在多個進程之間共享 job1 = [gevent.spawn(put_to_queue, q,i) for i in range(2)] job2 = [gevent.spawn(get_from_queue, q,i) for i in range(2)] job1.extend(job2) gevent.joinall(job1)
協程啓動後會按照添加到循環的順序開始執行,上例在隊列未滿以前一直執行put操做,直到隊列滿後阻塞就切換到put2協程,也會當即阻塞,而後切換到get1協程,獲取全部的值直到隊列爲空後阻塞切換。
其方法基本和Queue.Queue的方法相同,特殊方法以下:
q = Queue(9, items=[1,2,3, StopIteration]) # 實現迭代協議,最後一個必須是StopIteration # q.copy() #複製一個隊列 x = q.next() # 喚醒獲取值 q.peek(block=True, timeout=None) # 獲取一個值可是不刪除它 q.peek_nowait() # 當即獲取,忽略timeout q.put() # 會喚醒多個協程完成添加操做 q.get() # 會掛起多個協程
gevent.queue.JoinableQueue對象擴展了Queue的功能,添加了task_done和join方法。
q = JoinableQueue(9, items=[1,2,3, StopIteration]) # 這個Queue能夠在多個進程之間共享 q.task_done() # 通知隊列一個任務完成 q.unfinished_tasks # 未完成的任務計數 q.join() # 阻塞等待任務完成,若是unfinished_tasks降爲0,則解除
from gevent.queue import Queue, JoinableQueue import gevent import time def get_from_queue(queue:JoinableQueue): while True: try: x = queue.get() # 阻塞時就會切換協程 print(x) finally: queue.task_done() if __name__ == '__main__': q = JoinableQueue(8) job1 = [gevent.spawn(get_from_queue, q) for i in range(2)] for i in range(100): q.put(i) # 當Put被阻塞時將切換協程, q.join() # 若是不等待的話,最後一次put後將直接退出