python多進程——進程間通訊

(一)進程鎖

搶票的例子:html

# -*- coding:utf-8 -*-
from multiprocessing import Process, Lock
import time
import json

count = {'count': 1}  # 僅剩最後一張票
with open('db.txt', 'w', encoding='utf-8') as f:
    json.dump(count, f)


# 返回剩餘票數
def search():
    dic = json.load(open('db.txt'))
    print('剩餘票數%s' % dic['count'])
    return dic


def get_ticket(dic):
    time.sleep(0.1)  # 模擬讀數據的網絡延遲
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db.txt', 'w'))
        print('購票成功,剩餘:{}'.format(dic['count']))
    else:
        print('搶票失敗,去邀請好友助力!')


def ticket_purchase(lock, i):
    print('第{}個用戶'.format(i))
    # lock.acquire()
    get_ticket(search())
    # lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):  # 模擬併發10個客戶端搶票
        p = Process(target=ticket_purchase, args=(lock, i + 1))
        p.start()

結果:python

第6個用戶
剩餘票數1
第4個用戶
剩餘票數1
第7個用戶
剩餘票數1
第1個用戶
剩餘票數1
第10個用戶
剩餘票數1
第3個用戶
剩餘票數1
第5個用戶
剩餘票數1
第8個用戶
剩餘票數1
第2個用戶
剩餘票數1
第9個用戶
剩餘票數1
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
購票成功,剩餘:0
十個用戶會同時把票搶走,由於每次search同一時間能查到只有一個票

multipleprocessing.Lock

  • 非遞歸的鎖定對象,很是相似threading.Lock.一旦進程或線程得到了鎖,後續嘗試從任何進程或線程獲取它,將被阻塞直到被釋放; 任何進程或線程均可以釋放它。
  • Lock支持上下文管理協議,能夠在with中使用。

 

acquire(block=True, timeout=None)

  • 獲取一個鎖,阻塞(block=True)或不阻塞(block=False)
  • 當block設置爲True的時候(默認設爲True),若是鎖處於鎖定狀態,調用該方法會阻塞,直到鎖被釋放;而後將鎖設置爲鎖定狀態,並返回True。
  • 當block設置爲False的時候,調用該方法不會阻塞。若是鎖處於鎖定狀態,則返回False,不然將鎖設置爲鎖定狀態,並返回True。
  • 當timeout爲正數時,只要沒法獲取鎖,最多阻塞超時指定的秒數。超時值爲負值至關於超時值爲零。超時值爲「None」(默認值)的調用將超時時間設置爲無限。請注意,超時的負值或無值的處理方式與threading.lock.acquire()中實現的行爲不一樣。若是block參數設置爲false,超時參數沒有實際意義,所以會忽略timeout參數,。若是獲取了鎖,則返回true;若是過了超時時間,則返回false。

 

release()

  • 解鎖,能夠從任何進程或線程調用,而不只僅是最初獲取鎖的進程或線程。
  • 大部分行爲和threading.lock.release()相同,但在未鎖定狀態時調用引起ValueError 【後者引起RuntimeError】

 

    加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。json

  若是搶票步驟沒有加鎖,那麼可能會有幾我的同時把票搶走,由於每次search都能查到有一個票,加了鎖之後只能一個一個搶安全

 加鎖:網絡

# -*- coding:utf-8 -*-
from multiprocessing import Process, Lock
import time
import json


count = {'count': 1}  # 僅剩最後一張票
with open('db.txt', 'w', encoding='utf-8') as f:
    json.dump(count, f)


# 返回剩餘票數
def search():
    dic = json.load(open('db.txt'))
    print('剩餘票數%s' % dic['count'])
    return dic


def get_ticket(dic):
    time.sleep(0.1)  # 模擬讀數據的網絡延遲
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db.txt', 'w'))
        print('購票成功,剩餘:{}'.format(dic['count']))
    else:
        print('搶票失敗,去邀請好友助力!')


def ticket_purchase(lock, i):
    print('第{}個用戶'.format(i))
    lock.acquire()
    get_ticket(search())
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(10):  # 模擬併發10個客戶端搶票
        p = Process(target=ticket_purchase, args=(lock, i + 1))
        p.start()

結果:併發

