Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞。一個線程放入數據,另一個線程取數據。python
class queue.Queue(maxsize=0) #先入先出編程
class queue.LifoQueue(maxsize=0) #後入先出(Last in first out)安全
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列數據結構
隊列中的方法:多線程
1.queue.Queue.get() #獲取隊列數據,當隊列是空的時候,會卡主,等待數據的放入,沒有數據放入,會一直阻塞 併發
get(self, block=True, timeout=None) #默認狀態下,block()若是沒有數據是阻塞的app
import queue q = queue.Queue() q.put(1) q.put(2) print(q.get()) print(q.get()) print(q.get())
運行程序:
1
2
阻塞................
隊列就是用來存取數據的,當數據取完以後,就會等待新的數據放入,get()就會一直等待,知道數據放入。要想不等待,可使用下面方法:ide
固然,使用get()加上參數block=False也能實現和get_nowait()同樣的功能。學習
block=True(False)設置當隊列是空的時候,是否阻塞,True阻塞,False不阻塞,報錯。timeout=None(time)設置阻塞時間,即等待一段時間,若是在這段時間內,沒有數據放入,就報錯。fetch
2.get_nowait() #獲取數據,若是隊列是空的,則報錯
import queue q = queue.Queue() q.put(1) q.put(2) print(q.get()) print(q.get()) print(q.get_nowait())
執行結果以下:
1
2
Traceback (most recent call last):
File "/home/zhuzhu/day9/隊列.py", line 8, in <module>
print(q.get_nowait())
File "/usr/lib/python3.5/queue.py", line 192, in get_nowait
return self.get(block=False)
File "/usr/lib/python3.5/queue.py", line 161, in get
raise Empty
queue.Empty
上面使用,get_nowait(),若是隊列是空的,則報錯,能夠用異常來抓取異常,而後能夠繼續執行。
3.queue.Queue.qsize() #判斷隊列裏面元素的個數
import queue q = queue.Queue() print(q.qsize()) q.put(1) print(q.qsize()) q.put(2) print(q.qsize())
執行結果:
0
1
2
q.qsize()是判斷隊列的長度,若是長度爲0,說明隊列是空的,這個時候使用get()就要注意,程序會阻塞。
4.q.qut() #向隊列中放入數據
put(self, item, block=True, timeout=None)
put()和get()差很少同樣,put()當隊列滿的時候,會報錯,block是設置阻塞狀態是否開啓,timeout是設置阻塞時間,默認一直阻塞。
5.q.empty(self) #判斷隊列是不是空Return True if the queue is empty, False otherwise (not reliable!).
6.q.full()
#判斷隊列是不是滿的Return True if the queue is full, False otherwise (not reliable!)
7.put_nowait() 等價於put(block=False) #若是隊列滿了,則報錯。Put an item into the queue without blocking
下面來看一下LifoQueue,後進先出的情形:
import queue q = queue.LifoQueue() q.put(1) q.put(2) q.put(3) print("第一個取出:",q.get()) print("第二個取出:",q.get())
上面是LifoQueue(maxsize=0)的情形,後進入的先被取出。
下面來看一下PriorityQueue的情形,有優先級的queue:
import queue q = queue.PriorityQueue() q.put((3,"alex")) q.put((1,"geng")) q.put((8,"zeng")) print("第一個取出",q.get()) print("第二個取出:",q.get()) print("第三個取出:",q.get())
執行結果:
第一個取出 (1, 'geng')
第二個取出: (3, 'alex')
第三個取出: (8, 'zeng')
上面程序中,是有優先級的放入,put((等級,內容)),存放以元組形式放入,前一個是登記,後面一個是消息。用來VIP優先級的情形。
生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
下面來學習一個最基本的生產者消費者模型的例子:
'''生產者消費者模型就是兩個線程,一個生產,另一個消費,二者相互配合,有交互''' import queue,time,threading def producer(name): '''定義生產者模型''' count = 1 #初始化變量 while True: q.put("骨頭%s" %count) #生成骨頭 print("[%s]生成了骨頭%s" %(name,count)) count += 1 #每次生產一個 time.sleep(0.5) #定義產能,生產效率 def consumer(name): '''定義消費者模型''' while True: print("\033[31m[%s] 吃了[%s]\033[0m" %(name,q.get())) time.sleep(1) #定義消費效率 if __name__ == "__main__": try: q = queue.Queue(maxsize=10) #初始化一個Queue,而且定義最大容量 p = threading.Thread(target=producer,args=("geng",)) #初始化生產者線程 p.start() except KeyboardInterrupt as f: print("生產者線程斷開了!!") try: c=threading.Thread(target=consumer,args=("alex",)) c.start() except KeyboardInterrupt as e: print("線程斷開了!!!")
執行結果:
[geng]生成了骨頭1
[alex] 吃了[骨頭1]
[geng]生成了骨頭2
[geng]生成了骨頭3
[alex] 吃了[骨頭2]
[geng]生成了骨頭4
[alex] 吃了[骨頭3]
[geng]生成了骨頭5
[geng]生成了骨頭6
[alex] 吃了[骨頭4]
[geng]生成了骨頭7
[geng]生成了骨頭8
[alex] 吃了[骨頭5]
[geng]生成了骨頭9
[geng]生成了骨頭10
[alex] 吃了[骨頭6]
[geng]生成了骨頭11
[geng]生成了骨頭12
[alex] 吃了[骨頭7]
[geng]生成了骨頭13
[geng]生成了骨頭14
上面就是生產者和消費者的簡單模型,使用了queue(隊列),生成者就是生成商品,而後放到隊列中;消費者就是去這個隊列中根據條件取數,這樣不斷生產和取數,就是簡單的生產者消費者模型,其中time.sleep()是生成效率和消費效率,控制程序的節奏,而count+=1表明消費者生產的能力,每次只生成一個,若是把這個調成10,那麼效率就很高,每次生成完成以後,都要等待好久。固然,要調效率,要修改一下代碼。
隊列queue的源代碼以下:
'''A multi-producer, multi-consumer queue.''' try: import threading except ImportError: import dummy_threading as threading from collections import deque from heapq import heappush, heappop from time import monotonic as time __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): 'Exception raised by Queue.get(block=0)/get_nowait().' pass class Full(Exception): 'Exception raised by Queue.put(block=0)/put_nowait().' pass class Queue: '''Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite. ''' def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) # mutex must be held whenever the queue is mutating. All methods # that acquire mutex must release it before returning. mutex # is shared between the three conditions, so acquiring and # releasing the conditions also acquires and releases mutex. self.mutex = threading.Lock() # Notify not_empty whenever an item is added to the queue; a # thread waiting to get is notified then. self.not_empty = threading.Condition(self.mutex) # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) # Notify all_tasks_done whenever the number of unfinished tasks # drops to zero; thread waiting to join() is notified to resume self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def task_done(self): '''Indicate that a formerly enqueued task is complete. Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. ''' with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished def join(self): '''Blocks until all items in the Queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. ''' with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() def qsize(self): '''Return the approximate size of the queue (not reliable!).''' with self.mutex: return self._qsize() def empty(self): '''Return True if the queue is empty, False otherwise (not reliable!). This method is likely to be removed at some point. Use qsize() == 0 as a direct substitute, but be aware that either approach risks a race condition where a queue can grow before the result of empty() or qsize() can be used. To create code that needs to wait for all queued tasks to be completed, the preferred technique is to use the join() method. ''' with self.mutex: return not self._qsize() def full(self): '''Return True if the queue is full, False otherwise (not reliable!). This method is likely to be removed at some point. Use qsize() >= n as a direct substitute, but be aware that either approach risks a race condition where a queue can shrink before the result of full() or qsize() can be used. ''' with self.mutex: return 0 < self.maxsize <= self._qsize() def put(self, item, block=True, timeout=None): '''Put an item into the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until a free slot is available. If 'timeout' is a non-negative number, it blocks at most 'timeout' seconds and raises the Full exception if no free slot was available within that time. Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case). ''' with self.not_full: if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() def get(self, block=True, timeout=None): '''Remove and return an item from the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until an item is available. If 'timeout' is a non-negative number, it blocks at most 'timeout' seconds and raises the Empty exception if no item was available within that time. Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). ''' with self.not_empty: if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() self.not_full.notify() return item def put_nowait(self, item): '''Put an item into the queue without blocking. Only enqueue the item if a free slot is immediately available. Otherwise raise the Full exception. ''' return self.put(item, block=False) def get_nowait(self): '''Remove and return an item from the queue without blocking. Only get an item if one is immediately available. Otherwise raise the Empty exception. ''' return self.get(block=False) # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held # Initialize the queue representation def _init(self, maxsize): self.queue = deque() def _qsize(self): return len(self.queue) # Put a new item in the queue def _put(self, item): self.queue.append(item) # Get an item from the queue def _get(self): return self.queue.popleft() class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): heappush(self.queue, item) def _get(self): return heappop(self.queue) class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
看看源代碼,可以讓本身對這些方法,有更好的理解,以後會多看看源代碼是如何寫的。多參考源代碼的寫法,裏面有不少好的書寫習慣和格式。