二 python併發編程之多進程實現

一 multiprocessing模塊介紹html

二 process類的介紹python

三 process類的使用linux

四 守護進程git

五 進程同步(鎖)github

六 隊列數據庫

七 管道編程

八 共享數據json

九 信號量windows

十 事件數組

十一 進程池

 

一 multiprocessing模塊介紹

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。
    multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。

  multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。

    須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

 

二 process類的介紹

建立進程的類

1 Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
2 
3 強調:
4 1. 須要使用關鍵字的方式來指定參數
5 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

參數介紹:

1 group參數未使用,值始終爲None
2 
3 target表示調用對象,即子進程要執行的任務
4 
5 args表示調用對象的位置參數元組,args=(1,2,'egon',)
6 
7 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
8 
9 name爲子進程的名稱

方法介紹:

1 p.start():啓動進程,並調用該子進程中的p.run() 
2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
3 
4 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
5 p.is_alive():若是p仍然運行,返回True
6 
7 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

屬性介紹:

1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
2 
3 p.name:進程的名稱
4 
5 p.pid:進程的pid
6 
7 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
8 
9 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

三 process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

 1 Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
 2 If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
 3 This is the reason for hiding calls to Process() inside
 4 
 5 if __name__ == "__main__"
 6 since statements inside this if-statement will not get called upon import.
 7 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 
 8 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 
 9 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
10 

建立並開啓子進程的兩種方式

 1 #開進程的方法一:
 2 import time
 3 import random
 4 from multiprocessing import Process
 5 def piao(name):
 6     print('%s piaoing' %name)
 7     time.sleep(random.randrange(1,5))
 8     print('%s piao end' %name)
 9 
10 
11 
12 p1=Process(target=piao,args=('egon',)) #必須加,號
13 p2=Process(target=piao,args=('alex',))
14 p3=Process(target=piao,args=('wupeqi',))
15 p4=Process(target=piao,args=('yuanhao',))
16 
17 p1.start()
18 p2.start()
19 p3.start()
20 p4.start()
21 print('主線程')
方法一
 1 #開進程的方法二:
 2 import time
 3 import random
 4 from multiprocessing import Process
 5 
 6 
 7 class Piao(Process):
 8     def __init__(self,name):
 9         super().__init__()
10         self.name=name
11     def run(self):
12         print('%s piaoing' %self.name)
13 
14         time.sleep(random.randrange(1,5))
15         print('%s piao end' %self.name)
16 
17 p1=Piao('egon')
18 p2=Piao('alex')
19 p3=Piao('wupeiqi')
20 p4=Piao('yuanhao')
21 
22 p1.start() #start會自動調用run
23 p2.start()
24 p3.start()
25 p4.start()
26 print('主線程')
方法二

進程直接的內存空間是隔離的

 1 from multiprocessing import Process
 2 n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了
 3 def work():
 4     global n
 5     n=0
 6     print('子進程內: ',n)
 7 
 8 
 9 if __name__ == '__main__':
10     p=Process(target=work)
11     p.start()
12     print('主進程內: ',n)

Process對象的join方法

 1 from multiprocessing import Process
 2 import time
 3 import random
 4 
 5 class Piao(Process):
 6     def __init__(self,name):
 7         self.name=name
 8         super().__init__()
 9     def run(self):
10         print('%s is piaoing' %self.name)
11         time.sleep(random.randrange(1,3))
12         print('%s is piao end' %self.name)
13 
14 
15 p=Piao('egon')
16 p.start()
17 p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
18 print('開始')
join:主進程等,等待子進程結束
 1 from multiprocessing import Process
 2 import time
 3 import random
 4 def piao(name):
 5     print('%s is piaoing' %name)
 6     time.sleep(random.randint(1,3))
 7     print('%s is piao end' %name)
 8 
 9 p1=Process(target=piao,args=('egon',))