第2個用戶
剩餘票數1
第1個用戶
第9個用戶
第10個用戶
第5個用戶
第7個用戶
第8個用戶
第3個用戶
第6個用戶
第4個用戶
購票成功,剩餘:0
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!

  從結果能夠看出,並非手速最快的才能搶到app

 multiprocessing.RLock

  • 遞歸的鎖對象,必須由獲取它的進程或線程釋放遞歸鎖一旦進程或線程得到了遞歸鎖定,相同的進程或線程就能夠再次獲取它而不會阻塞該進程或線程獲取和釋放鎖的次數必須相等。
  • RLock支持上下文管理器協議,所以能夠在with語句中使用。

 

 acquire(block=True, timeout=None)

  • 獲取一個鎖,阻塞(block=True)或不阻塞(block=False)
  • 當在block參數設置爲True的狀況下調用時,除非該鎖已由當前進程或線程擁有,不然將一直阻塞到該鎖處於未鎖定狀態(此時它不屬於任何進程或線程)。而後,當前進程或線程取得鎖的全部權(若是它尚未全部權),鎖內的遞歸級別將增長一,返回true。
  • 當block設置爲False時,調用時不阻塞。若是鎖已經被另外一個進程或線程獲取,則當前進程或線程不佔用全部權,而且鎖中的遞歸級別不會更改,返回值爲False; 若是鎖處於解鎖狀態,則當前進程或線程將得到全部權,遞歸級別將遞增,返回True.

 

 release()

  • 釋放鎖,遞減遞歸級別。若是在遞減遞歸級別爲零以後,將鎖重置爲解鎖狀態(不禁任何進程或線程擁有),而且若是任何其餘進程或線程正在阻塞,等待解鎖,僅容許其中一個繼續執行。若是在遞減以後遞歸級別仍然非零,則鎖保持鎖定並仍由調用進程或線程擁有。
  • 只有在調用進程或線程擁有鎖時才調用此方法。若是此方法由全部者之外的進程或線程調用,或者鎖處於未鎖定(無主)狀態,則引起AssertionError。請注意,在這種狀況下引起的異常類型與threading.rlock.release()中實現的行爲不一樣 【後者引起RuntimeError】。

 

  若你的線程處理中會有一些比較複雜的代碼邏輯過程,好比不少層的函數調用,而這些函數其實都須要進行加鎖保護數據訪問。這樣就可能會反覆的屢次加鎖,於是用RLock就能夠進行屢次加鎖,解鎖,直到最終鎖被釋放,而若是用普通的lock,當你一個函數A已經加鎖,它內部調用另外一個函數B,若是B內部也會對同一個鎖加鎖,那麼這種狀況就也會致使死鎖。dom

 

 (二)信號量

multiprocessing.Semaphore

  • 信號量對象:近似的類比threading.Semaphore
  • 此類實現信號量對象。信號量管理表示release()調用數減去acquire()調用數再加上原子計數器的初始值。當計數器爲0時,acquire()方法將一直阻塞,直到它能夠返回而不使計數器爲負爲止。若是未給定,則值默認爲1。
  • 可選參數提供內部計數器的初始值;默認值爲1。若是給定的值小於0,則會引起ValueError
  • acquire方法的第一個參數被命名爲block,和multiprocessing.Lock.acquire() 一致

 

acquire(block=True,timeout=None)

  • 獲取信號量。
  • 在不帶參數的狀況下調用:
  1. 若是內部計數器大於零,則將其遞減1並當即返回true。異步

  2. 若是內部計數器爲零,則阻塞直到調用 release()喚醒。一旦喚醒(計數器大於0),將計數器遞減1並返回true。每次調用release()都會喚醒一個進程。不該依賴進程被喚醒的順序async

  • 當block設置爲false ,不阻塞。若是此時調用沒有參數acquire()會阻塞(即此時計數器爲0),則當即返回false; 不然,執行與不帶參數調用時相同的操做,並返回true。
  • 當timeout是None之外的值,它將最多阻止timeout秒。若是在該時間間隔內(計數器一直爲0),則返回false。不然返回true。

 

release()

  • 釋放信號量,將內部計數器遞增1。當計數器爲零而且另外一個進程正在等待它再次大於零時喚醒該進程。

【acquire()和release()不必定一對一,是否阻塞要取決於計數器的值】

 

# -*- coding:utf-8 -*-

from multiprocessing import Semaphore, Process
import time
import random


def enter_room(smp, i):
    if smp.acquire(block=True, timeout=random.randint(1, 3)):  # 超時還未獲取,返回false,反之返回True
        print('用戶%d進入了房間' % i)
        time.sleep(1)
        smp.release()
        print('用戶%d離開了房間' % i)
    else:
        print('等過久,走人')


if __name__ == '__main__':
    smp = Semaphore(2)
    for i in range(10):
        p = Process(target=enter_room, args=(smp, i))
        p.start()

結果:

用戶5進入了房間
用戶8進入了房間
用戶5離開了房間
用戶0進入了房間
用戶8離開了房間用戶9進入了房間

等過久,走人
用戶0離開了房間
用戶2進入了房間
用戶9離開了房間
用戶4進入了房間
等過久,走人
等過久,走人
用戶2離開了房間用戶6進入了房間

用戶4離開了房間
用戶6離開了房間

 

(三)事件

multiprocessing.Event

  • 克隆threading.Event
  • 這是進程之間通訊的最簡單機制之一:一個進程發出事件信號,其餘進程等待它。
  • 事件對象管理一個內部flag,該標誌可使用set()方法設置爲true,使用clear()方法重置爲false  。flag = False,wait()方法將阻塞,直到該flag爲True。flag初始值是Flase。

 

is_set()

  • 當且僅當內部標誌爲真時返回true。

 

