Python 之多進程的各類方法(3-26.27)

在一個python進程中開啓子進程,start方法和併發效果。python

import os,time
from multiprocessing import Process
def func(arg1,arg2):
    print('##'*arg1)
    time.sleep(2)#睡2秒
    print(os.getpid())#子進程號
    print(os.getppid())#子進程的父進程號
    print('***'*arg2)


if __name__=='__main__':#在win系統下必需要知足這個if條件
    print('%',os.getpid())#主進程號
    p=Process(target=func,args=(2,4))#建立子進程對象,並傳參args=(X,),注意只傳一個參數的時候必須在參數後加逗號
    p.start()
    print('%', os.getppid())#主進程的父進程
# 進程的生命週期
    # 主進程
    # 子進程
    # 開啓了子進程的主進程 :
        # 主進程本身的代碼若是長,等待本身的代碼執行結束,
        # 子進程的執行時間長,主進程會在主進程代碼執行完畢以後等待子進程執行完畢以後 主進程才結束
多進程
import os,time
from multiprocessing import Process
def func(arg1,arg2):
    print('*'*arg1)
    time.sleep(2)#睡5秒
    print('*'*arg2)
def func1():
    time.sleep(5)
    print('#######')

if __name__ == '__main__':
    p = Process(target=func,args=(10,20))
    p.start()
    p1 = Process(target=func1)
    p1.start()
    print('hahahaha')
    p.join()     # 是感知一個子進程的結束,將異步的程序改成同步
    print('====== : 運行完了')
#join方法能夠實如今多進程中須要的同步需求
join方法

進階,多個進程同時運行(注意,子進程的執行順序不是根據啓動順序決定的)數據庫

import os
import time
from multiprocessing import Process

def func(filename,content):
    with open(filename,'w') as f:
        f.write(content*10*'*')

if __name__ == '__main__':#win系統下,必須先知足if條件
    p_lst = []
    for i in range(10):#經過循環建立多個子進程
        p = Process(target=func,args=('info%s'%i,0))#建立子進程
        p_lst.append(p)#將建立的子進程對象添加到列表中
        p.start()#啓動子進程
    for p in p_lst:p.join()   # 以前的全部進程必須在這裏都執行完才能執行下面的代碼
    print([i for i in os.walk(r'E:\python10\day37')])#d打印文件內容
    
多個進程同時進行

除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式編程

import os,time
from multiprocessing import Process
class Myprocess(Process):
    def __init__(self,arg1,arg2):
        time.sleep(3)
        print('*'*arg1)
        super().__init__()#繼承父類中的init方法
        self.arg1=arg1
        self.arg2=arg2
    def run(self):
        print(self.pid)#子進程號
        print(self.name)#進程名字
        print(self.arg1)
        print(self.arg2)

if __name__=='__main__':
    for i in range(5):
        p=Myprocess(i,i+1)
        p.start()
        
# 自定義類 繼承Process類
# 必須實現一個run方法,run方法中是在子進程中執行的代碼
多進程中的第二種方法

進程之間的數據隔離問題json

# 進程 與 進程之間
import os
from multiprocessing import Process

def func():
    global n   # 聲明瞭一個全局變量
    n = 0       # 從新定義了一個n
    print('pid : %s'%os.getpid(),n)#子進程中

if __name__ == '__main__':
    n = 100
    p = Process(target=func)
    p.start()
    p.join()# 感知一個子進程的結束,將異步的程序改成同步
    print(os.getpid(),n)#主進程中
數據隔離

會隨着主進程的結束而結束。數組

主進程建立守護進程安全

  其一:守護進程會在主進程代碼執行結束後就終止服務器

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children數據結構

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止多線程

import os,time
from multiprocessing import Process
def func():
    print(os.getpid())
    while True:
        time.sleep(1)
        print('我還活着')
def func1():
    print('in func start')
    time.sleep(2)
    print('in func1 end')

if __name__=='__main__':
    print('主進程 :>>>%s'%os.getpid())
    p=Process(target=func)
    p.daemon=True   #設置一個子進程爲守護進程
    p.start()
    p1=Process(target=func1)
    p1.start()
    time.sleep(4)
    p1.terminate()  # 結束一個子進程
    time.sleep(2)
    print(p1.is_alive())  # 檢驗一個進程是否還活着
    print('結束紫禁城!!!')
守護進程

守護進程中用到的其餘方法:p.terminate():結束一個子進程,p.is_alive():判斷進程是否還活着併發

      經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。

  當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。

import os
import time
import random
import json
from multiprocessing import Process
from multiprocessing import Lock
def buy_ticket(i,lock):
    lock.acquire()    #取鑰匙
    with open('ticket','r',encoding='utf-8')as f:#查看系統中所剩餘票
        dic=json.load(f)  #load方法接收一個文件句柄,直接將文件中的json字符串轉換成數據結構返回
        print('餘票:%s'%dic['ticket'])
        time.sleep(random.random())#隨機睡0點幾秒
    if dic['ticket']>0:  
        dic['ticket']-=1  #取完票之後所剩餘票
        print('\033[32m%s買到票了\033[0m' % i)
    else:print('\033[32m%s沒買到票\033[0m'%i) 
    with open('ticket','w',encoding='utf-8')as f:  #將所剩餘票字典從新放回文件中
        dic=json.dump(dic,f) ##dump方法接收一個文件句柄,直接將字典轉換成json字符串寫入文件
    lock.release()#還鑰匙
if __name__=='__main__':
    lock=Lock()
    for i in range(10):#經過循環建立多個子進程
        p=Process(target=buy_ticket,args=(i,lock))#建立一個購買票子進程對象
        p.start()
經過鎖,解決數據安全問題

 上鎖中用到的其餘方法:lock.acquire():取鑰匙,lock.release():還鑰匙

#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理

#所以咱們最好找尋一種解決方案可以兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:
隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道
+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
信號量的講解
import time,random
from multiprocessing import Process
from multiprocessing import Semaphore  #引用Semaphore模塊
def ktv(i,sem):  #建立一個進ktv唱歌的函數
    sem.acquire()  #取鑰匙 
    print('%s走進了KTV'%i)  
    time.sleep(random.randint(1,3))   #隨機睡1-3秒
    print('%s走出了KTV'%i) 
    sem.release()  #還鑰匙
if __name__=='__main__':
    sem=Semaphore(4)      #建立一個信號量對象,裏面只能容納4我的
    for i in range(20):  
        p=Process(target=ktv,args=(i,sem)) ##for循環建立對個子進程 (至關於建立20個等待唱歌的人)
        p.start()  #啓動子進程
進ktv唱歌的例子

信號量和鎖的概念相似,

鎖是 某一段代碼 同一時間 只能被1個進程執行

信號量是 某一段代碼 同一時間 只能被n個進程執行

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

e.clear():將「Flag」設置爲False
e.set():將「Flag」設置爲True
import time,random
from multiprocessing import Event,Process
def cars(e,i):   #寫一個汽車通行函數
    if not e.is_set():   #若是通行這個事件的狀態是False,
        print('car%i在等待' % i)   #則汽車在此等待
        e.wait()  # 阻塞 直到等待  事件狀態變成 True 的信號
    print('\033[0;32;40mcar%i經過\033[0m' % i) #則汽車經過這個紅路燈路口

def light(e):  #寫一個紅綠燈變化函數
    while True:    #循環狀態
        if e.is_set():  #判斷事件狀態是否爲真
            e.clear()    #若是是真,則將事件狀態經過 e.clear 改變成False
            print('\033[31m紅燈亮了\033[0m')   #此時狀態爲False,定義紅燈亮了 
            time.sleep(2)  #紅燈停留2秒

        else:  #判斷事件狀態是否爲False 
            e.set()  #若是是False,則將事件狀態經過 e.set() 改變成True
            print('\033[31m綠燈亮了\033[0m') ##此時狀態爲True,定義綠燈亮了 
            time.sleep(2)   #紅燈停留2秒

if __name__=='__main__':
    e=Event()   #注意,事件在開始的時候是爲False的
    p=Process(target=light,args=(e,))  #建立一個子進程
    p.start()  #啓動子進程light
    for i in range(10):
        car=Process(target=cars,args=(e,i))    #建立多個子進程
        car.start()  #啓動子進程car
        time.sleep(random.random()) #隨機睡0.幾秒
紅綠燈,車輛通行例子

 

IPC(Inter-Process Communication)

建立共享的進程隊列,Queue是多進程安全的隊列,
可使用Queue實現多進程之間的數據傳遞。 
Queue([maxsize]) 
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
隊列方法介紹
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
其餘方法
# 隊列 先進先出
from multiprocessing import Process,Queue
def put_data(q):
    for i in ['b','c','d','e','a','f']:
        if q.full():#隊列是否已滿
            break
        q.put(i)  #放數據,數據一旦放到隊列能放的最大數值,再放回形成阻塞,因此上面用到了判斷隊列是否已滿
def get_data(q):
    for j in ['b','c','d','e','a','f']:
        if q.empty(): #判斷隊列中的數據是否已經取完
            break
        g=q.get()#取數據,數據一旦取完再取,會形成阻塞
        print(g)

if __name__=='__main__':
    q=Queue(6)  #建立一個隊列對象,括號內的數表示這個隊列能放的最大數據
    p1=Process(target=put_data,args=(q,))#建立一個子進程
    p2=Process(target=get_data,args=(q,))#建立一個子進程
    p1.start()#啓動子進程
    p2.start()#啓動子進程
單看隊列中放數據和取數據例子

 上面這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。

在隊列中有用到方法:

q.put(i)  #放數據
g=q.get()#取數據
q.full():#判斷隊列是否已滿
q.empty(): #判斷隊列中的數據是否已經取完
q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型
import time,random
from multiprocessing import Process,Queue
def consumer(q,name):  #建立一個消費者函數 q是隊列的實例化對象,name消費者名字
    while True: #消費者排隊購買
        food = q.get()  #取到產品
        if food is None: #若是產品已經賣完了,不能再取了,則返回空
            print('%s得到一個空'%name)
            break
        print('\033[31m%s消費了%s\033[0m'%(name,food))
        time.sleep(random.randint(1,2))  #隨機停留1-2秒

def producer(name,food,q):  #建立一個生產者函數
    for i in range(4):  #循環建立4個生產者
        # time.sleep(random.randint(1,2)) #隨機停留1-2秒
        f='%s生產了%s%s'%(name,food,i) #生產一個產品
        print(i)
        q.put(f)  #將產品放進隊列中
if __name__=='__main__':
    q = Queue(20)    #建立一個隊列的實例化對象
    p1 = Process(target=producer,args=('Egon','包子',q)) #建立一個生產者子進程
    p2 = Process(target=producer, args=('wusir','泔水', q))#建立一個生產者子進程
    c1 = Process(target=consumer, args=(q,'alex'))          #建立一個消費子進程
    c2 = Process(target=consumer, args=(q,'jinboss'))    #建立一個消費子進程
    p1.start() #啓動子進程
    p2.start()#啓動子進程
    c1.start()#啓動子進程
    c2.start()#啓動子進程
    p1.join()  #感知 p1 生產者子進程是否運行結束。
    p2.join()  #感知 p2 生產者子進程是否運行結束。再
    #等join感知到子進程中全部程序運行結束,再運行下面的程序
    q.put(None) #當隊列中放None時,消費者取到None說明生產者已經把須要生產的產品所有生產完且已經被消費者取完
    q.put(None)  #None也能夠理解成作了一個標誌,標誌生產者已經運行結束,且生產者生產的產品也已經被取 完
生產者消費者模式 例子

在此例子中除了用到隊列中的方法還有join:感知一個進程是否運行結束。它的做用就是:生防止產者在生產完後就結束了,可是消費者在取空了以後,則一直處於死循環中且卡在q.get()這一步。

 JoinableQueue([maxsize]) 

建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 

JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:

q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。

q.join() 
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
方法介紹
import time,random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    while True:
        f_c=q.get()  ## 每次獲取一個數據
        print('\033[31m%s消費了%s\033[0m' % (name,f_c))  ## 處理一個數據
        time.sleep(random.randint(1, 3))
        q.task_done()  ## 發送一個記號 : 標誌一個數據被處理成功

def producer(name,product,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        f_p='%s成產了%s%s份'%(name,product,i)  ## 每一次生產一個數據,
        print(f_p)
        q.put(f_p) # 且每一次生產的數據都放在隊列中
    q.join() #感知生產是否結束, 
    # 當生產者所有生產完畢以後,join信號 : 已經中止生產數據了 
    #  且要等待以前被刻上的記號都被消費完
    # 當數據都被處理完時,join阻塞結束
if __name__  == '__main__':
    q = JoinableQueue(15)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True   # 設置爲守護進程(主進程中的代碼執行完畢以後,子進程自動結束)當主進程中的代碼運行結束之後,消費者進程也隨之結束
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 感知一個生產者的進程是否結束,若結束,則主進程中的代碼結束,消費者的進程也結束
生產者消費者模式改良版

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
介紹
from multiprocessing import Pipe,Process  #調用多進程和管道模塊
#第一種方法:
# def func(conn1,conn2): #建立子進程中的函數
#     conn2.close() #將暫時不用的鏈接對象關閉
#     while True:
#         try:
#             msg=conn1.recv() #接收主進程發過來的消息
#             print(msg)
#         except EOFError:
#             conn1.close() #當主進程中的鏈接對象關閉時,會出現EOFError錯誤
#             break
#
# if __name__=='__main__':
#     conn1,conn2=Pipe()   #建立管道,conn1,conn2是鏈接管道兩端的鏈接對象
#     Process(target=func,args=(conn1,conn2)).start()  #實例化一個進程,啓動進程
#     conn1.close() #將暫時不用的鏈接對象關閉
#     for i in range(20):
#         conn2.send('吃了麼')
#     conn2.close()
#第二種方法:
def func(conn1): #建立子進程中的函數
    while True:
            msg=conn1.recv() #接收主進程發過來的消息
            if msg is None:break  #當conn1接收到一個None的時候,標誌着conn2已經把消息發送完了,這時候直接break
            print(msg)
if __name__=='__main__':
    conn1,conn2=Pipe()   #建立管道,conn1,conn2是鏈接管道兩端的鏈接對象
    Process(target=func,args=(conn1,)).start()  #實例化一個進程,啓動進程
    for i in range(5):
        conn2.send('吃了麼')
    conn2.send(None)  #當conn2的內容所有發送完之後。爲了不發生阻塞,返回一個None(至關於作一個我已經發完了的標記)
利用管道收發消息

應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。

from multiprocessing import Process,Pipe,Lock
def producer(con,p,name,product):  
    con.close()
    for i in range(10):
        f=('%s生產的%s第%s份'%(name,product,i))
        print(f)
        p.send(f)
    p.send(None)
    p.send(None)
    p.send(None)
    p.close()

def consumer(con,p,name,lock):
    p.close()
    while True:
        lock.acquire()
        product=con.recv()
        lock.release()
        if product is None:
            con.close()
            break
        print('%s消費了%s'%(name,product))


if __name__=='__main__':
    con,p=Pipe()
    lock=Lock()
    p1=Process(target=producer,args=(con,p,'yimi','麪包'))
    c1= Process(target=consumer,args=(con,p,'titi',lock))
    c2= Process(target=consumer, args=(con, p, 'eva', lock))
    c3= Process(target=consumer, args=(con, p, 'anni', lock))
    c1.start()
    c2.start()
    c3.start()
    p1.start()
    con.close()
    p.close()
管道實現生產者消費者模型

注意:pipe 數據具備不安全性,須要加 lock來控制操做管道的行爲,來避免進程之間爭搶數據形成的數據不安全現象。

 

展望將來,基於消息傳遞的併發編程是大勢所趨

即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。

這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。

但進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。

之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。

from multiprocessing import Manager,Process,Lock
def main(dic,lock):
    lock.acquire() #拿鑰匙
    dic['count'] -= 1 #處理數據
    lock.release()  #還鑰匙

if __name__ == '__main__':
    m = Manager()  #建立Manager對象
    l = Lock()  #建立鎖對象,加鎖的做用是避免數據處理混亂
    dic=m.dict({'count':100}) #調用Manager中的dict方法,獲得具備數據共享功能的字典
    p_lst = []  #建立一個空列表,用來放進程對象
    for i in range(50):
        p = Process(target=main,args=(dic,l))
        p.start()
        p_lst.append(p)
    for i in p_lst: i.join() #循環獲得各個子進程,並join,感知進程是否結束
    print('主進程',dic)
Mananger進程間的數據共享

爲何要有進程池?進程池的概念。

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

 

Pool([numprocess  [,initializer [, initargs]]]):建立進程池
1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組

數據池的方法:

1 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。

2 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,
  必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
3 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。

4 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,
  將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
   
5 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
 
6 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
3 obj.ready():若是調用完成,返回True
4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
5 obj.wait([timeout]):等待結果變爲可用。
6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
其餘方法瞭解

進程池和多進程效率對比
import time
from multiprocessing import Pool,Process
def func(n):
    for i in range(10):
        print(n+1)

def func2(n):
    for i in range(10):
        print(n+2)
if __name__ == '__main__':
    start = time.time()  #進程開始的時間
    pool = Pool(5)               # 5個進程(根據計算機的cpu個數+1)
    pool.map(func,range(1,10))    # 100個func任務
    pool.map(func2,range(10,1,-1))    # 100個func2任務
    t1 = time.time() - start  #利用進程池完成全部任務所需的時間

    start = time.time()  #進程開始的時間
    p_lst = []   #建立一個用來存放左右進程對象的空列表
    for i in range(100):  #100個進程
        p = Process(target=func,args=(i,))  #每循環一次建立一個進程對象,去執行func任務
        p_lst.append(p)
        p.start()  #啓動進程
    for p in p_lst :p.join()  #利用循環感知每一個進程是否結束
    t2 = time.time() - start ##利用多個進程完成全部任務所需的時間
    print(t1,t2)       #t1=0.17700982093811035,t2=4.100234508514404
    
#總結:使用進程池的方法可以高效的去完成任務
進程池和多進程效率對比
同步和異步
import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞
                                    # 但無論該任務是否存在阻塞,同步調用都會在原地等着
    print(res_l)
進程池同步調用
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
                                          # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
進程池異步調用
回調函數
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
import time,os
from multiprocessing import Pool
def f1(n): #n=10
    print('in f1',os.getpid())
    return n**2 #返回n,此時n=100
def f2(n): 
    print('in f2',os.getpid())
    print(n)
if __name__=='__main__':
    print('主進程 ',os.getpid())
    p=Pool(5)
    p.apply_async(f1,args=(10,),callback=f2)  #進程池的異步調用,n=10,callback是將子進程中的f2函數回調到主進程中
    p.close() #關閉進程
    p.join() #感知進程是否結束
進程池的回調函數

若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

相關文章
相關標籤/搜索