10 p2=Process(target=piao,args=('alex',))
11 p3=Process(target=piao,args=('yuanhao',))
12 p4=Process(target=piao,args=('wupeiqi',))
13 
14 p1.start()
15 p2.start()
16 p3.start()
17 p4.start()
18 
19 #有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎?
20 #固然不是了,必須明確:p.join()是讓誰等?
21 #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,
22 
23 #詳細解析以下:
24 #進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了
25 #而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
26 #join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待
27 # 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
28 p1.join()
29 p2.join()
30 p3.join()
31 p4.join()
32 
33 print('主線程')
34 
35 
36 #上述啓動進程與join進程能夠簡寫爲
37 # p_l=[p1,p2,p3,p4]
38 # 
39 # for p in p_l:
40 #     p.start()
41 # 
42 # for p in p_l:
43 #     p.join()
有了join,程序不就是串行了嗎?

殭屍進程與孤兒進程(瞭解)

參考博客:http://www.cnblogs.com/Anker/p/3271773.html

一:殭屍進程(有害)
  殭屍進程:一個進程使用fork建立子進程,若是子進程退出,而父進程並無調用wait或waitpid獲取子進程的狀態信息,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之爲僵死進程。詳解以下

咱們知道在unix/linux中,正常狀況下子進程是經過父進程建立的,子進程在建立新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠沒法預測子進程到底何時結束,若是子進程一結束就馬上回收其所有資源,那麼在父進程內將沒法獲取子進程的狀態信息。

所以,UNⅨ提供了一種機制能夠保證父進程能夠在任意時刻獲取子進程結束時的狀態信息:
1、在每一個進程退出的時候,內核釋放該進程全部的資源,包括打開的文件,佔用的內存等。可是仍然爲其保留必定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)
二、直到父進程經過wait / waitpid來取時才釋放. 但這樣就致使了問題,若是進程不調用wait / waitpid的話,那麼保留的那段信息就不會釋放,其進程號就會一直被佔用,可是系統所能使用的進程號是有限的,若是大量的產生僵死進程,將由於沒有可用的進程號而致使系統不能產生新的進程. 此即爲殭屍進程的危害,應當避免。

  任何一個子進程(init除外)在exit()以後,並不是立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程處理。這是每一個子進程在結束時都要通過的階段。若是子進程在exit()以後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。若是父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不通過殭屍狀態。  若是父進程在子進程結束以前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。

二:孤兒進程(無害)

  孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成爲孤兒進程。孤兒進程將被init進程(進程號爲1)所收養,並由init進程對它們完成狀態收集工做。

  孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工做。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置爲init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程淒涼地結束了其生命週期的時候,init進程就會表明黨和政府出面處理它的一切善後工做。所以孤兒進程並不會有什麼危害。

咱們來測試一下(建立完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成爲孤兒進程,而非殭屍進程),文件內容

import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
#執行pid=os.fork()則會生成一個子進程
#返回值pid有兩種值:
#    若是返回的pid值爲0,表示在子進程當中
#    若是返回的pid值>0,表示在父進程當中
if pid > 0:
    print 'father died..'
    sys.exit(0)

# 保證主線程退出完畢
time.sleep(1)
print 'im child', os.getpid(), os.getppid()

執行文件,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子進程已經被pid爲1的init進程接收了,因此殭屍進程在這種狀況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明週期結束天然會被init來銷燬。


三:殭屍進程危害場景:

  例若有個進程,它按期的產 生一個子進程,這個子進程須要作的事情不多,作完它該作的事情以後就退出了,所以這個子進程的生命週期很短,可是,父進程只管生成新的子進程,至於子進程 退出以後的事情,則一律漠不關心,這樣,系統運行上一段時間以後,系統中就會存在不少的僵死進程,假若用ps命令查看的話,就會看到不少狀態爲Z的進程。 嚴格地來講,僵死進程並非問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。所以,當咱們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是經過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程以後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。

四:測試
#一、產生殭屍進程的程序test.py內容以下

#coding:utf-8
from multiprocessing import Process
import time,os

def run():
    print('',os.getpid())

if __name__ == '__main__':
    p=Process(target=run)
    p.start()
    
    print('',os.getpid())
    time.sleep(1000)


