搶票的例子:html
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {'count': 1} # 僅剩最後一張票 with open('db.txt', 'w', encoding='utf-8') as f: json.dump(count, f) # 返回剩餘票數 def search(): dic = json.load(open('db.txt')) print('剩餘票數%s' % dic['count']) return dic def get_ticket(dic): time.sleep(0.1) # 模擬讀數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(0.2) # 模擬寫數據的網絡延遲 json.dump(dic, open('db.txt', 'w')) print('購票成功,剩餘:{}'.format(dic['count'])) else: print('搶票失敗,去邀請好友助力!') def ticket_purchase(lock, i): print('第{}個用戶'.format(i)) # lock.acquire() get_ticket(search()) # lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): # 模擬併發10個客戶端搶票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
結果:python
第6個用戶
剩餘票數1
第4個用戶
剩餘票數1
第7個用戶
剩餘票數1
第1個用戶
剩餘票數1
第10個用戶
剩餘票數1
第3個用戶
剩餘票數1
第5個用戶
剩餘票數1
第8個用戶
剩餘票數1
第2個用戶
剩餘票數1
第9個用戶
剩餘票數1
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
十個用戶會同時把票搶走,由於每次search同一時間能查到只有一個票
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。json
若是搶票步驟沒有加鎖,那麼可能會有幾我的同時把票搶走,由於每次search都能查到有一個票,加了鎖之後只能一個一個搶安全
加鎖:網絡
# -*- coding:utf-8 -*- from multiprocessing import Process, Lock import time import json count = {'count': 1} # 僅剩最後一張票 with open('db.txt', 'w', encoding='utf-8') as f: json.dump(count, f) # 返回剩餘票數 def search(): dic = json.load(open('db.txt')) print('剩餘票數%s' % dic['count']) return dic def get_ticket(dic): time.sleep(0.1) # 模擬讀數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(0.2) # 模擬寫數據的網絡延遲 json.dump(dic, open('db.txt', 'w')) print('購票成功,剩餘:{}'.format(dic['count'])) else: print('搶票失敗,去邀請好友助力!') def ticket_purchase(lock, i): print('第{}個用戶'.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): # 模擬併發10個客戶端搶票 p = Process(target=ticket_purchase, args=(lock, i + 1)) p.start()
結果:併發
第2個用戶
剩餘票數1
第1個用戶
第9個用戶
第10個用戶
第5個用戶
第7個用戶
第8個用戶
第3個用戶
第6個用戶
第4個用戶
購票成功,剩餘:0
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
從結果能夠看出,並非手速最快的才能搶到app
RLock
支持上下文管理器協議,所以能夠在with
語句中使用。
AssertionError
。請注意,在這種狀況下引起的異常類型與threading.rlock.release()中實現的行爲不一樣 【後者引起RuntimeError
】。
若你的線程處理中會有一些比較複雜的代碼邏輯過程,好比不少層的函數調用,而這些函數其實都須要進行加鎖保護數據訪問。這樣就可能會反覆的屢次加鎖,於是用RLock就能夠進行屢次加鎖,解鎖,直到最終鎖被釋放,而若是用普通的lock,當你一個函數A已經加鎖,它內部調用另外一個函數B,若是B內部也會對同一個鎖加鎖,那麼這種狀況就也會致使死鎖。dom
threading.Semaphore
此類實現信號量對象。信號量管理表示release()調用數減去acquire()調用數再加上原子計數器的初始值。當計數器爲0時,acquire()方法將一直阻塞,直到它能夠返回而不使計數器爲負爲止。若是未給定,則值默認爲1。acquire
方法的第一個參數被命名爲block,和multiprocessing.Lock.acquire() 一致
若是內部計數器大於零,則將其遞減1並當即返回true。異步
若是內部計數器爲零,則阻塞直到調用 release()喚醒
。一旦喚醒(計數器大於0),將計數器遞減1並返回true。每次調用release()都會喚醒一個進程。不該依賴進程被喚醒的順序。async
【acquire()和release()不必定一對一,是否阻塞要取決於計數器的值】
# -*- coding:utf-8 -*- from multiprocessing import Semaphore, Process import time import random def enter_room(smp, i): if smp.acquire(block=True, timeout=random.randint(1, 3)): # 超時還未獲取,返回false,反之返回True print('用戶%d進入了房間' % i) time.sleep(1) smp.release() print('用戶%d離開了房間' % i) else: print('等過久,走人') if __name__ == '__main__': smp = Semaphore(2) for i in range(10): p = Process(target=enter_room, args=(smp, i)) p.start()
結果:
用戶5進入了房間
用戶8進入了房間
用戶5離開了房間
用戶0進入了房間
用戶8離開了房間用戶9進入了房間
等過久,走人
用戶0離開了房間
用戶2進入了房間
用戶9離開了房間
用戶4進入了房間
等過久,走人
等過久,走人
用戶2離開了房間用戶6進入了房間
用戶4離開了房間
用戶6離開了房間
threading.Event
set()
方法設置爲true,並使用clear()方法重置爲false 。flag = False,wait()
方法將阻塞,直到該flag爲True。flag初始值是Flase。
將阻塞,直到set()被調用以再次將內部標誌設置爲true。
紅綠燈:
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time # 紅綠燈 def light(e): while 1: if e.is_set(): # 爲True,flag爲True print('紅燈') e.clear() # 重置爲False,調用wait()的進程阻塞 time.sleep(5) else: print('綠燈') e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先獲取鎖,確認下一輛通行的車 e.wait() # 紅燈停,綠燈行 print('奔馳{}以兩秒的時間飄過'.format(i)) time.sleep(2) l.release() if __name__ == '__main__': e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5輛車 p = Process(target=car, args=(e, i, l)) p.start()
結果:
綠燈
奔馳4以兩秒的時間飄過
奔馳2以兩秒的時間飄過
奔馳1以兩秒的時間飄過
紅燈
綠燈
奔馳3以兩秒的時間飄過
奔馳0以兩秒的時間飄過
奔馳4以兩秒的時間飄過
紅燈
# -*- coding:utf-8 -*- from multiprocessing import Event, Process, Lock import time import random # 紅綠燈 def light(e): while 1: if e.is_set(): # 爲True,flag爲True print('紅燈') e.clear() # 重置爲False,調用wait()的進程阻塞 time.sleep(5) else: print('綠燈') e.set() time.sleep(5) def car(e, i, l): while 1: l.acquire() # 先獲取鎖,確認下一輛通行的車,若是沒有鎖那麼就同時過紅綠燈 if e.wait(random.randint(0, 3)): # 紅燈停,綠燈行 print('奔馳{}以兩秒的時間飄過'.format(i)) else: print('奔馳{}闖紅燈以兩秒的時間飄過'.format(i)) time.sleep(2) l.release() if __name__ == '__main__': e = Event() l = Lock() p = Process(target=light, args=(e,)) p.start() for i in range(5): # 5輛車 p = Process(target=car, args=(e, i, l)) p.start()
結果:
綠燈
奔馳4以兩秒的時間飄過
奔馳2以兩秒的時間飄過
奔馳1以兩秒的時間飄過
紅燈
奔馳3闖紅燈以兩秒的時間飄過
綠燈
奔馳0以兩秒的時間飄過
奔馳4以兩秒的時間飄過
紅燈
奔馳2闖紅燈以兩秒的時間飄過
奔馳1闖紅燈以兩秒的時間飄過
(conn1, conn2)
Connection.
True(
默認值),則管道是雙向的。若是duplex是False,
管道是單向的:conn1
只能用於接收消息,conn2
只能用於發送消息
recv()
。ValueError
異常。(大約32 MiB +,雖然它取決於操做系統)
send()
。阻塞直到有東西要收到。若是沒有什麼留下來接收,而另外一端被關閉。拋出 EOFError
【若另外一端已關閉,則觸發BrokenPipeError異常】
ValueError
異常(大約32 MiB +,取決於操做系統)
傳輸字符串數據:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 關閉另外一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多餘的兩端 left.close()
結果:
文件描述符:436
生產者1生產第1包子
消費者1消費了一個包子
生產者1生產第2包子
消費者1消費了一個包子
生產者1生產第3包子
消費者1消費了一個包子
多個消費者:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') time.sleep(1) right.close() def consume(left, name): while 1: try: goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 關閉另外一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c1 = Process(target=consume, args=(left, '消費者1')) # 消費 c2 = Process(target=consume, args=(left, '消費者2')) # 消費 c3 = Process(target=consume, args=(left, '消費者3')) # 消費 p.start() c1.start() c2.start() c3.start() right.close() # 關閉多餘的兩端 left.close()
結果:
文件描述符:432
生產者1生產第1包子
消費者2消費了一個包子
生產者1生產第2包子
消費者2消費了一個包子
生產者1生產第3包子
消費者3消費了一個包子
請注意,若是兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。固然,同時使用管道的不一樣端的進程不存在損壞的風險。
傳輸字節:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send_bytes('包子'.encode()) time.sleep(1) right.send_bytes('包子包子包子'.encode()) right.close() def consume(left, name): while 1: try: byte_content = bytearray(10) bytes_size = left.recv_bytes_into(byte_content) print('{}消費了一個{}'.format(name, byte_content.decode())) print('接收了{}個數據'.format(bytes_size)) except EOFError: # 關閉另外一端,由recv觸發此異常 left.close() break except BufferTooShort as e: print('數據太長,完整數據爲:{}'.format(e.args[0].decode())) if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多餘的兩端 left.close()
結果:
文件描述符:476
生產者1生產第1包子
消費者1消費了一個包子
接收了6個數據
生產者1生產第2包子
消費者1消費了一個包子
接收了6個數據
生產者1生產第3包子
消費者1消費了一個包子
接收了6個數據
數據太長,完整數據爲:包子包子包子
奇怪的poll(),分析下面兩個代碼結果:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') time.sleep(3) right.close() print('right已關閉') def consume(left, name): while 1: try: print('poll阻塞') print('是否有可供讀取的數據:{}'.format(left.poll(None))) goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 已關閉另外一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多餘的兩端 left.close()
結果:
文件描述符:544
poll阻塞
生產者1生產第1包子
生產者1生產第2包子
生產者1生產第3包子
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
right已關閉
是否有可供讀取的數據:True
# -*- coding:utf-8 -*- from multiprocessing import Process, Pipe, BufferTooShort import time def produce(right, name): for i in range(3): print('{}生產第{}包子'.format(name, i + 1)) right.send('包子') # time.sleep(3) right.close() print('right已關閉') def consume(left, name): while 1: try: print('poll阻塞') print('是否有可供讀取的數據:{}'.format(left.poll(None))) goods = left.recv() print('{}消費了一個{}'.format(name, goods)) except EOFError: # 已關閉另外一端,由recv觸發此異常 left.close() break if __name__ == '__main__': left, right = Pipe() print('文件描述符:{}'.format(left.fileno())) p = Process(target=produce, args=(right, '生產者1')) # 生產 c = Process(target=consume, args=(left, '消費者1')) # 消費 p.start() c.start() right.close() # 關閉多餘的兩端 left.close()
結果:
文件描述符:440
生產者1生產第1包子
生產者1生產第2包子
生產者1生產第3包子
right已關閉
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
Process Process-2:
Traceback (most recent call last):
......
BrokenPipeError: [WinError 109] 管道已結束。
第四次循環poll(None)的執行若先於管道的right端關閉代碼right.close()的執行,poll(None)返回True,並以recv引起的異常結束。反之,poll(None)引起BrokenPipeError異常
隊列是線程和進程安全的。
隊列進程安全
生產消費者模型,隊列實現:
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue, JoinableQueue import os def consumer(q): while True: print('消費者進程{}等吃'.format(os.getpid())) res = q.get() if res is None: print('消費者進程{}結束'.format(os.getpid(), res)) break # 收到結束信號則結束 else: print('消費者進程{}吃了{}'.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print('生產者進程{}生產了 第{}個{}'.format(os.getpid(), i + 1, food)) print('生產者進程{}生產完成'.format(os.getpid())) if __name__ == '__main__': q = Queue() # 生產者 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('水果', q)) p3 = Process(target=producer, args=('米飯', q)) # 消費者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 有幾個消費者就put幾個None q.put(None) # 必須保證生產者所有生產完畢,才應該發送結束信號 q.put(None) q.put(None)
結果:
消費者進程12108等吃
消費者進程3648等吃
生產者進程19544生產了 第1個包子
生產者進程19544生產了 第2個包子
生產者進程19544生產完成
消費者進程12108吃了包子
消費者進程12108等吃
消費者進程3648吃了包子
消費者進程3648等吃
生產者進程828生產了 第1個米飯
消費者進程12108吃了米飯生產者進程828生產了 第2個米飯
消費者進程12108等吃
生產者進程828生產完成
消費者進程3648吃了米飯
消費者進程3648等吃
生產者進程20244生產了 第1個水果
消費者進程12108吃了水果生產者進程20244生產了 第2個水果
消費者進程12108等吃
生產者進程20244生產完成
消費者進程3648吃了水果
消費者進程3648等吃
消費者進程12108結束
消費者進程3648結束
因爲消費者收到None才能結束,所以要注意兩個問題,None必須在隊列尾部,幾個消費者,尾部就應該有幾個None
生產消費者模型,JoinableQueue實現
# -*- coding:utf-8 -*- from multiprocessing import Process,Queue, JoinableQueue import os def consumer(q): while 1: print('消費者進程{}等吃'.format(os.getpid())) res = q.get() q.task_done() # Semaphore - 1 print('消費者進程{}吃了{}'.format(os.getpid(), res)) def producer(food, q): for i in range(2): q.put(food) print('生產者進程{}生產了 第{}個{}'.format(os.getpid(), i + 1, food)) print('生產者進程{}生產完成,等待消費者消費'.format(os.getpid())) q.join() # 等待消費者進程 if __name__ == '__main__': q = JoinableQueue() # 生產者 p1 = Process(target=producer, args=('包子', q)) p2 = Process(target=producer, args=('水果', q)) p3 = Process(target=producer, args=('米飯', q)) # 消費者 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True # 開始 p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()
結果:
消費者進程9952等吃
消費者進程3840等吃
生產者進程10980生產了 第1個包子
生產者進程10980生產了 第2個包子
生產者進程10980生產完成,等待消費者消費
消費者進程9952吃了包子
消費者進程9952等吃
消費者進程3840吃了包子
消費者進程3840等吃
生產者進程7452生產了 第1個水果
生產者進程18556生產了 第1個米飯
消費者進程9952吃了水果
消費者進程9952等吃
生產者進程7452生產了 第2個水果
生產者進程7452生產完成,等待消費者消費
生產者進程18556生產了 第2個米飯
生產者進程18556生產完成,等待消費者消費
消費者進程3840吃了米飯
消費者進程3840等吃
消費者進程9952吃了水果
消費者進程9952等吃
消費者進程3840吃了米飯
消費者進程3840等吃
其思路就是put以後,有個信號量計數器+1 ,每get一下調用一下taskdone,計數器就會-1。若是生產者很快生產完後,調用join,進程會等待,等到計數器爲0的時候,全部調用join()的生產者會被喚醒。所以,生產者喚醒了-->意味着消費者已經消費完,消費者因爲死循環還在等吃的(get阻塞)。設置消費者線程爲守護線程,讓主進程隨着生產者進程的結束而結束,主進程 結束後,停止守護線程(消費者)
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) # q.cancel_join_thread() if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() print('join阻塞') p.join() # this deadlocks print('get阻塞') obj = queue.get() # q.cancel_join_thread()執行後,join()不阻塞,可是get()拿不到數據,數據丟失,致使阻塞 print(obj)
multiprocessing.Queue底層是基於Pipe構建的,可是數據傳遞時並非直接寫入Pipe,而是寫入進程本地buffer,經過一個feeder線程寫入底層Pipe,
所以一次put數據很大的時候,會一直等待get()取出。沒有get()就join該進程,會致使死鎖
你能夠建立一個進程池,進程將使用Pool類執行提交給它的任務。
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def func(n): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n**2 if __name__ == '__main__': p = Pool(5) for i in range(10): # p.apply(func, (i,)) # 只在一個進程中執行,會阻塞主進程 p.apply_async(func, (i,)) # 適合並行,一下由五個進程處理五個任務,不阻塞主進程 print('主進程') p.close() p.join()
結果:
主進程
i=0, pid=6540
i=1, pid=1348
i=2, pid=17060
i=3, pid=7632
i=4, pid=7396
i=5, pid=6540
i=6, pid=1348
i=7, pid=7396
i=8, pid=7632
i=9, pid=17060
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n if __name__ == '__main__': p = Pool(4) # result = p.map(fun, [(1, 2), (1, 2)], chunksize=1) # map阻塞主進程,結果出來後,再解除阻塞 result = p.map_async(fun, [(1, 2), (1, 2)], chunksize=1) # 異步,不阻塞主線程,任務還在子進程進行; print('主進程') # print(result) # map返回列表,可直接打印 print(result.get()) # map_async返回結果對象 p.close() p.join()
結果:
主進程
i=(1, 2), pid=2004
i=(1, 2), pid=5328
[(1, 2), (1, 2)]
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n if __name__ == '__main__': p = Pool(4) # result = p.imap(fun, [(1, 2), (3, 4)], chunksize=1) # 異步 result = p.imap_unordered(fun, [(1, 2), (3, 4)], chunksize=1) # 異步,不阻塞主線程,任務還在子進程進行,結果無序; print('主進程') for i in result: # imap返回迭代器 print(i) p.close() p.join()
結果:
主進程
i=(1, 2), pid=17396
i=(3, 4), pid=12496
(1, 2)
(3, 4)
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import time def fun(n, k): print('i={}, pid={}'.format(n, os.getpid())) time.sleep(1) return n, k if __name__ == '__main__': p = Pool(4) # result = p.starmap(fun, [(1, 2), (3, 4)], chunksize=1) # 阻塞,直到所有結果處理完 result = p.starmap_async(fun, [(1, 2), (3, 4)], chunksize=1) # 異步,不阻塞主線程,任務還在子進程進行; print('主進程') # print(result) # starmap返回列表,直接打印 print(result.get()) p.close() p.join()
結果:
主進程
i=1, pid=14660
i=3, pid=10564
[(1, 2), (3, 4)]
使用進程池實現搶票:
# -*- coding:utf-8 -*- from multiprocessing import Process, Pool, Manager import time import json count = {'count': 1} # 僅剩最後一張票 with open('db.txt', 'w', encoding='utf-8') as f: json.dump(count, f) # 返回剩餘票數 def search(): dic = json.load(open('db.txt')) print('剩餘票數%s' % dic['count']) return dic def get_ticket(dic): time.sleep(0.1) # 模擬讀數據的網絡延遲 if dic['count'] > 0: dic['count'] -= 1 time.sleep(0.2) # 模擬寫數據的網絡延遲 json.dump(dic, open('db.txt', 'w')) print('購票成功,剩餘:{}'.format(dic['count'])) else: print('搶票失敗,去邀請好友助力!') def ticket_purchase(lock, i): print('第{}個用戶'.format(i)) lock.acquire() get_ticket(search()) lock.release() if __name__ == '__main__': lock = Manager().Lock() # 要使用Manager().Lock() p = Pool(5) for i in range(10): # 模擬併發10個客戶端搶票 p.apply_async(ticket_purchase, (lock, i + 1)) p.close() p.join()
結果:
第1個用戶
剩餘票數1
第2個用戶
第3個用戶
第4個用戶
第5個用戶
購票成功,剩餘:0
剩餘票數0
第6個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第7個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第8個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第9個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第10個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
# -*- coding:utf-8 -*- from multiprocessing import Pool import os import sys def func(x): print("pid: ", os.getpid(), " got: ", x) sys.stdout.flush() return [x, x+1] def got(r): print("got result: ", r) if __name__ == '__main__': pool = Pool(processes=1, maxtasksperchild=9) # 進程執行了九個任務就會退出,換新的進程執行 keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] result = pool.map_async(func, keys, chunksize=1, callback=got) # chunksize指定每chuncksize個元素爲一個任務 # result = pool.map_async(func, keys, chunksize=2, callback=got) # chunksize爲2說明此時只有五個任務,沒有換新的進程執行 pool.close() pool.join()
結果:
pid: 8188 got: 1
pid: 8188 got: 2
pid: 8188 got: 3
pid: 8188 got: 4
pid: 8188 got: 5
pid: 8188 got: 6
pid: 8188 got: 7
pid: 8188 got: 8
pid: 8188 got: 9
pid: 10860 got: 10
got result: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10], [10, 11]]
參考: