併發編程
進入第六篇。在第四章,講消息通訊時,咱們學到了Queue消息隊列的一些基本使用。昨天我在準備如何建立線程池這一章節的時候,發現對Queue消息隊列的講解有一些遺漏的知識點,而這些知識點,也並非可有可無的,因此在今天的章節裏,我要先對Queue先作一些補充以防你們對消息隊列有一些知識盲區。編程
再次提醒:
本系列全部的代碼均在Python3下編寫,也建議你們儘快投入到Python3的懷抱中來。多線程
消息隊列的先進先出併發
建立多線程的兩種方式ide
首先,要告訴你們的事,消息隊列可不是隻有queue.Queue
這一個類,除它以外,還有queue.LifoQueue
和queue.PriorityQueue
這兩個類。函數
從名字上,對於他們之間的區別,你大概也能猜到一二吧。優化
queue.Queue
:先進先出隊列queue.LifoQueue
:後進先出隊列queue.PriorityQueue
:優先級隊列spa
先來看看,咱們的老朋友,queue.Queue
。
所謂的先進先出
(FIFO,First in First Out),就是先進入隊列的消息,將優先被消費。
這和咱們平常排隊買菜是同樣的,先排隊的人確定是先買到菜。線程
用代碼來講明一下code
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
看看輸出,符合咱們先進先出的預期。存入隊列的順序是01234
,被消費的順序也是01234
。orm
0
1
2
3
4
再來看看Queue.LifoQueue
,後進先出,就是後進入消息隊列的,將優先被消費。
這和咱們羽毛球筒是同樣的,最後放進羽毛球筒的球,會被第一個取出使用。
用代碼來看下
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
來看看輸出,符合咱們後進後出的預期。存入隊列的順序是01234
,被消費的順序也是43210
。
4
3
2
1
0
最後來看看Queue.PriorityQueue
,優先級隊列。
這和咱們平常生活中的會員機制有些相似,辦了金卡的人比銀卡的服務優先,辦了銀卡的人比不辦卡的人服務優先。
來用代碼看一下
from queue import PriorityQueue
# 從新定義一個類,繼承自PriorityQueue
class MyPriorityQueue(PriorityQueue):
def __init__(self):
PriorityQueue.__init__(self)
self.counter = 0
def put(self, item, priority):
PriorityQueue.put(self, (priority, self.counter, item))
self.counter += 1
def get(self, *args, **kwargs):
_, _, item = PriorityQueue.get(self, *args, **kwargs)
return item
queue = MyPriorityQueue()
queue.put('item2', 2)
queue.put('item5', 5)
queue.put('item3', 3)
queue.put('item4', 4)
queue.put('item1', 1)
while True:
print(queue.get())
來看看輸出,符合咱們的預期。咱們存入入隊列的順序是25341
,對應的優先級也是25341
,但是被消費的順序絲絕不受傳入順序的影響,而是根據指定的優先級來消費。
item1
item2
item3
item4
item5
在使用多線程處理任務時也不是線程越多越好,因爲在切換線程的時候,須要切換上下文環境,依然會形成cpu的大量開銷。爲解決這個問題,線程池的概念被提出來了。預先建立好一個較爲優化的數量的線程,讓過來的任務馬上可以使用,就造成了線程池。
在Python3中,建立線程池是經過concurrent.futures
函數庫中的ThreadPoolExecutor
類來實現的。
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def target():
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)
#: 生成線程池最大線程爲5個
pool = ThreadPoolExecutor(5)
for i in range(100):
pool.submit(target) # 往線程中提交,並運行
從結果來看,前面設置線程池最大線程數5個,有生效。
running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0
running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1
...
...
除了使用上述第三方模塊的方法以外,咱們還能夠本身結合前面所學的消息隊列來自定義線程池。
這裏咱們就使用queue來實現一個上面一樣效果的例子,你們感覺一下。
import time
import threading
from queue import Queue
def target(q):
while True:
msg = q.get()
for i in range(5):
print('running thread-{}:{}'.format(threading.get_ident(), i))
time.sleep(1)
def pool(workers,queue):
for n in range(workers):
t = threading.Thread(target=target, args=(queue,))
t.daemon = True
t.start()
queue = Queue()
# 建立一個線程池:並設置線程數爲5
pool(5, queue)
for i in range(100):
queue.put("start")
# 消息都被消費才能結束
queue.join()
輸出是和上面是徹底同樣的效果
running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0
running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1
...
...
構建線程池的方法,是能夠很靈活的,你們有舉能夠本身多研究。可是建議只要掌握一種本身熟悉的,能快速上手的就行了。