#二、在unix或linux系統上執行
[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653

[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出現殭屍進程
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z

[root@vm172-31-0-19 ~]# top #執行top命令發現1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                        
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                      


#三、
等待父進程正常結束後會調用wait/waitpid去回收殭屍進程
但若是父進程是一個死循環,永遠不會結束,那麼該殭屍進程就會一直存在,殭屍進程過多,就是有害的
解決方法一:殺死父進程
解決方法二:對開啓的子進程應該記得使用join,join會回收殭屍進程
參考python2源碼註釋
class Process(object):
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

join方法中調用了wait,告訴系統釋放殭屍進程。discard爲從本身的children中剔除

解決方法三:http://blog.csdn.net/u010571844/article/details/50419798

 

四 守護進程

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time
import random

class Eat(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is eating' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is eat end' %self.name)


p=Eat('lucy')
p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
p.start()
print('')
 1 #主進程代碼運行完畢,守護進程就會結束
 2 from multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def foo():
 6     print(123)
 7     time.sleep(1)
 8     print("end123")
 9 
10 def bar():
11     print(456)
12     time.sleep(3)
13     print("end456")
14 
15 
16 p1=Process(target=foo)
17 p2=Process(target=bar)
18 
19 p1.daemon=True
20 p1.start()
21 p2.start()
22 print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止
迷惑人的例子

 

五 進程同步(鎖)

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

part1:多個進程共享同一打印終端

 1 #併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
 2 from multiprocessing import Process
 3 import os,time
 4 def work():
 5     print('%s is running' %os.getpid())
 6     time.sleep(2)
 7     print('%s is done' %os.getpid())
 8 
 9 if __name__ == '__main__':
10     for i in range(3):
11         p=Process(target=work)
12         p.start()
併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
 1 #由併發變成了串行,犧牲了運行效率,但避免了競爭
 2 from multiprocessing import Process,Lock
 3 import os,time
 4 def work(lock):
 5     lock.acquire()
 6     print('%s is running' %os.getpid())
 7     time.sleep(2)
 8     print('%s is done' %os.getpid())
 9     lock.release()
10 if __name__ == '__main__':
11     lock=Lock()
12     for i in range(3):
13         p=Process(target=work,args=(lock,))
14         p.start()
加鎖:由併發變成了串行,犧牲了運行效率,但避免了競爭

part2:多個進程共享同一文件

文件當數據庫,模擬搶票

 1 #文件db的內容爲:{"count":1}
 2 #注意必定要用雙引號,否則json沒法識別
 3 from multiprocessing import Process,Lock
 4 import time,json,random
 5 def search():
 6     dic=json.load(open('db.txt'))
 7     print('\033[43m剩餘票數%s\033[0m' %dic['count'])
 8 
 9 def get():
10     dic=json.load(open('db.txt'))
11     time.sleep(0.1) #模擬讀數據的網絡延遲
12     if dic['count'] >0:
13         dic['count']-=1
14         time.sleep(0.2) #模擬寫數據的網絡延遲
15         json.dump(dic,open('db.txt','w'))
16         print('\033[43m購票成功\033[0m')
17 
18 def task(lock):
19     search()
20     get()
21 if __name__ == '__main__':
22     lock=Lock()
23     for i in range(100): #模擬併發100個客戶端搶票
24         p=Process(target=task,args=(lock,))
25         p.start()
併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
 1 #文件db的內容爲:{"count":1}
 2 #注意必定要用雙引號,否則json沒法識別
 3 from multiprocessing import Process,Lock
 4 import time,json,random
 5 def search():
 6     dic=json.load(open('db.txt'))
 7     print('\033[43m剩餘票數%s\033[0m' %dic['count'])
 8 
 9 def get():
10     dic=json.load(open('db.txt'))
11     time.sleep(0.1) #模擬讀數據的網絡延遲
12     if dic['count'] >0:
13         dic['count']-=1
14         time.sleep(0.2) #模擬寫數據的網絡延遲
15         json.dump(dic,open('db.txt','w'))
16         print('\033[43m購票成功\033[0m')
17 
18 def task(lock):
19     search()
20     lock.acquire()
21     get()
22     lock.release()
23 if __name__ == '__main__':
24     lock=Lock()
25     for i in range(100): #模擬併發100個客戶端搶票
26         p=Process(target=task,args=(lock,))
27         p.start()
加鎖:購票行爲由併發變成了串行,犧牲了運行效率,但保證了數據安全

總結:

#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理



#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

 

六 隊列

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

 建立隊列的類(底層就是以管道和鎖定的方式實現)

# Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 

    參數介紹: 

 
 
 
  
 

 方法介紹:

    主要方法:
1 q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
2 q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
8 q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
9 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

 其餘方法(瞭解):

1 q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
3 q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

應用:

 1 '''
 2 multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
 3 都是基於消息傳遞實現的,可是隊列接口
 4 '''
 5 
 6 from multiprocessing import Process,Queue
 7 import time
 8 q=Queue(3)
 9 
10 
11 #put ,get ,put_nowait,get_nowait,full,empty
12 q.put(3)
13 q.put(3)
14 q.put(3)
15 print(q.full()) #滿了
16 
17 print(q.get())
18 print(q.get())
19 print(q.get())
20 print(q.empty()) #空了

  生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

    爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

    什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         time.sleep(random.randint(1,3))
 7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 8 
 9 def producer(q):
10     for i in range(10):
11         time.sleep(random.randint(1,3))
12         res='包子%s' %i
13         q.put(res)
14         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
15 
16 if __name__ == '__main__':
17     q=Queue()
18     #生產者們:即廚師們
19     p1=Process(target=producer,args=(q,))
20 
21     #消費者們:即吃貨們
22     c1=Process(target=consumer,args=(q,))
23 
24     #開始
25     p1.start()
26     c1.start()
27     print('')
 1 #生產者消費者模型總結
 2 
 3     #程序中有兩類角色
 4         一類負責生產數據(生產者)
 5         一類負責處理數據(消費者)
 6         
 7     #引入生產者消費者模型爲了解決的問題是:
 8         平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度
 9         
10     #如何實現:
11         生產者<-->隊列<——>消費者
12     #生產者消費者模型實現類程序的解耦和

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         if res is None:break #收到結束信號則結束
 7         time.sleep(random.randint(1,3))
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9 
10 def producer(q):
11     for i in range(10):
12         time.sleep(random.randint(1,3))
13         res='包子%s' %i
14         q.put(res)
15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
16     q.put(None) #發送結束信號
17 if __name__ == '__main__':
18     q=Queue()
19     #生產者們:即廚師們
20     p1=Process(target=producer,args=(q,))
21 
22     #消費者們:即吃貨們
23     c1=Process(target=consumer,args=(q,))
24 
25     #開始
26     p1.start()
27     c1.start()
28     print('')
生產者在生產完畢後發送結束信號None

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         if res is None:break #收到結束信號則結束
 7         time.sleep(random.randint(1,3))
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9 
10 def producer(q):
11     for i in range(2):
12         time.sleep(random.randint(1,3))
13         res='包子%s' %i
14         q.put(res)
15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
16 
17 if __name__ == '__main__':
18     q=Queue()
19     #生產者們:即廚師們
20     p1=Process(target=producer,args=(q,))
21 
22     #消費者們:即吃貨們
23     c1=Process(target=consumer,args=(q,))
24 
25     #開始
26     p1.start()
27     c1.start()
28 
29     p1.join()
30     q.put(None) #發送結束信號
31     print('')
主進程在生產者生產完畢後發送結束信號None

但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個簡單的方式去解決

 1 from multiprocessing import Process,Queue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         if res is None:break #收到結束信號則結束
 7         time.sleep(random.randint(1,3))
 8         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 9 
10 def producer(name,q):
11     for i in range(2):
12         time.sleep(random.randint(1,3))
13         res='%s%s' %(name,i)
14         q.put(res)
15         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
16 
17 
18 
19 if __name__ == '__main__':
20     q=Queue()
21     #生產者們:即廚師們
22     p1=Process(target=producer,args=('包子',q))
23     p2=Process(target=producer,args=('骨頭',q))
24     p3=Process(target=producer,args=('泔水',q))
25 
26     #消費者們:即吃貨們
27     c1=Process(target=consumer,args=(q,))
28     c2=Process(target=consumer,args=(q,))
29 
30     #開始
31     p1.start()
32     p2.start()
33     p3.start()
34     c1.start()
35 
36     p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
37     p2.join()
38     p3.join()
39     q.put(None) #有幾個消費者就應該發送幾回結束信號None
40     q.put(None) #發送結束信號
41     print('')
有幾個消費者就須要發送幾回結束信號:至關low

其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制

1 #JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
2 
3    #參數介紹:
4     maxsize是隊列中容許最大項數,省略則無大小限制。    
5   #方法介紹:
6     JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
7     q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
8     q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
 1 from multiprocessing import Process,JoinableQueue
 2 import time,random,os
 3 def consumer(q):
 4     while True:
 5         res=q.get()
 6         time.sleep(random.randint(1,3))
 7         print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
 8 
 9         q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了
10 
11 def producer(name,q):
12     for i in range(10):
13         time.sleep(random.randint(1,3))
14         res='%s%s' %(name,i)
15         q.put(res)
16         print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
17     q.join()
18 
19 
20 if __name__ == '__main__':
21     q=JoinableQueue()
22     #生產者們:即廚師們
23     p1=Process(target=producer,args=('包子',q))
24     p2=Process(target=producer,args=('骨頭',q))
25     p3=Process(target=producer,args=('泔水',q))
26 
27     #消費者們:即吃貨們
28     c1=Process(target=consumer,args=(q,))
29     c2=Process(target=consumer,args=(q,))
30     c1.daemon=True
31     c2.daemon=True
32 
33     #開始
34     p_l=[p1,p2,p3,c1,c2]
35     for p in p_l:
36         p.start()
37 
38     p1.join()
39     p2.join()
40     p3.join()
41     print('') 
42     
43     #主進程等--->p1,p2,p3等---->c1,c2
44     #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
45     #於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程
View Code

 

七 管道

進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可)

 1 #建立管道的類:
 2 Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
 3 #參數介紹:
 4 dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
 5 #主要方法:
 6     conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
 7     conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 8  #其餘方法:
 9 conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
10 conn1.fileno():返回鏈接使用的整數文件描述符
11 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
12  
13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
14 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
15  
16 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
介紹
 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def consumer(p,name):
 5     left,right=p
 6     left.close()
 7     while True:
 8         try:
 9             baozi=right.recv()
10             print('%s 收到包子:%s' %(name,baozi))
11         except EOFError:
12             right.close()
13             break
14 def producer(seq,p):
15     left,right=p
16     right.close()
17     for i in seq:
18         left.send(i)
19         # time.sleep(1)
20     else:
21         left.close()
22 if __name__ == '__main__':
23     left,right=Pipe()
24 
25     c1=Process(target=consumer,args=((left,right),'c1'))
26     c1.start()
27 
28 
29     seq=(i for i in range(10))
30     producer(seq,(left,right))
31 
32     right.close()
33     left.close()
34 
35     c1.join()
36     print('主進程')
基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)