set()

  • 將內部標誌設置爲true。一旦標誌爲真,調用wait()的線程將不阻塞。

 

clear()

  • 將內部標誌重置爲false。隨後,調用 wait的進程將阻塞,直到set()被調用以再次將內部標誌設置爲true。

 

wait(timeout=None)

  • 阻塞直到內部flag爲True: 若是調用時flag就爲True,則當即返回, 不然,直到另外一個進程調用set()將內部flag設置爲True,或者阻塞超時。
  • 當timeout不爲None時,它指定超時的時間(單位:秒)
  • 除非給出超時參數而且阻塞超時返回False,其它狀況皆爲True

 

紅綠燈:

# -*- coding:utf-8 -*-
from multiprocessing import Event, Process, Lock
import time


# 紅綠燈
def light(e):
    while 1:
        if e.is_set():  # 爲True,flag爲True
            print('紅燈')
            e.clear()  # 重置爲False,調用wait()的進程阻塞
            time.sleep(5)
        else:
            print('綠燈')
            e.set()
            time.sleep(5)


def car(e, i, l):
        while 1:
            l.acquire()  # 先獲取鎖,確認下一輛通行的車
            e.wait()  # 紅燈停,綠燈行
            print('奔馳{}以兩秒的時間飄過'.format(i))
            time.sleep(2)
            l.release()


if __name__ == '__main__':
    e = Event()
    l = Lock()
    p = Process(target=light, args=(e,))
    p.start()
    for i in range(5):  # 5輛車
        p = Process(target=car, args=(e, i, l))
        p.start()

結果:

綠燈
奔馳4以兩秒的時間飄過
奔馳2以兩秒的時間飄過
奔馳1以兩秒的時間飄過
紅燈
綠燈
奔馳3以兩秒的時間飄過
奔馳0以兩秒的時間飄過
奔馳4以兩秒的時間飄過
紅燈

闖紅燈的例子:

# -*- coding:utf-8 -*-
from multiprocessing import Event, Process, Lock
import time
import random


# 紅綠燈
def light(e):
    while 1:
        if e.is_set():  # 爲True,flag爲True
            print('紅燈')
            e.clear()  # 重置爲False,調用wait()的進程阻塞
            time.sleep(5)
        else:
            print('綠燈')
            e.set()
            time.sleep(5)


def car(e, i, l):
        while 1:
            l.acquire()  # 先獲取鎖,確認下一輛通行的車,若是沒有鎖那麼就同時過紅綠燈
            if e.wait(random.randint(0, 3)):  # 紅燈停,綠燈行
                print('奔馳{}以兩秒的時間飄過'.format(i))
            else:
                print('奔馳{}闖紅燈以兩秒的時間飄過'.format(i))
            time.sleep(2)
            l.release()


if __name__ == '__main__':
    e = Event()
    l = Lock()
    p = Process(target=light, args=(e,))
    p.start()
    for i in range(5):  # 5輛車
        p = Process(target=car, args=(e, i, l))
        p.start()

結果:

綠燈
奔馳4以兩秒的時間飄過
奔馳2以兩秒的時間飄過
奔馳1以兩秒的時間飄過
紅燈
奔馳3闖紅燈以兩秒的時間飄過
綠燈
奔馳0以兩秒的時間飄過
奔馳4以兩秒的時間飄過
紅燈
奔馳2闖紅燈以兩秒的時間飄過
奔馳1闖紅燈以兩秒的時間飄過

 

(四)管道

multipleprocessing.Pipe([duplex])

  • 返回一對的 表明的配管的端部的對象。(conn1, conn2)Connection.
  • 若是duplexTrue(默認值),則管道是雙向的。若是duplexFalse,管道是單向的:conn1只能用於接收消息,conn2只能用於發送消息
  • multipleprocessing.connection.Connection

 

send(obj)

  • 將對象發送到應該使用的鏈接的另外一端recv()
  • 該對象必須是可選擇的。很是大的pickle可能引起ValueError異常。(大約32 MiB +,雖然它取決於操做系統)

 

recv()

  • 使用返回從鏈接另外一端發送的對象 send()。阻塞直到有東西要收到。若是沒有什麼留下來接收,而另外一端被關閉。拋出 EOFError

 

fileno()

  • 返回鏈接使用的文件描述符或句柄。

 

close()

  • 關閉鏈接。
  • 當鏈接被垃圾收集時,會自動調用此方法。

 

poll([timeout])

  • 查詢是否有可供讀取的數據。
  • 未指定timeout,當即返回,若是timeout是一個數字,則阻塞timeout時間(單位:秒),若是是None,一直阻塞。

【若另外一端已關閉,則觸發BrokenPipeError異常

 

send_bytes(buffer[, offset[, size]])

  • 從相似bytes的對象中發送字節數據做爲完整的消息。
  • 若是給出offset則從buffer中的該位置讀取數據若是給出size,則將從緩衝區中讀取多個字節。很是大的緩衝區可能會引起 ValueError異常(大約32 MiB +,取決於操做系統)

 

