併發編程之多進程

1、multiprocessing模塊介紹

  python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu\_count\(\)查看),在python中大部分狀況須要使用多進程。html

  Python提供了multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。python

  須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內數據庫

2、Process類的介紹

一、建立進程的類

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,可用來開啓一個子進程

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

二、參數介紹

  group參數未使用,值始終未None編程

  target表示調用對象,即子進程要執行的任務json

  args表示調用對象的位置參數元組,args=(1, 2, 'egon')windows

  kwargs表示調用對象的字典,kwargs={'name':'egon', 'age':18}安全

  name爲子進程的名稱服務器

三、方法介紹

p.start():啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間。

四、屬性介紹

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可

3、Process類的使用

  注意:在windows中Process()必須放到# if __name__ == '__main__':下:網絡

  這是因爲Windows沒有fork,多處理模式啓動一個新的Python進程並導入調用模塊。若是在導入時調用Process(),那麼這個將啓動無限繼承的新進程(或直到機器耗盡資源)。多線程

  這對隱藏對Process()內部調用的緣由,使用if __name__ == '__main__',這個if語句中的語句將不會在導入時被調用。

一、建立並開啓子進程的兩種方式

from multiprocessing import Process
import time

def task(name):
    print("%s is running" % name)
    time.sleep(3)
    print("%s is done" % name)

if __name__ == '__main__':
    p = Process(target=task, args=('子進程1',))  # 獲得對象
    # Process(target=task, kwargs={'name': "子進程1"})

    p.start()   # 給操做系統發送啓動信號
    print("")
"""
主
子進程1 is running
子進程1 is done
"""
方式一:經過multiprocessing模塊開啓子進程

  上述方法中,是有父子關係,初始狀態和父親是同樣的,可是運行狀態徹底無關。

from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # run()是固定形式,p.start本質是調用的綁定的run方法
        print('%s is running'%self.name)
        time.sleep(3)
        print("%s is done" % self.name)

if __name__ == '__main__':
    p = MyProcess('子進程')
    p.start()   # 給操做系統發送啓動信號
    print('')
"""
主
子進程 is running   # 間隔三秒
子進程 is done
"""
方式二:不用默認的multiprocessing模塊,繼承並訂製本身的類

二、進程直接的內存空間是隔離的

from multiprocessing import Process
n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了
def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)

三、練習題

  一、思考開啓進程的方式一和方式二各開啓了幾個進程?

答:兩個方式都是開啓了一個主進程和四個子進程。

  二、進程之間的內存空間是共享的仍是隔離的?下述代碼執行的結果?

答:進程之間的內存空間是隔離的,執行輸出:「子進程內:0」

from multiprocessing import Process

n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了

def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
"""輸出:
子進程內:  0
"""
進程內存空間驗證

  三、基於多進程實現併發的套接字通訊?

# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

# 函數化改寫
from socket import *
from multiprocessing import Process


def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR,1)  # 連接循環
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, address = server.accept()   # 主進程一直建連接
        p = Process(target=talk, args=(conn,))  # 注意conn參數傳入
        p.start()

    server.close()


if __name__ == '__main__':
    server('127.0.0.1', 9001)
服務端
# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

# 運行一次啓動一個客戶端進程
# 這種狀況仍是存在問題:啓動多個進程後,本機操做系統會崩潰掉
from socket import *


client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 9001))

while True:
    msg = input(">>: ").strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
客戶端

  四、思考每來一個客戶端,服務端就開啓一個新的進程來服務它,這種實現方式有無問題?

答:以上是多進程實現併發套接字通訊的解決方案,這個解決方案是思路是:每來一個客戶端,都在服務端開啓一個進程,若是併發來一千一萬個客戶端,要開啓成千上萬個進程,而因爲機器的硬件配置和性能限制是沒法開啓出那麼多進程的。

 四、Process對象的join方法

from multiprocessing import Process
import time, os