注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def adder(p,name):
 5     server,client=p
 6     client.close()
 7     while True:
 8         try:
 9             x,y=server.recv()
10         except EOFError:
11             server.close()
12             break
13         res=x+y
14         server.send(res)
15     print('server done')
16 if __name__ == '__main__':
17     server,client=Pipe()
18 
19     c1=Process(target=adder,args=((server,client),'c1'))
20     c1.start()
21 
22     server.close()
23 
24     client.send((10,20))
25     print(client.recv())
26     client.close()
27 
28     c1.join()
29     print('主進程')
30 #注意:send()和recv()方法使用pickle模塊對對象進行序列化。
管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序

 

八 共享數據

展望將來,基於消息傳遞的併發編程是大勢所趨

即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合

經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,

還能夠擴展到分佈式系統中

進程間通訊應該儘可能避免使用本節所講的共享數據的方式

1 進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
2 
3 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此
4 
5 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
6 
7 A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
 1 from multiprocessing import Manager,Process,Lock
 2 import os
 3 def work(d,lock):
 4     # with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
 5         d['count']-=1
 6 
 7 if __name__ == '__main__':
 8     lock=Lock()
 9     with Manager() as m:
10         dic=m.dict({'count':100})
11         p_l=[]
12         for i in range(100):
13             p=Process(target=work,args=(dic,lock))
14             p_l.append(p)
15             p.start()
16         for p in p_l:
17             p.join()
18         print(dic)
19         #{'count': 94}
進程之間操做共享的數據

 