recv_bytes([maxlength])

  • 返回從鏈接另外一端發送的字節數據的完整消息。直到有數據要接收。若是沒有要接收的內容,而且另外一端已關閉,則引起EOFError

 

recv_bytes_into(buffer[, offset])

  • 將鏈接另外一端發送的字節數據的完整消息讀入緩衝區,並返回消息中的字節數。直到有數據要接收。若是沒有要接收的內容而且另外一端已關閉,則引起EOFError
  • 緩衝區必須是與可寫的和bytes相似的對象。若是給出了偏移量,那麼消息將從該位置寫入緩衝區。偏移量必須是小於緩衝區長度的非負整數(以字節爲單位)。
  • 若是緩衝區過短,則會引起BufferToSort異常,發送的完整消息可從e.args[0]獲取,其中e是異常實例。

 

生產消費者模型:

傳輸字符串數據:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe, BufferTooShort
import time


def produce(right, name):
    for i in range(3):
        print('{}生產第{}包子'.format(name, i + 1))
        right.send('包子')
        time.sleep(1)
    right.close()


def consume(left, name):
    while 1:
            try:
                goods = left.recv()
                print('{}消費了一個{}'.format(name, goods))
            except EOFError:  # 關閉另外一端,由recv觸發此異常
                left.close()
                break


if __name__ == '__main__':
    left, right = Pipe()
    print('文件描述符:{}'.format(left.fileno()))
    p = Process(target=produce, args=(right, '生產者1'))  # 生產
    c = Process(target=consume, args=(left, '消費者1'))  # 消費
    p.start()
    c.start()
    right.close()  # 關閉多餘的兩端
    left.close()

結果:

文件描述符:436
生產者1生產第1包子
消費者1消費了一個包子
生產者1生產第2包子
消費者1消費了一個包子
生產者1生產第3包子
消費者1消費了一個包子

多個消費者:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe, BufferTooShort
import time


def produce(right, name):
    for i in range(3):
        print('{}生產第{}包子'.format(name, i + 1))
        right.send('包子')
        time.sleep(1)
    right.close()


def consume(left, name):
    while 1:
            try:
                goods = left.recv()
                print('{}消費了一個{}'.format(name, goods))
            except EOFError:  # 關閉另外一端,由recv觸發此異常
                left.close()
                break


if __name__ == '__main__':
    left, right = Pipe()
    print('文件描述符:{}'.format(left.fileno()))
    p = Process(target=produce, args=(right, '生產者1'))  # 生產
    c1 = Process(target=consume, args=(left, '消費者1'))  # 消費
    c2 = Process(target=consume, args=(left, '消費者2'))  # 消費
    c3 = Process(target=consume, args=(left, '消費者3'))  # 消費
    p.start()
    c1.start()
    c2.start()
    c3.start()
    right.close()  # 關閉多餘的兩端
    left.close()

結果:

文件描述符:432
生產者1生產第1包子
消費者2消費了一個包子
生產者1生產第2包子
消費者2消費了一個包子
生產者1生產第3包子
消費者3消費了一個包子

請注意,若是兩個進程(或線程)同時嘗試讀取或寫入管道的同一端,則管道中的數據可能會損壞。固然,同時使用管道的不一樣端的進程不存在損壞的風險。

 

傳輸字節:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe, BufferTooShort
import time


def produce(right, name):
    for i in range(3):
        print('{}生產第{}包子'.format(name, i + 1))
        right.send_bytes('包子'.encode())
        time.sleep(1)
    right.send_bytes('包子包子包子'.encode())
    right.close()


def consume(left, name):
    while 1:
            try:
                byte_content = bytearray(10)
                bytes_size = left.recv_bytes_into(byte_content)
                print('{}消費了一個{}'.format(name, byte_content.decode()))
                print('接收了{}個數據'.format(bytes_size))
            except EOFError:  # 關閉另外一端,由recv觸發此異常
                left.close()
                break
            except BufferTooShort as e:
                print('數據太長,完整數據爲:{}'.format(e.args[0].decode()))


if __name__ == '__main__':
    left, right = Pipe()
    print('文件描述符:{}'.format(left.fileno()))
    p = Process(target=produce, args=(right, '生產者1'))  # 生產
    c = Process(target=consume, args=(left, '消費者1'))  # 消費
    p.start()
    c.start()
    right.close()  # 關閉多餘的兩端
    left.close()

結果:

文件描述符:476
生產者1生產第1包子
消費者1消費了一個包子    
接收了6個數據
生產者1生產第2包子
消費者1消費了一個包子    
接收了6個數據
生產者1生產第3包子
消費者1消費了一個包子    
接收了6個數據
數據太長,完整數據爲:包子包子包子

奇怪的poll(),分析下面兩個代碼結果:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe, BufferTooShort
import time


def produce(right, name):
    for i in range(3):
        print('{}生產第{}包子'.format(name, i + 1))
        right.send('包子')
    time.sleep(3)
    right.close()
    print('right已關閉')