def task():
    print('%s is running, parent id is <%s>' % (os.getpid(), os.getppid()))   # 進程和父進程查看方式
    time.sleep(3)
    print("%s is done, parent id is <%s>" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    p = Process(target=task, )
    p.start()

    p.join()   # 優先運行子進程,主進程卡在原地
    print('主進程', os.getpid(), 'pycharm ID', os.getppid())
    print(p.pid)  # 子進程運行完,變爲殭屍進程,主進程仍可以查到子進程的pid,當主進程結束後,全部殭屍子進程將被丟掉。
"""
828 is running, parent id is <827>
828 is done, parent id is <827>
主進程 827 pycharm ID 504
828
"""
join方法:優先運行子進程,主進程卡在原地,子進程結束後,運行主進程後面的代碼
from multiprocessing import Process
import time, os

def task(name ,n):
    print('%s is running' % name)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=("子進程1",5))
    p2 = Process(target=task, args=("子進程2",3))
    p3 = Process(target=task, args=("子進程3",2))
    """
    進程開啓順序由操做系通通籌控制,順序是不必定的
    主進程 1014 pycharm ID 504
    子進程2 is running
    子進程1 is running
    子進程3 is running
    """
    p1.start()
    p2.start()
    p3.start()
    # 再添加join函數前,主程序的執行輸出次序是徹底隨機的,須要加join()保證主程序等到在子進程以後執行完成
    p1.join()
    p2.join()
    p3.join()
    # 以上並不是串行執行,實際是併發執行,只是約束了主程序要等在子程序後結束
    # print('主進程', os.getpid(), 'pycharm ID', os.getppid())
    print("主進程", (time.time()-start))
"""
子進程1 is running
子進程2 is running
子進程3 is running
主進程 5.010260343551636   # 主程序只等了5秒,說明確實是併發執行
"""
併發執行,約束主程序要等在子程序後結束
from multiprocessing import Process
import time, os

def task(name ,n):
    print('%s is running' % name)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=("子進程1",5))
    p2 = Process(target=task, args=("子進程2",3))
    p3 = Process(target=task, args=("子進程3",2))
    p_l = [p1, p2, p3]

    for p in p_l:
        p.start()

    for p in p_l:
        p.join()

    print("主進程", (time.time()-start))
"""
子進程1 is running
子進程2 is running
子進程3 is running
主進程 5.007940769195557
"""
上述併發執行簡寫
from multiprocessing import Process
import time, os

def task(name ,n):
    print('%s is running' % name)
    time.sleep(n)

if __name__ == '__main__':
    start = time.time()
    p1 = Process(target=task, args=("子進程1",5))
    p2 = Process(target=task, args=("子進程2",3))
    p3 = Process(target=task, args=("子進程3",2))
    # 串行執行
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()

    print("主進程", (time.time()-start))
"""
子進程1 is running
子進程2 is running
子進程3 is running
主進程 10.019965887069702
"""
改寫爲串行執行

五、is_alive方法查看進程是否存活;terminate方法關閉進程

#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False
terminate與is_alive

六、查看進程對象屬性pid及ppid

# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

from multiprocessing import Process
import time,os

def task():
    print('%s is running, parent id is <%s>' % (os.getpid(), os.getppid()))   # 進程和父進程查看方式
    time.sleep(3)
    print("%s is done, parent id is <%s>" % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    # p = Process(target=task, args=('子進程1',))   # 報錯提示去掉參數TypeError: task() takes 0 positional arguments but 1 was given
    p = Process(target=task, )
    p.start()   # 給操做系統發送一個信號
    print('主進程', os.getpid(), 'pycharm ID', os.getppid())
"""
主進程 713 pycharm ID 504
714 is running, parent id is <713>
714 is done, parent id is <713>
"""
進程和父進程查看方法

七、查看進程對象屬性name

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s is piao end' %name)

if __name__ == '__main__':
    p1=Process(target=task,args=('egon',),name='子進程1') #能夠用關鍵參數來指定進程名
    p1.start()

    print(p1.name,p1.pid,)
name與pid

八、殭屍進程和孤兒進程

  殭屍進程和孤兒進程詳解:點擊進入

  參考博客:http://www.cnblogs.com/Anker/p/3271773.html

4、守護進程

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time

def task(name):
    print("%s is running" % name)
    time.sleep(2)


if __name__ == '__main__':
    p = Process(target=task, args=('子進程', ))
    p.daemon=True    # 守護進程必定要在進程開啓前設置
    p.start()

    print("主進程")
"""
主進程    ————》子進程還沒開始就已經結束了
"""
守護進程必定要在進程開啓前設置
# 驗證守護進程內部能再開子進程——》守護進程再開子進程會形成問題:會形成一堆孤兒
from multiprocessing import Process
import time

def task(name):
    print("%s is running" % name)
    time.sleep(2)
    p = Process(target=time.sleep, args=(3, ))
    p.start()

if __name__ == '__main__':
    p = Process(target=task, args=('子進程', ))
    p.daemon=True    # 守護進程必定要在進程開啓前設置
    p.start()

    p.join()   # 讓主進程等待子進程結束

    print("主進程")
"""
AssertionError: daemonic processes are not allowed to have children
"""
驗證守護進程內部不能再開子進程

練習:思考下列代碼的執行結果有可能有哪些狀況?爲何?

from multiprocessing import Process

import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)

    p1.daemon=True   # 主進程代碼執行完畢後,守護進程死
    p1.start()
    p2.start()
    print("main-------")