九 信號量

 1 互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去,若是指定信號量爲3,那麼來一我的得到一把鎖,計數加1,當計數等於3時,後面的人均須要等待。一旦釋放,就有人能夠得到一把鎖
 2 
 3     信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
 4 
 5 from multiprocessing import Process,Semaphore
 6 import time,random
 7 
 8 def go_wc(sem,user):
 9     sem.acquire()
10     print('%s 佔到一個茅坑' %user)
11     time.sleep(random.randint(0,3)) #模擬每一個人拉屎速度不同,0表明有的人蹲下就起來了
12     sem.release()
13 
14 if __name__ == '__main__':
15     sem=Semaphore(5)
16     p_l=[]
17     for i in range(13):
18         p=Process(target=go_wc,args=(sem,'user%s' %i,))
19         p.start()
20         p_l.append(p)
21 
22     for i in p_l:
23         i.join()
24     print('============》')
信號量Semahpore(同線程同樣)

 

十 事件

 1 python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
 2 
 3     事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
 4 
 5 clear:將「Flag」設置爲False
 6 set:將「Flag」設置爲True
 7  
 8 
 9 #_*_coding:utf-8_*_
10 #!/usr/bin/env python
11 
12 from multiprocessing import Process,Event
13 import time,random
14 
15 def car(e,n):
16     while True:
17         if not e.is_set(): #Flase
18             print('\033[31m紅燈亮\033[0m,car%s等着' %n)
19             e.wait()
20             print('\033[32m車%s 看見綠燈亮了\033[0m' %n)
21             time.sleep(random.randint(3,6))
22             if not e.is_set():
23                 continue
24             print('走你,car', n)
25             break
26 
27 def police_car(e,n):
28     while True:
29         if not e.is_set():
30             print('\033[31m紅燈亮\033[0m,car%s等着' % n)
31             e.wait(1)
32             print('燈的是%s,警車走了,car %s' %(e.is_set(),n))
33             break
34 
35 def traffic_lights(e,inverval):
36     while True:
37         time.sleep(inverval)
38         if e.is_set():
39             e.clear() #e.is_set() ---->False
40         else:
41             e.set()
42 
43 if __name__ == '__main__':
44     e=Event()
45     # for i in range(10):
46     #     p=Process(target=car,args=(e,i,))
47     #     p.start()
48 
49     for i in range(5):
50         p = Process(target=police_car, args=(e, i,))
51         p.start()
52     t=Process(target=traffic_lights,args=(e,10))
53     t.start()
54 
55     print('============》')
Event(同線程同樣)

 