def consume(left, name):
    while 1:
            try:
                print('poll阻塞')
                print('是否有可供讀取的數據:{}'.format(left.poll(None)))
                goods = left.recv()
                print('{}消費了一個{}'.format(name, goods))
            except EOFError:  # 已關閉另外一端,由recv觸發此異常
                left.close()
                break


if __name__ == '__main__':
    left, right = Pipe()
    print('文件描述符:{}'.format(left.fileno()))
    p = Process(target=produce, args=(right, '生產者1'))  # 生產
    c = Process(target=consume, args=(left, '消費者1'))  # 消費
    p.start()
    c.start()
    right.close()  # 關閉多餘的兩端
    left.close()

結果:

文件描述符:544
poll阻塞
生產者1生產第1包子
生產者1生產第2包子
生產者1生產第3包子
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
right已關閉
是否有可供讀取的數據:True
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe, BufferTooShort
import time


def produce(right, name):
    for i in range(3):
        print('{}生產第{}包子'.format(name, i + 1))
        right.send('包子')
    # time.sleep(3)
    right.close()
    print('right已關閉')


def consume(left, name):
    while 1:
            try:
                print('poll阻塞')
                print('是否有可供讀取的數據:{}'.format(left.poll(None)))
                goods = left.recv()
                print('{}消費了一個{}'.format(name, goods))
            except EOFError:  # 已關閉另外一端,由recv觸發此異常
                left.close()
                break


if __name__ == '__main__':
    left, right = Pipe()
    print('文件描述符:{}'.format(left.fileno()))
    p = Process(target=produce, args=(right, '生產者1'))  # 生產
    c = Process(target=consume, args=(left, '消費者1'))  # 消費
    p.start()
    c.start()
    right.close()  # 關閉多餘的兩端
    left.close()

結果:

文件描述符:440
生產者1生產第1包子
生產者1生產第2包子
生產者1生產第3包子
right已關閉
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
是否有可供讀取的數據:True
消費者1消費了一個包子
poll阻塞
Process Process-2:
Traceback (most recent call last):
......
BrokenPipeError: [WinError 109] 管道已結束。

第四次循環poll(None)的執行若先於管道的right端關閉代碼right.close()的執行,poll(None)返回True,並以recv引起的異常結束。反之,poll(None)引起BrokenPipeError異常

 

(五)隊列

隊列是線程和進程安全的。

multiprocessing.Queue

  • 返回使用管道和一些/信號量實現的進程共享隊列。當一個進程第一次將一個項目放入隊列時,就會啓動一個feeder線程,它將對象從緩衝區傳輸到管道中
  • 一般的queue.empty和queue.full異常從標準庫的queue模塊引起,以發出信號超時。
  • queue實現queue.queue的全部方法,但task_done()和join()除外。

 

qsize()

  • 返回隊列的大體大小。這個數字不可靠.

 

empty()

  • 爲空返回True,不然返回False。這個數字不可靠

 

full()

  • 隊列已滿返回True,不然返回False。這個數字不可靠

 

put(obj[, block[, timeout]])

  • 將obj放入隊列。若是可選參數block爲「True」(默認值),timeout爲「None」(默認值),則根據須要阻塞,直到插槽可用。若是超時爲正數,則最多會阻塞timeout秒數,若是在此時間內沒有可用插槽,引起queue.Full異常。不然(block爲False),若是空閒插槽當即可用,則將項目放入隊列,不然引起queue.Full異常(在這種狀況下,超時將被忽略)。

 

 put_nowait()

  • 顧名思義,等效於put(obj,False)

 

get([block,[, timeout])

  • 從隊列中移除並返回項目。若是可選參數block爲True(默認值),timeout=None(默認值),則在item可用以前根據須要進行阻塞。若是超時爲正數,則最多阻塞timeout秒,若是在該時間內沒有可用項,則引起queue.Empty異常。不然(block爲False),若是item當即可用,則返回該項,不然引起queue.Empty異常(在這種狀況下忽略超時)。

 

get_nowait()

  • 顧名思義,等效於get(False)

 

close()

  • 指示當前進程不會將更多數據放入此隊列。後臺線程將全部緩衝數據刷新到管道後將退出。當隊列被垃圾收集時,將自動調用此函數。

 

 join_thread()

  • join()後臺線程。這隻能在調用close()以後使用。它將一直阻塞,直到後臺線程退出,以確保緩衝區中的全部數據都已刷新到管道中。
  • 默認狀況下,若是進程不是隊列的建立者,那麼在退出時,它將嘗試加入隊列的後臺線程。進程能夠調用cancel_join_thread()使join_thread()不作任何操做。

 

cancel_join_thread()

  • 防止join_thread()阻塞。可防止後臺線程在進程退出時自動join()——請參見join_thread()。
  • 此方法的更好名稱多是allow_exit_without_flush()。但它極可能會致使排隊的數據丟失,您幾乎確定不須要使用它。只有當您須要當前進程當即退出而不等待將排隊的數據刷新到底層管道時,它纔有存在的意義,並且您不關心丟失的數據。

 

隊列進程安全

生產消費者模型,隊列實現:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Queue, JoinableQueue
import os


def consumer(q):
    while True:
        print('消費者進程{}等吃'.format(os.getpid()))
        res = q.get()
        if res is None:
            print('消費者進程{}結束'.format(os.getpid(), res))
            break  # 收到結束信號則結束
        else:
            print('消費者進程{}吃了{}'.format(os.getpid(), res))


def producer(food, q):
    for i in range(2):
        q.put(food)
        print('生產者進程{}生產了 第{}個{}'.format(os.getpid(), i + 1, food))
    print('生產者進程{}生產完成'.format(os.getpid()))


if __name__ == '__main__':
    q = Queue()
    # 生產者
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('水果', q))
    p3 = Process(target=producer, args=('米飯', q))

    # 消費者
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    # 有幾個消費者就put幾個None
    q.put(None)  # 必須保證生產者所有生產完畢,才應該發送結束信號
    q.put(None)
    q.put(None)