"""
main-------
456
end456
"""
# 另外一種狀況是機器性能特別強,在執行到main----以前,已經啓動子進程p1了,會造成輸出:
"""
123
main-------
456
end456
"""
比較主進程和守護進程執行順序

5、互斥鎖(進程同步)

  進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,以下

part1:多個進程共享同一打印終端

from multiprocessing import Process
import time

def task(name):
    print('%s 第一次' % name)
    time.sleep(1)
    print('%s 第二次' % name)
    time.sleep(1)
    print('%s 第三次' % name)


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=task, args=('進程%s' % i, ))
        p.start()
"""
進程0 第一次
進程1 第一次
進程2 第一次
進程0 第二次
進程1 第二次
進程2 第二次
進程0 第三次
進程1 第三次
進程2 第三次
"""
併發運行,效率高,但競爭同一打印終端,誰搶到了誰打印
from multiprocessing import Process, Lock
import time

def task(name, mutex):
    mutex.acquire()   # 上鎖,哪一個進程搶到鎖,就一直給他運行
    print('%s 第一次' % name)
    time.sleep(1)
    print('%s 第二次' % name)
    time.sleep(1)
    print('%s 第三次' % name)
    mutex.release()   # 解鎖


if __name__ == '__main__':
    mutex = Lock()   # 只實例化一次,並傳給子進程,要保證全部進程用同一把鎖
    for i in range(3):
        p = Process(target=task, args=('進程%s' % i, mutex))  # 傳遞給子進程的鎖
        p.start()
"""
進程0 第一次
進程0 第二次
進程0 第三次
進程1 第一次
進程1 第二次
進程1 第三次
進程2 第一次
進程2 第二次
進程2 第三次
"""
加鎖:由併發變成了串行,下降效率保證數據安全不錯亂

  互斥鎖的意思就是互相排斥,若是把多個進程比喻爲多我的,互斥鎖的工做原理就是多我的都要去爭搶同一個資源:衛生間,一我的搶到衛生間後上一把鎖,其餘人都要等着,等到這個完成任務後釋放鎖,其餘人才有可能有一個搶到......因此互斥鎖的原理,就是把併發改爲穿行,下降了效率,但保證了數據安全不錯亂

part2:多個進程共享同一文件

  文件當數據庫,模擬搶票,文件db的內容爲:{"count": 1}

from multiprocessing import Process
import json, time

def search(name):
    """查票"""
    time.sleep(1)   # 模擬網絡延遲,併發去看票數
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看到剩餘票數[%s]' %(name, dic['count']))


def get(name):
    """買票"""
    time.sleep(1)   # 模擬網絡延遲
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:    # 確認有票
        dic['count'] -= 1
        time.sleep(3)
        # 寫入文件,即購票成功,這個必須是基於其餘人購票的結果,由併發改成串行
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 購票成功' % name)


def task(name):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task, args=('路人%s' % i, ))
        p.start()
"""
<路人1> 查看到剩餘票數[1]
<路人0> 查看到剩餘票數[1]
<路人2> 查看到剩餘票數[1]
<路人4> 查看到剩餘票數[1]
<路人3> 查看到剩餘票數[1]
<路人5> 查看到剩餘票數[1]
<路人6> 查看到剩餘票數[1]
<路人7> 查看到剩餘票數[1]
<路人8> 查看到剩餘票數[1]
<路人9> 查看到剩餘票數[1]
<路人1> 購票成功
<路人0> 購票成功
<路人2> 購票成功
<路人3> 購票成功
<路人4> 購票成功
<路人5> 購票成功
<路人6> 購票成功
<路人8> 購票成功
<路人7> 購票成功
<路人9> 購票成功
"""
併發運行,效率高,競爭寫入同一文件,數據寫入錯亂

  db.txt裏只有一張票,因爲併發賣出了10次。須要把購票行爲改成串行,只有第一我的能夠買到票

from multiprocessing import Process, Lock
import json, time

def search(name):
    """查票"""
    time.sleep(1)   # 模擬網絡延遲,併發去看票數
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    print('<%s> 查看到剩餘票數[%s]' %(name, dic['count']))


def get(name):
    """買票"""
    time.sleep(1)   # 模擬網絡延遲
    dic = json.load(open('db.txt', 'r', encoding='utf-8'))
    if dic['count'] > 0:    # 確認有票
        dic['count'] -= 1
        time.sleep(3)
        # 寫入文件,即購票成功,這個必須是基於其餘人購票的結果,由併發改成串行
        json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
        print('<%s> 購票成功' % name)