十一 進程池

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:

  1. 很明顯須要併發執行的任務一般要遠大於核數
  2. 一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
  3. 進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。

咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數... 
ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

    建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

#  Pool([numprocess  [,initializer [, initargs]]]):建立進程池 

  參數介紹:

1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組

方法介紹:

    主要方法
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

其餘方法(瞭解部分)

1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
3 obj.ready():若是調用完成,返回True
4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
5 obj.wait([timeout]):等待結果變爲可用。
6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

應用:

 1 from multiprocessing import Pool
 2 import os,time
 3 def work(n):
 4     print('%s run' %os.getpid())
 5     time.sleep(3)
 6     return n**2
 7 
 8 if __name__ == '__main__':
 9     p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
10     res_l=[]
11     for i in range(10):
12         res=p.apply(work,args=(i,)) #同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞,但無論該任務是否存在阻塞,同步調用都會在原地等着,只是等的過程當中如果任務發生了阻塞就會被奪走cpu的執行權限
13         res_l.append(res)
14     print(res_l)
同步調用apply
 1 from multiprocessing import Pool
 2 import os,time
 3 def work(n):
 4     print('%s run' %os.getpid())
 5     time.sleep(3)
 6     return n**2
 7 
 8 if __name__ == '__main__':
 9     p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
10     res_l=[]
11     for i in range(10):
12         res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res
13         res_l.append(res)
14 
15     #異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果,不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
16     p.close()
17     p.join()
18     for res in res_l:
19         print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
異步調用apply_async
 1 #一:使用進程池(異步調用,apply_async)
 2 #coding: utf-8
 3 from multiprocessing import Process,Pool
 4 import time
 5 
 6 def func(msg):
 7     print( "msg:", msg)
 8     time.sleep(1)
 9     return msg