結果:

消費者進程12108等吃
消費者進程3648等吃
生產者進程19544生產了 第1個包子
生產者進程19544生產了 第2個包子
生產者進程19544生產完成
消費者進程12108吃了包子
消費者進程12108等吃
消費者進程3648吃了包子
消費者進程3648等吃
生產者進程828生產了 第1個米飯
消費者進程12108吃了米飯生產者進程828生產了 第2個米飯
消費者進程12108等吃

生產者進程828生產完成
消費者進程3648吃了米飯
消費者進程3648等吃
生產者進程20244生產了 第1個水果
消費者進程12108吃了水果生產者進程20244生產了 第2個水果

消費者進程12108等吃
生產者進程20244生產完成
消費者進程3648吃了水果
消費者進程3648等吃
消費者進程12108結束
消費者進程3648結束

因爲消費者收到None才能結束,所以要注意兩個問題,None必須在隊列尾部,幾個消費者,尾部就應該有幾個None

 

multipleprocessing.JoinableQueue

task_done()

  • 指示之前排隊的任務已完成。由隊列使用者使用。對於用於獲取任務的每一個get(),對task_done()的後續調用會告訴隊列任務的處理已完成。

 

join()

  • 阻塞,直到隊列中的全部項目都被獲取和處理。
  • 每當將項目添加到隊列時,未完成任務的計數就會增長。每當使用者調用task_done()以指示已檢索到該項,而且對其進行的全部工做都已完成時,計數就會降低。當未完成任務的計數降至零時,join()將取消阻塞。

 

生產消費者模型,JoinableQueue實現

# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue, JoinableQueue
import os


def consumer(q):
    while 1:
        print('消費者進程{}等吃'.format(os.getpid()))
        res = q.get()
        q.task_done()  # Semaphore - 1
        print('消費者進程{}吃了{}'.format(os.getpid(), res))


def producer(food, q):
    for i in range(2):
        q.put(food)
        print('生產者進程{}生產了 第{}個{}'.format(os.getpid(), i + 1, food))
    print('生產者進程{}生產完成,等待消費者消費'.format(os.getpid()))
    q.join()  # 等待消費者進程


if __name__ == '__main__':
    q = JoinableQueue()
    # 生產者
    p1 = Process(target=producer, args=('包子', q))
    p2 = Process(target=producer, args=('水果', q))
    p3 = Process(target=producer, args=('米飯', q))

    # 消費者
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True
    c2.daemon = True

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

結果:

消費者進程9952等吃
消費者進程3840等吃
生產者進程10980生產了 第1個包子
生產者進程10980生產了 第2個包子
生產者進程10980生產完成,等待消費者消費
消費者進程9952吃了包子
消費者進程9952等吃
消費者進程3840吃了包子
消費者進程3840等吃
生產者進程7452生產了 第1個水果
生產者進程18556生產了 第1個米飯
消費者進程9952吃了水果
消費者進程9952等吃
生產者進程7452生產了 第2個水果
生產者進程7452生產完成,等待消費者消費
生產者進程18556生產了 第2個米飯
生產者進程18556生產完成,等待消費者消費
消費者進程3840吃了米飯
消費者進程3840等吃
消費者進程9952吃了水果
消費者進程9952等吃
消費者進程3840吃了米飯
消費者進程3840等吃

  其思路就是put以後,有個信號量計數器+1  ,每get一下調用一下taskdone,計數器就會-1。若是生產者很快生產完後,調用join,進程會等待,等到計數器爲0的時候,全部調用join()的生產者會被喚醒。所以,生產者喚醒了-->意味着消費者已經消費完,消費者因爲死循環還在等吃的(get阻塞)。設置消費者線程爲守護線程,讓主進程隨着生產者進程的結束而結束,主進程 結束後,停止守護線程(消費者)

 

死鎖:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Queue


def f(q):
    q.put('X' * 1000000)
    # q.cancel_join_thread()