def task(name, mutex):
    search(name)   # 查票併發執行,人人均可以看到票
    mutex.acquire()    # 上鎖
    get(name)      # 購票改成串行,其餘人都必須等着
    mutex.release()    # 解鎖


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):
        p = Process(target=task, args=('路人%s' % i, mutex))
        p.start()
"""
<路人0> 查看到剩餘票數[1]
<路人1> 查看到剩餘票數[1]
<路人2> 查看到剩餘票數[1]
<路人3> 查看到剩餘票數[1]
<路人4> 查看到剩餘票數[1]
<路人5> 查看到剩餘票數[1]
<路人6> 查看到剩餘票數[1]
<路人7> 查看到剩餘票數[1]
<路人8> 查看到剩餘票數[1]
<路人9> 查看到剩餘票數[1]
<路人0> 購票成功
"""
加鎖:購票行爲由併發變成了串行,犧牲運行效率,保證數據安全

總結:

  加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

雖然能夠用文件共享數據實現進程間通訊,但問題是:

  1.效率低(共享數據基於文件,而文件是硬盤上的數據)

  2.須要本身加鎖處理

  所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道

  1 隊列和管道都是將數據存放於內存中

  2 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來

  咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

6、隊列

  進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。

一、建立隊列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

二、參數介紹

  maxsize是隊列中容許最大項數,省略則無大小限制。

  但須要明確:

    一、隊列內存放的是消息而非大數據;

    二、隊列佔用的是內存空間,於是maxsize即便是無大小限制也受限於內存大小。

三、主要方法介紹 

q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
 
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

四、其餘方法

q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。
若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。
例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。
默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

五、隊列的使用

from multiprocessing import Queue

# 隊列中應該放消息,不該該放大文件大數據
# 隊列能夠不設置長度,可是隊列是受制於內存大小的,不可能無限存放
q = Queue(3)  # 指定隊列大小
q.put('hello')
q.put({'a': 1})
q.put([3,3,3,])

print(q.full())   # 查看隊列是否滿了  # True
# q.put(123)    # 隊列滿了再往裏面放時,被鎖住,只能在原地卡着。

print(q.get())
print(q.get())
print(q.get())
print(q.empty())   # 判斷隊列是否所有清空  # True

# print(q.get())   # 因爲已經空了,程序也卡在原處
"""
True
hello
{'a': 1}
[3, 3, 3]
True
"""
隊列應用 

7、生產者消費者模型 

  在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者消費者模型?

  生產者指的是生產數據的任務,消費者指的是處理數據的任務。
  在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式