10 
11 if __name__ == "__main__":
12     pool = Pool(processes = 3)
13     res_l=[]
14     for i in range(10):
15         msg = "hello %d" %(i)
16         res=pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
17         res_l.append(res)
18     print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了
19 
20     pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
21     pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
22 
23     print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
24     for i in res_l:
25         print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
26 
27 #二:使用進程池(同步調用,apply)
28 #coding: utf-8
29 from multiprocessing import Process,Pool
30 import time
31 
32 def func(msg):
33     print( "msg:", msg)
34     time.sleep(0.1)
35     return msg
36 
37 if __name__ == "__main__":
38     pool = Pool(processes = 3)
39     res_l=[]
40     for i in range(10):
41         msg = "hello %d" %(i)
42         res=pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
43         res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個
44     print("==============================>")
45     pool.close()
46     pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
47 
48     print(res_l) #看到的就是最終的結果組成的列表
49     for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法
50         print(i)
詳解:apply_async與apply

回調函數:

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

 1 from multiprocessing import Pool
 2 import requests
 3 import json
 4 import os
 5 
 6 def get_page(url):
 7     print('<進程%s> get %s' %(os.getpid(),url))
 8     respone=requests.get(url)
 9     if respone.status_code == 200:
10         return {'url':url,'text':respone.text}
11 
12 def pasrse_page(res):
13     print('<進程%s> parse %s' %(os.getpid(),res['url']))
14     parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
15     with open('db.txt','a') as f:
16         f.write(parse_res)
17 
18 
19 if __name__ == '__main__':
20     urls=[
21         'https://www.baidu.com',
22         'https://www.python.org',
23         'https://www.openstack.org',
24         'https://help.github.com/',
25         'http://www.sina.com.cn/'
26     ]
27 
28     p=Pool(3)
29     res_l=[]
30     for url in urls:
31         res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
32         res_l.append(res)
33 
34     p.close()
35     p.join()
36     print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了
37 
38 '''
39 打印結果:
40 <進程3388> get https://www.baidu.com
41 <進程3389> get https://www.python.org
42 <進程3390> get https://www.openstack.org
43 <進程3388> get https://help.github.com/
44 <進程3387> parse https://www.baidu.com
45 <進程3389> get http://www.sina.com.cn/
46 <進程3387> parse https://www.python.org
47 <進程3387> parse https://help.github.com/
48 <進程3387> parse http://www.sina.com.cn/
49 <進程3387> parse https://www.openstack.org
50 [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
51 '''
View Code
 1 from multiprocessing import Pool
 2 import time,random
 3 import requests
 4 import re
 5 
 6 def get_page(url,pattern):
 7     response=requests.get(url)
 8     if response.status_code == 200:
 9         return (response.text,pattern)
10 
11 def parse_page(info):
12     page_content,pattern=info
13     res=re.findall(pattern,page_content)
14     for item in res:
15         dic={
16             'index':item[0],
17             'title':item[1],
18             'actor':item[2].strip()[3:],
19             'time':item[3][5:],
20             'score':item[4]+item[5]
21 
22         }
23         print(dic)
24 if __name__ == '__main__':
25     pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)
26 
27     url_dic={
28         'http://maoyan.com/board/7':pattern1,
29     }
30 
31     p=Pool()
32     res_l=[]
33     for url,pattern in url_dic.items():
34         res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
35         res_l.append(res)
36 
37     for i in res_l:
38         i.get()
39 
40     # res=requests.get('http://maoyan.com/board/7')
41     # print(re.findall(pattern,res.text))
爬蟲案例

若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

 1 from multiprocessing import Pool
 2 import time,random,os
 3 
 4 def work(n):
 5     time.sleep(1)
 6     return n**2
 7 if __name__ == '__main__':
 8     p=Pool()
 9 
10     res_l=[]
11     for i in range(10):
12         res=p.apply_async(work,args=(i,))
13         res_l.append(res)
14 
15     p.close()
16     p.join() #等待進程池中全部進程執行完畢
17 
18     nums=[]
19     for res in res_l:
20         nums.append(res.get()) #拿到全部結果
21     print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
View Code
相關文章
相關標籤/搜索