if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    print('join阻塞')
    p.join()            # this deadlocks
    print('get阻塞')
    obj = queue.get()  # q.cancel_join_thread()執行後,join()不阻塞,可是get()拿不到數據,數據丟失,致使阻塞
    print(obj)
  multiprocessing.Queue底層是基於Pipe構建的,可是數據傳遞時並非直接寫入Pipe,而是寫入進程本地buffer,經過一個feeder線程寫入底層Pipe,
所以一次put數據很大的時候,會一直等待get()取出。沒有get()就join該進程,會致使死鎖

 

(六)進程池

  你能夠建立一個進程池,進程將使用Pool類執行提交給它的任務。

 multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild[,context]]]]])

  •   一個進程池對象,它控制可向其提交做業的工做進程池。它支持帶超時和回調的異步結果,並具備並行映射實現。
  •   processes是要使用的工做進程數。若是processes爲None,則使用os.cpu_count()返回的數字。
  •   若是initializer設定項不是「None」,則每一個工做進程在啓動時都將調用initializer(*initargs) 。
  •   maxtasksperchild是工做進程在退出並用新的工做進程替換以前能夠完成的任務數,以釋放未使用的資源。 默認的maxtasksperchild是None,這意味着工做進程將與池同樣長。
  •   context可用於指定用於啓動工做進程的上下文。 一般使用函數multiprocessing.Pool()或上下文對象的Pool()方法建立池。 在兩種狀況下,上下文都是適當的。(如何指定上下文?怎麼用?)
  •   注意,pool對象的方法只能由建立pool的進程調用。

 

 apply(fun[, args[, kwds]])

  • 使用參數args和關鍵字參數kwds調用func。它會一直阻塞,直到結果準備就緒。apply_async() 更適合並行執行工做。此外,func只在池中的一個進程中執行。

 

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

  • apply()方法的一個變種,它返回一個結果對象o;o.get()獲取func返回的結果。
  • 若是指定了callback,則它應該接受單個參數,而且可調用。當結果就緒時,調用callback函數,若是調用失敗,調用error_callback。
  • 若是指定了error_callback,那麼它應該是接受單個參數而且可調用。若是目標函數失敗,則使用異常實例調用error_callback。
  • 回調應該當即完成,不然處理結果的線程將被阻塞。

 

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import time


def func(n):
    print('i={}, pid={}'.format(n, os.getpid()))
    time.sleep(1)
    return n**2


if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        # p.apply(func, (i,))  # 只在一個進程中執行,會阻塞主進程
        p.apply_async(func, (i,))  # 適合並行,一下由五個進程處理五個任務,不阻塞主進程
    print('主進程')
    p.close()
    p.join()

結果:

主進程
i=0, pid=6540
i=1, pid=1348
i=2, pid=17060
i=3, pid=7632
i=4, pid=7396
i=5, pid=6540
i=6, pid=1348
i=7, pid=7396
i=8, pid=7632
i=9, pid=17060

 

map(func, [iterable,[chunksize]])

  • 至關於內置函數map,它僅支持可迭代的參數,他會阻塞,直到所有結果準備就緒。
  • 此方法將iterable切換爲多個塊,並將其做爲單獨的任務提交給進程池。能夠經過將chunksize設置爲正整數來指定這些塊的(近似)大小
  • 請注意,它可能會致使很是長的iterables的高內存使用率,考慮將imap() 或 imap_unordered() 與顯式chunksize選項一塊兒使用,以提升效率

 

map_async(func, iterable[, chunksize[,callback[,error_callback]]])

  • map()方法的一個變種,它返回一個結果對象。
  • 若是指定了回調,則它應該接受單個參數而且可調用。當結果就緒時,調用callback,若是調用失敗,error_callback被應用。
  • 若是指定了error_callback,那麼它應該接受單個參數並可調用。若是目標函數調用失敗,則使用異常實例調用error_callback。
  • 回調應該當即完成,不然處理結果的線程將被阻塞。

 

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import time


def fun(n):
    print('i={}, pid={}'.format(n, os.getpid()))
    time.sleep(1)
    return n


if __name__ == '__main__':
    p = Pool(4)
    # result = p.map(fun, [(1, 2), (1, 2)], chunksize=1)  # map阻塞主進程,結果出來後,再解除阻塞
    result = p.map_async(fun, [(1, 2), (1, 2)], chunksize=1)  # 異步,不阻塞主線程,任務還在子進程進行;
    print('主進程')
    # print(result)  # map返回列表,可直接打印
    print(result.get())  # map_async返回結果對象
    p.close()
    p.join()

結果:

主進程
i=(1, 2), pid=2004
i=(1, 2), pid=5328
[(1, 2), (1, 2)]

imap(func, iterable[, chunksize])

  • 惰性版本的map
  • chunksize參數與map()方法使用的參數相同。 對於很是長的迭代,使用較大的chunksize值可使做業比使用默認值1更快地完成。
  • 此外,若是chunksize爲1,則imap()方法返回的迭代器的next()方法具備可選的超時參數:若是在超時秒內沒法返回結果,則next(timeout)將引起multiprocessing.TimeoutError。

 