什麼是生產者和消費者模式?

  生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。
  生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
    這個阻塞隊列就是用來給生產者和消費者解耦的

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[43m%s 吃 %s\033[0m' %(name,res))

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m%s 生產了 %s\033[0m' %(name,res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,'egon','包子'))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,'alex'))

    #開始
    p1.start()
    c1.start()
    print('')
"""
執行結果
主
egon 生產了 包子0
egon 生產了 包子1
alex 吃 包子0
alex 吃 包子1
egon 生產了 包子2
alex 吃 包子2
"""
隊列實現生產者消費者模型

生產者消費者模型總結

一、生產者消費者模型何時用?

  程序中有兩類角色:一類負責生產數據(生產者)一類負責處理數據(消費者)

二、怎麼叫生產者消費者模型?

  引入隊列解決生產者和消費者之間的耦合,這個並不依賴進程,這實際是介紹了一種編程方式。

  若是使用了Queue,說明生產者、消費者、queue都在一臺機器,這屬於集中式架構,嚴重影響性能和穩定性。

  基於網絡通訊的消息隊列:Rabbitmq。

三、生產者消費者模型好處?

  引入生產者消費者模型爲了解決的問題是:

  (1)平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度

  (2)生產者消費者模型實現類程序的解耦合

四、如何實現生產者消費者模型

  如何實現:生產者<--->隊列<--->消費者

 

  此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

  解決方法:讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。

from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(10):
        res = '包子%s' % i
        time.sleep(0.5)   # 模擬生產時間
        print('生產者生產了%s' % res)

        q.put(res)  # 放入隊列中


def consumer(q):
    while True:    # 一直從隊列取一旦取空了,會加一把鎖,程序卡在這裏
        res = q.get()   # 取隊列中數據
        if res is None:break
        time.sleep(1)    # 模擬消費時間
        print('消費者消費了%s' % res)

if __name__ == '__main__':
    # 容器
    q = Queue()
    # 生產者
    p1 = Process(target=producer, args=(q, ))
    # 消費者
    c1 = Process(target=consumer, args=(q, ))

    p1.start()
    c1.start()

    p1.join()   # 保證生產者都執行完,主進程才執行完
    q.put(None)  # 往隊列裏放入None,給消費者判斷
    print("主進程")
"""
生產者生產了包子0
生產者生產了包子1
生產者生產了包子2
消費者消費了包子0
生產者生產了包子3
消費者消費了包子1
生產者生產了包子4
生產者生產了包子5
消費者消費了包子2
生產者生產了包子6
生產者生產了包子7
消費者消費了包子3
生產者生產了包子8
生產者生產了包子9
主進程
消費者消費了包子4
消費者消費了包子5
消費者消費了包子6
消費者消費了包子7
消費者消費了包子8
消費者消費了包子9
"""
生產者在生產完畢後發送結束信號None

  注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

  有多個生產者和消費者時,解決方案是有幾個消費者就發送幾回結束信號:

from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(10):
        res = '包子%s' % i
        time.sleep(0.5)   # 模擬生產時間
        print('生產者生產了%s' % res)

        q.put(res)  # 放入隊列中

        # q.put(None)  # 這種方式會將消費者提早中止


def consumer(q):
    while True:    # 一直從隊列取一旦取空了,會加一把鎖,程序卡在這裏
        res = q.get()   # 取隊列中數據
        if res is None:break
        time.sleep(1)    # 模擬消費時間
        print('消費者消費了%s' % res)

if __name__ == '__main__':
    # 容器
    q = Queue()
    # 生產者
    p1 = Process(target=producer, args=(q, ))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    # 消費者
    c1 = Process(target=consumer, args=(q, ))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()   # 保證生產者都執行完,主進程才執行完
    p2.join()
    p3.join()
    # 跟在正常信號後面,必須保證全部的生產者都生產結束
    q.put(None)  # 往隊列裏放入None,給消費者判斷
    q.put(None)  # 有幾個消費者就須要幾個結束信號
    print("主進程")
有幾個消費者就須要發送幾回結束信號

  上面這種解決方案很是Low,可使用JoinableQueue來解決這個問題。

8、JoinableQueue

  JoinableQueue([maxsize])這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

參數介紹

  maxsize是隊列中容許最大項數,省略則無大小限制。

方法介紹

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
# -*- coding:utf-8 -*-
__author__ = 'Qiushi Huang'

# JoinableQueue的用法和queue相似,只是這個queue是能夠被join的
from multiprocessing import Process, JoinableQueue
import time

def producer(q):
    for i in range(2):
        res = '包子%s' % i
        time.sleep(0.5)   # 模擬生產時間
        print('生產者生產了%s' % res)

        q.put(res)  # 放入隊列中
    q.join()  # 生產者生產完等隊列把數據都取完


def consumer(q):
    while True:
        res = q.get()   # 取隊列中數據
        if res is None:break
        time.sleep(1)    # 模擬消費時間
        print('消費者消費了%s' % res)
        q.task_done()   # 提供的接口,消費者告訴生產者取走了一個數據


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()
    q.join()    # 等隊列執行完(等隊列取完)

    # 生產者
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    # 消費者
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    # 主進程執行完以後,守護進程也終止,所以把消費者設置爲守護進程
    c1.daemon=True
    c2.daemon=True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print("主進程")
"""
生產者生產了包子0
生產者生產了包子0
生產者生產了包子0
生產者生產了包子1
生產者生產了包子1
生產者生產了包子1
消費者消費了包子0
消費者消費了包子0
消費者消費了包子0
消費者消費了包子1
消費者消費了包子1
消費者消費了包子1
主進程
"""
基於JoinableQueue實現生產者消費者模型

9、管道

  管道是進程間通訊(IPC)的第二種方式。

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
介紹
from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主進程')

基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)
基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)

注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主進程')
#注意:send()和recv()方法使用pickle模塊對對象進行序列化。

管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序
管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序

10、共享數據

展望將來,基於消息傳遞的併發編程是大勢所趨

即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合

經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,

還能夠擴展到分佈式系統中

進程間通訊應該儘可能避免使用本節所講的共享數據的方式

進程間數據時獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的。

雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此。

from multiprocessing import Manager,Process,Lock
import os
def work(d,lock):
    # with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
        #{'count': 94}
進程之間操做共享的數據

 

11、信號量

 

12、事件

 

十3、進程池

相關文章
相關標籤/搜索