Python併發編程之消息隊列補充及如何建立線程池(六)

image.png


你們好,併發編程 進入第六篇。

在第四章,講消息通訊時,咱們學到了Queue消息隊列的一些基本使用。昨天我在準備如何建立線程池這一章節的時候,發現對Queue消息隊列的講解有一些遺漏的知識點,而這些知識點,也並非可有可無的,因此在今天的章節裏,我要先對Queue先作一些補充以防你們對消息隊列有一些知識盲區。編程

再次提醒
本系列全部的代碼均在Python3下編寫,也建議你們儘快投入到Python3的懷抱中來。多線程

本文目錄


  • 消息隊列的先進先出併發

  • 建立多線程的兩種方式ide

消息隊列的先進先出

首先,要告訴你們的事,消息隊列可不是隻有queue.Queue這一個類,除它以外,還有queue.LifoQueuequeue.PriorityQueue這兩個類。函數

從名字上,對於他們之間的區別,你大概也能猜到一二吧。優化

queue.Queue:先進先出隊列
queue.LifoQueue:後進先出隊列
queue.PriorityQueue:優先級隊列spa

先來看看,咱們的老朋友,queue.Queue
所謂的先進先出(FIFO,First in First Out),就是先進入隊列的消息,將優先被消費。
這和咱們平常排隊買菜是同樣的,先排隊的人確定是先買到菜。線程

image.png

用代碼來講明一下code

import queue

q = queue.Queue()

for i in range(5):
   q.put(i)

while not q.empty():
   print q.get()

看看輸出,符合咱們先進先出的預期。存入隊列的順序是01234,被消費的順序也是01234orm

0
1
2
3
4

再來看看Queue.LifoQueue,後進先出,就是後進入消息隊列的,將優先被消費。

這和咱們羽毛球筒是同樣的,最後放進羽毛球筒的球,會被第一個取出使用。


image.png


用代碼來看下

import queue

q = queue.LifoQueue()

for i in range(5):
   q.put(i)

while not q.empty():
   print q.get()

來看看輸出,符合咱們後進後出的預期。存入隊列的順序是01234,被消費的順序也是43210

4
3
2
1
0

最後來看看Queue.PriorityQueue,優先級隊列。
這和咱們平常生活中的會員機制有些相似,辦了金卡的人比銀卡的服務優先,辦了銀卡的人比不辦卡的人服務優先。


來用代碼看一下

from queue import PriorityQueue

# 從新定義一個類,繼承自PriorityQueue
class MyPriorityQueue(PriorityQueue):
   def __init__(self):
       PriorityQueue.__init__(self)
       self.counter = 0

   def put(self, item, priority):
       PriorityQueue.put(self, (priority, self.counter, item))
       self.counter += 1

   def get(self, *args, **kwargs):
       _, _, item = PriorityQueue.get(self, *args, **kwargs)
       return item


queue = MyPriorityQueue()
queue.put('item2', 2)
queue.put('item5', 5)
queue.put('item3', 3)
queue.put('item4', 4)
queue.put('item1', 1)

while True:
   print(queue.get())

來看看輸出,符合咱們的預期。咱們存入入隊列的順序是25341,對應的優先級也是25341,但是被消費的順序絲絕不受傳入順序的影響,而是根據指定的優先級來消費。

item1
item2
item3
item4
item5

建立多線程的兩種方式

在使用多線程處理任務時也不是線程越多越好,因爲在切換線程的時候,須要切換上下文環境,依然會形成cpu的大量開銷。爲解決這個問題,線程池的概念被提出來了。預先建立好一個較爲優化的數量的線程,讓過來的任務馬上可以使用,就造成了線程池。

在Python3中,建立線程池是經過concurrent.futures函數庫中的ThreadPoolExecutor類來實現的。

import time
import threading
from concurrent.futures import ThreadPoolExecutor


def target():
   for i in range(5):
       print('running thread-{}:{}'.format(threading.get_ident(), i))
       time.sleep(1)

#: 生成線程池最大線程爲5個
pool = ThreadPoolExecutor(5)

for i in range(100):
   pool.submit(target) # 往線程中提交,並運行

從結果來看,前面設置線程池最大線程數5個,有生效。

running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0

running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1

...
...

除了使用上述第三方模塊的方法以外,咱們還能夠本身結合前面所學的消息隊列來自定義線程池。

這裏咱們就使用queue來實現一個上面一樣效果的例子,你們感覺一下。

import time
import threading
from queue import Queue

def target(q):
   while True:
       msg = q.get()
       for i in range(5):
           print('running thread-{}:{}'.format(threading.get_ident(), i))
           time.sleep(1)

def pool(workers,queue):
   for n in range(workers):
       t = threading.Thread(target=target, args=(queue,))
       t.daemon = True
       t.start()

queue = Queue()
# 建立一個線程池:並設置線程數爲5
pool(5, queue)

for i in range(100):
   queue.put("start")

# 消息都被消費才能結束
queue.join()

輸出是和上面是徹底同樣的效果

running thread-11308:0
running thread-12504:0
running thread-5656:0
running thread-12640:0
running thread-7948:0

running thread-11308:1
running thread-5656:1
running thread-7948:1
running thread-12640:1
running thread-12504:1

...
...

構建線程池的方法,是能夠很靈活的,你們有舉能夠本身多研究。可是建議只要掌握一種本身熟悉的,能快速上手的就行了。

相關文章
相關標籤/搜索