上文中咱們提到搶票軟件的原理,其實還能夠作一個小優化,具體就要用到進程鎖這個模塊,進程鎖能夠起到和join相同的做用,可是有區別.node
xxxxxxxxxx
* join是吧全部的子進程變成了串行
* 進程鎖則是把鎖住的代碼變成了串行,相比之下進程鎖更加方便,實用性泛用性也更強
示例代碼以下:python
xxxxxxxxxx
from multiprocessing import Process, Lock
import time
import json
import os
def search():
time.sleep(1) # 模擬網絡io
with open('db.txt', 'rt', encoding='utf-8')as f:
res = json.load(f)
print(f'還剩{res["count"]}張票')
def get():
with open('db.txt', 'rt', encoding='utf-8')as f:
res = json.load(f)
# print(f'還剩{res["count"]}張票')
time.sleep(1) # 模擬網絡io
if res['count'] > 0:
res['count'] -= 1
with open('db.txt', 'wt', encoding='utf-8')as f:
json.dump(res, f)
f.flush()
print(f'{os.getpid()}搶票成功')
time.sleep(1) # 模擬網絡io
else:
print('票已經售空!~!~!~!~!~!~!~!')
def task(lock):
# search()
lock.acquire() # 進程鎖鎖住,同一時間只能拿有一個用於執行get函數,也就是購買函數
get()
lock.release() # 釋放鎖,下一個用戶能夠操做
if __name__ == '__main__':
lock = Lock() # 寫在main裏面,主進程裏,讓子進程拿到同一把鎖
for i in range(5):
p = Process(target=task, args=(lock,))
p.start()
# p.join()# 由於加入了進程鎖,因此join用來串行子進程的方式就能夠棄用了
以上方法雖然相比以前有了必定的改進,但實際上仍是很麻煩,並不適用於併發數量較多的狀況,並且一方面效率比較低,另一方面須要本身加鎖處理.web
因此其實咱們還有一種更好更方便的方式來解決這個問題,就是隊列Queue.編程
首先咱們瞭解一下進程間通訊的概念便可,簡稱是IPC,全稱是Inter-Process Communication,就是字面意思,進程之間的通訊,咱們一般會採用Queue隊列的方式.json
隊列的概念其實就是管道pipe加進程鎖lock,其做用局勢建立共享的進程隊列,從而來實現進程之間的通訊以及數據傳遞,實際上隊列裏面通常不會放入比較大的數據,大部分時候都是消息或者是一些很是小的數據.windows
Queue(maxsize)就是建立隊列的方法,下面咱們來看一下隊列Queue的部分源碼,筆者已經把那些複雜的實現原理刪掉,咱們只須要了解其有什麼參數以及怎麼使用就好:網絡
xxxxxxxxxx
class Queue(object):
def __init__(self, maxsize=0, *, ctx):
pass
# 這裏maxsize = 0就是隊列的最大長度,若是沒有手動賦值的話隊列就不限長度.
def put(self, obj, block=True, timeout=None):
pass
# 這裏,put方法,最經常使用的方法之一,將值放入隊列中
# 其中的參數,obj即爲要放入的值
# block是一種阻塞狀態,默認爲True,即阻塞,阻塞的意思就是若是隊列長度已經滿了,put不能放值進去,那麼這個方法就會中止,不會繼續向下執行.
# timeout即爲時間延時,延時就是put方法在這些時間以內若是不能成功給隊列放入值就會報錯.要注意的是若是block爲False的話延時是沒有做用的,由於此時方法不具備阻塞做用,因此延時天然是無效的.
def get(self, block=True, timeout=None):
pass
# get,便是從隊列中取值,其參數和用法和put徹底同樣,只是咱們不用往裏面放值而是按以前put進去的順序往外取值.block和timeout的含義也和上面put同樣
def qsize(self):
return self._maxsize - self._sem._semlock._get_value()
# 這個方法能夠返回隊列中的值的數量,但並不是可靠值,由於隊列中值會在不斷的變化,使用時候要注意
def empty(self):
return not self._poll()
# 判斷隊列是否爲空,返回值爲布爾值True或者False
def full(self):
return self._sem._semlock._is_zero()
# 判斷隊列是否已經滿了,返回值爲布爾值True或者False
def get_nowait(self):
return self.get(False)
# 用法等同於get,可是沒有block參數,至關於block = False
def put_nowait(self, obj):
return self.put(obj, False)
# 用法等同於put,可是沒有block參數,至關於block = False
def close(self):
pass
# 字面意思,關閉當前隊列,防止隊列中加入更多的數據
生產者:生產數據的任務併發
消費者:處理數據的任務app
生產者<-->隊列<-->消費者
生產者能夠不停地生產,能夠達到本身最大的生產效率 消費者能夠不停的消費,也達到了本身最大的消費效率生產者消費者模型: 大大提升了生產者生產的效率和消費者消費的效率
因此在併發編程中,使用生產者和消費者模型能夠解決絕大多數的併發問題.這個模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度.
要注意的一點是,隊列Queue並不適合傳輸大型的數據或文件,一般應該傳遞消息之類,上文中也有提到.
如下爲生產者和消費者的一個示例:
x
from multiprocessing import Process, Queue
def producer(q, name, food):
'''生產者函數'''
for i in range(3):
print(f'{name}生產出來{food}{i}')
res = f'{food}{i}'
q.put(res)
def consumer(q, name):
'''消費者函數'''
while True:
res = q.get(timeout=2)
if res is None:# 這裏,當取到隊列中的值是None時,跳出循環,即消費者中止消費
print(f'{name}沒東西了,木法吃了')
break
print(f'{name}吃了{res}')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, 'rocky', '包子'))
p2 = Process(target=producer, args=(q, 'tank', '韭菜'))
p3 = Process(target=producer, args=(q, 'nick', '蒜泥'))
c1 = Process(target=consumer, args=(q, '111'))
c2 = Process(target=consumer, args=(q, '222'))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)# 爲何這裏要加入兩個None呢,爲了讓兩個消費者都能成功的跳出消費循環,不至於到最後報錯隊列爲空.
q.put(None)
其實這種處理方式有點傻,試想一下,若是這樣處理生產者和消費者之間的關係,那麼每多一個消費者咱們就要多在隊列里加入一個None,這樣會很是的麻煩,並且代碼也會顯得很是的low.
因此咱們能夠借用JoinableQueue來完成這個事情.
JoinableQueue的用法和Queue很是的類似,可是有些許不一樣,讓咱們來看下JoinableQueue的源碼如何:
x
class JoinableQueue(Queue):
def __init__(self, maxsize=0, *, ctx):
pass
# 咱們能夠看到,JoinableQueue的自身init裏面和Queue基本上如出一轍
def put(self, obj, block=True, timeout=None):
pass
# put的用法和Queue同樣,參數也同樣
def task_done(self):
pass
# 這個就是JoinableQueue和Queue最大的不一樣之一,咱們能夠看到,JoinableQueue裏面並無get方法,因此在調用get方法的時候是調用的JoinableQueue父類,也就是Queue的get方法.
# 而task_done的意義就是使用者調用此方法的時候會發出一個信號,表示q.get()返回的項目已經被處理掉了.
def join(self):
pass
# 這裏的join一樣爲阻塞做用,不一樣的是在JoinableQueue裏面,咱們調用這個方法的時候會阻塞直到隊列裏全部的項目都會處理掉.處理掉的意思就是在全部的q.get()下面都要加上task_done的調用,這樣join纔會再也不阻塞程序的運行.
# 上述概念咱們能夠理解爲一個計數器
# 對這個計數器來講:
# put +1的操做
# task_done -1的操做
# q.task_done() 完成了一次任務,和get連用,減小計數器的計數
# q.join() 計數器爲零纔會不阻塞
那麼,採用JoinableQueue改造過上面的生產者消費者模型的小實例爲:
x
from multiprocessing import Process, JoinableQueue
import time, random
def producer(q, name, food):
'''生產者函數'''
for i in range(3):
time.sleep(2 * random.random())
print(f'{name}生產出來{food}{i}')
res = f'{food}{i}'
q.put(res)
def consumer(q, name):
'''消費者函數'''
while True:
res = q.get()
time.sleep(2 * random.random())
# if res is None:# 由於斷定方式變了,咱們沒有向隊列里加入None,因此這個判斷無效了
# print(f'{name}沒東西了,木法吃了')
# break
print(f'{name}吃了{res}')
q.task_done()# 這裏是至關於隊列的計數器減一
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q, 'rocky', '包子'))
p2 = Process(target=producer, args=(q, 'tank', '韭菜'))
p3 = Process(target=producer, args=(q, 'nick', '蒜泥'))
c1 = Process(target=consumer, args=(q, '111'), daemon=True)
c2 = Process(target=consumer, args=(q, '222'), daemon=True)
# 這裏在消費者的子進程生成時咱們將其定義爲守護進程,這樣主進程在執行完畢後子進程就會隨之結束,咱們就不用再每添加一個消費者就要往隊列裏添加一個None了.
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join() # 生產者生產完畢
q.join() # 這裏是主進程的最後一行代碼
# 因此把消費者c1,c2作成守護進程的話,主進程最後一行一旦執行經過(q.join()經過也就是隊列爲空),就能夠結束子進程,也就是消費者的兩個子進程就會結束