imap_unorderable(func, iterable[,chunksize])

  • 除了返回的結果是無序的(除非池中只有一個進程),其它跟imap同樣

 

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import time


def fun(n):
    print('i={}, pid={}'.format(n, os.getpid()))
    time.sleep(1)
    return n


if __name__ == '__main__':
    p = Pool(4)
    # result = p.imap(fun, [(1, 2), (3, 4)], chunksize=1)  # 異步
    result = p.imap_unordered(fun, [(1, 2), (3, 4)], chunksize=1)  # 異步,不阻塞主線程,任務還在子進程進行,結果無序;
    print('主進程')
    for i in result: # imap返回迭代器
        print(i)
    p.close()
    p.join()

結果:

主進程
i=(1, 2), pid=17396
i=(3, 4), pid=12496
(1, 2)
(3, 4)

startmap(func, iterable[, chunksize]) 

  • 與map()相似,只是迭代的元素應該做爲參數解包的迭代。所以,iterable=[(1,2),(3,4)]的結果是[func(1,2),func(3,4)]。

 

startmap_async(func, iterable[, chunksize[,callback[,error_callback]]])

  • starmap()和map_async()的組合

 

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import time


def fun(n, k):
    print('i={}, pid={}'.format(n, os.getpid()))
    time.sleep(1)
    return n, k


if __name__ == '__main__':
    p = Pool(4)
    # result = p.starmap(fun, [(1, 2), (3, 4)], chunksize=1)  # 阻塞,直到所有結果處理完
    result = p.starmap_async(fun, [(1, 2), (3, 4)], chunksize=1)  # 異步,不阻塞主線程,任務還在子進程進行;
    print('主進程')
    # print(result)  # starmap返回列表,直接打印
    print(result.get())
    p.close()
    p.join()

結果:

主進程
i=1, pid=14660
i=3, pid=10564
[(1, 2), (3, 4)]

 

close()

  • 防止將任何其餘任務提交到池中。 完成全部任務後,工做進程將退出。

 

terminate()

  • 當即中止工做進程而不完成未完成的工做。 當池對象被垃圾收集時,將當即調用terminate()。

 

join()

  • 等待工做進程退出。 必須在使用join()以前調用close()或terminate()。

 

池對象在3.3版本支持上下文管理協議

 

 使用進程池實現搶票:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool, Manager
import time
import json

count = {'count': 1}  # 僅剩最後一張票
with open('db.txt', 'w', encoding='utf-8') as f:
    json.dump(count, f)


# 返回剩餘票數
def search():
    dic = json.load(open('db.txt'))
    print('剩餘票數%s' % dic['count'])
    return dic


def get_ticket(dic):
    time.sleep(0.1)  # 模擬讀數據的網絡延遲
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic, open('db.txt', 'w'))
        print('購票成功,剩餘:{}'.format(dic['count']))
    else:
        print('搶票失敗,去邀請好友助力!')


def ticket_purchase(lock, i):
    print('第{}個用戶'.format(i))
    lock.acquire()
    get_ticket(search())
    lock.release()


if __name__ == '__main__':
    lock = Manager().Lock()  # 要使用Manager().Lock()
    p = Pool(5)
    for i in range(10):  # 模擬併發10個客戶端搶票
        p.apply_async(ticket_purchase, (lock, i + 1))
    p.close()
    p.join()

結果:

第1個用戶
剩餘票數1
第2個用戶
第3個用戶
第4個用戶
第5個用戶
購票成功,剩餘:0
剩餘票數0
第6個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第7個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第8個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第9個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
第10個用戶
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!
剩餘票數0
搶票失敗,去邀請好友助力!

maxtasksperchild和chunksize具體效果:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os
import sys


def func(x):
    print("pid: ", os.getpid(), " got: ", x)
    sys.stdout.flush()
    return [x, x+1]


def got(r):
    print("got result: ", r)


if __name__ == '__main__':
    pool = Pool(processes=1, maxtasksperchild=9)  # 進程執行了九個任務就會退出,換新的進程執行
    keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    result = pool.map_async(func, keys, chunksize=1, callback=got)  # chunksize指定每chuncksize個元素爲一個任務
    # result = pool.map_async(func, keys, chunksize=2, callback=got)  # chunksize爲2說明此時只有五個任務,沒有換新的進程執行
    pool.close()
    pool.join()

結果:

pid:  8188  got:  1
pid:  8188  got:  2
pid:  8188  got:  3
pid:  8188  got:  4
pid:  8188  got:  5
pid:  8188  got:  6
pid:  8188  got:  7
pid:  8188  got:  8
pid:  8188  got:  9
pid:  10860  got:  10
got result:  [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10], [10, 11]]

 

 

 

參考:

python進程與多進程

一個線程已經得到了鎖,爲何要重複再加鎖。

使用maxtasksperchild進行python多處理

python文檔

相關文章
相關標籤/搜索