multiprocessing並不是是python的一個模塊,而是python中多進程管理的一個包,在學習的時候能夠與threading這個模塊做類比,正如咱們在上一篇轉載的文章中所提,python的多線程並不能作到真正的並行處理,只能完成相對的併發處理,那麼咱們須要的就是python的多進程來完成並行處理,把全部的cpu資源都利用起來。multiprocessing的很大一部分與threading使用同一套API,只不過換到了多進程的環境。這裏面要注意,對於多進程來講,win32平臺和unix平臺差異很大,咱們最好在linux上完成實現。html
使用這些共享API時,咱們應該注意如下問題(目前這是我能想到的,之後遇到再擴充):python
一、對join的處理linux
根據Unix環境高級編程中對進程控制一章的描述,當某個進程fork一個子進程後,該進程必需要調用wait等待子進程結束髮送的sigchld信號,對子進程進行資源回收等相關工做,不然,子進程會成爲僵死進程,被init收養。因此,在multiprocessing.Process實例化一個對象以後,該對象有必要調用join方法,由於在join方法中完成了對底層wait的處理,源碼以下:web
def join(self, timeout=None): ''' Wait until child process terminates ''' assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: _current_process._children.discard(self)
不過,調用該方法,要注意join的位置(threading模塊有提到),是在每一個子進程中阻塞仍是在父進程中阻塞,若是在子進程中阻塞可能達不到並行處理的目的,因此要根據具體需求。而對於多線程來講,因爲只有一個進程,全部子線程共享同一片內存,因此不是必需要進行join調用。例子以下:shell
#!/usr/bin/env python __author__ = 'webber' import os,time import multiprocessing # worker function def worker(sign, lock): lock.acquire() print sign, 'pid:',os.getpid() lock.release() time.sleep(1) # Main print 'Main:',os.getpid() plist = [] lock = multiprocessing.Lock() for j in range(5): p = multiprocessing.Process(target=worker,args=('process',lock)) p.start() plist.append(p) p.join() #for process in record: # process.join()
此外,還有一點關於GIL鎖的說明,在python多進程中,一樣須要全局解釋器鎖,由於每一個子進程都有一把GIL,那麼當它們向stdout輸出時,能夠同時爭搶stdout資源,致使在每一個子進程輸出時會把不一樣子進程的輸出字符混合在一塊兒,沒法閱讀,影響代碼的標誌位判斷,因此上例子中使用了Lock同步,在一個子進程輸出完成以後再容許另外一個子進程獲得stdout的權限,這樣避免了多個任務同時向終端輸出。編程
二、對IPC的處理服務器
multiprocessing包與threading模塊的另外一個差別特性體如今IPC上,python的multiprocessing包自帶了對Pipe和Queue的管理,效率上更高,而threading模塊須要與Queue模塊或os.popen()、subprocess.Popen()等配合使用。
根據Unix環境高級編程的第15章進程間通訊的描述,經典的IPC包括管道、FIFO、消息隊列、信號量、以及共享存儲。不過應用最多的仍是管道。書中指出咱們應該把管道當作是半雙工的,而且只能在具備公共祖先的兩個進程之間使用。
下面咱們用一下Pipe()和Queue()方法:網絡
a、關於Pipe()多線程
對照書中給出的底層pipe接口函數,咱們看到Pipe方法在Unix平臺上實現源碼以下:併發
def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) c2 = _multiprocessing.Connection(fd2, readable=False) return c1, c2
首先,Pipe能夠是單向(half-duplex),也能夠是雙向的(duplex),默認爲雙向的。咱們能夠經過multiprocessing.Pipe(duplex=False)建立單向的管道。該方法返回一個元祖,包含兩個文件描述符,若是爲單向的,則爲(read-only connection,write-only connection);若是爲雙向的,則爲(read-write Connection, read-write Connection)。一個進程從Pipe一端輸入對象(fd[1]),而後被Pipe另外一端的進程接收(fd[0]),兩個進程要有同一個父進程或者其中一個是父進程。單向管道只容許管道一端的進程輸入,而雙向管道則容許從兩端輸入。這裏的雙向管道相似於書中提到的「協同進程」的概念。
例如:
#!/usr/bin/env python import multiprocessing as mul def proc1(pipe): # pipe.send('hello') print 'proc1 rec:',pipe.recv() def proc2(pipe): # print 'proc2 rec:',pipe.recv() pipe.send('hello too') pipe = mul.Pipe(duplex=False) #pipe = mul.Pipe() p1 = mul.Process(target=proc1,args=(pipe[0],)) # 讀管道 p2 = mul.Process(target=proc2,args=(pipe[1],)) # 寫管道 # 因爲管道是單向的,對象pipe[0]只有讀的權限(recv),而pipe[1]只有寫的權限(send)。 #print pipe p1.start() p2.start() p1.join() p2.join()
b、關於Queue()
Queue與Pipe相相似,都是先進先出的結構,但Queue容許多個進程放入,多個進程從隊列取出對象。這裏能夠與Queue模塊相類比學習。Queue方法實際上是Unix環境高級編程IPC中FIFO命名管道的實現方法。FIFO可用於有如下兩種狀況:
---shell命令使用FIFO將數據從一條管道傳送到另外一條時,無需建立中間臨時文件。
---客戶進程-服務器進程應用程序中,FIFO用做匯聚點,在客戶進程和服務器進程兩者之間傳遞數據。
如下就FIFO的第二種狀況寫一個python例子:
#!/usr/bin/env python # -*- coding:utf-8 -*- import multiprocessing import time import os # 客戶進程,向衆所周知的FIFO服務器進程發送請求 def client_proc(queue,msg): request = 'I am client ' + str(msg) + ' pid: '+ str(os.getpid()) + ' time:' + str(time.time()) # 注意信息的格式,都統一爲字符串類型 queue.put(request) def server_proc(queue,lock): msg = queue.get() lock.acquire() print msg + '--------------->I am server ' + 'pid: ' + str(os.getpid()) lock.release() plist_cli = [] plist_ser = [] lock = multiprocessing.Lock() queue = multiprocessing.Queue() # 參數爲空,默認爲隊列可無限長 for i in range(10): p1 = multiprocessing.Process(target=client_proc,args=(queue,i)) p2 = multiprocessing.Process(target=server_proc,args=(queue,lock)) p1.start() p2.start() plist_cli.append(p1) plist_ser.append(p2) for proc in plist_cli: proc.join() for proc in plist_ser: proc.join() queue.close()
輸出以下:
I am client 2 pid: 9867 time:1482489226.77--------------->I am server pid: 9879
I am client 0 pid: 9865 time:1482489226.77--------------->I am server pid: 9881
I am client 4 pid: 9869 time:1482489226.77--------------->I am server pid: 9884
I am client 1 pid: 9866 time:1482489226.77--------------->I am server pid: 9886
I am client 3 pid: 9868 time:1482489226.78--------------->I am server pid: 9888
I am client 7 pid: 9872 time:1482489226.78--------------->I am server pid: 9889
I am client 5 pid: 9870 time:1482489226.78--------------->I am server pid: 9892
I am client 6 pid: 9871 time:1482489226.78--------------->I am server pid: 9891
I am client 9 pid: 9878 time:1482489226.78--------------->I am server pid: 9893
I am client 8 pid: 9875 time:1482489226.78--------------->I am server pid: 9894
從輸出能夠看出,10個客戶端進程把生產信息放入隊列,10個服務端進程從隊列取出信息而且打印,從打印時間和msg的子進程編號來看,10個服務端進程爭奪stdout,經過Lock使它們有序輸出,不至於輸出信息混亂,msg編號沒有從0排至9正是由於它們被分配給了不一樣的cpu資源,不一樣cpu資源在處理速度上不會徹底同樣,因此爭奪stdout的能力也不一樣。
三、共享內存和Manager管理
衆所周知,在處理多進程時,每一個進程都有本身獨立的內存空間,因此在多進程環境中咱們應該儘可能避免共享資源,不然要依賴與IPC。python的多進程除了上面提到的經常使用的依賴於管道和FIFO以外,還能夠經過共享內存和Manager的方法來共享資源。這個不經常使用,因爲共享內存涉及同步的問題,會下降程序的效率而不推薦使用。之後涉及到再擴展。
四、進程池
參考博客:http://www.cnblogs.com/kaituorensheng/p/4465768.html
當咱們在編寫網絡服務端時,Unix網絡編程一書中提到服務端須要fork子進程,用子進程來處理監聽到的鏈接請求,創建鏈接套接字,並在子進程中關閉監聽套接字,父進程中關閉鏈接套接字。那麼,當鏈接的併發不是很大時,咱們能夠利用進程池的方式來處理到來的鏈接。multiprocessing.Pool能夠提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,若是進程池尚未滿,那麼就會建立一個新的進程用來執行該請求;若是池中的進程數已經達到最大值,那麼該請求將會阻塞等待,直到池中有進程結束,纔會建立新的進程來處理該請求。
Pool方法默認的初始值以下:
def __init__(self, processes=None, initializer=None, initargs=(),maxtasksperchild=None)
一般,咱們應該指定進程池的大小,若是不指定,默認爲cpu的個數,即processes=cpu_count(),咱們能夠用該模塊自帶的方法查看本機的cpu個數,
print multiprocessing.cpu_count()。下面看個進程池的例子:
#!/usr/bin/env python # -*- coding:utf-8 -*- import multiprocessing import time def func(msg): print 'msg:',msg time.sleep(3) print 'end' pool = multiprocessing.Pool(processes=3) for i in xrange(4): msg = 'hello %d' % (i) pool.apply_async(func,(msg,)) #非阻塞 # pool.apply(func,(msg,)) #阻塞,apply()源自內建函數,用於間接的調用函數,而且按位置把元祖或字典做爲參數傳入。 # pool.imap(func,[msg,]) #非阻塞, 注意與apply傳的參數的區別 # pool.map(func,[msg,]) #阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 print 'sub-process done'
注意apply_async和apply的差異,此外,進程池請求函數處理還能夠用map,imap,注意傳遞參數的區別。
#!/usr/bin/env python # -*- coding:utf-8 -*- import multiprocessing import time def func(msg): print 'msg:',msg time.sleep(3) print 'end' pool = multiprocessing.Pool(50) msg = range(50) #pool.imap(func,msg) #非阻塞, 注意與apply傳的參數的區別 pool.map(func,msg) #阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() print 'sub-process done'
此外,若是子進程的處理函數中包含返回值,咱們能夠在父進程中對子進程調用get方法,將返回值取出,這裏注意,要調用get方法的時候,進程池必須採用apply_async調用函數。例如:
if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) result = [] for i in xrange(3): msg = "hello %d" %(i) result.append(pool.apply_async(func, (msg, ))) pool.close() pool.join() for res in result: print ":::", res.get() print "Sub-process(es) done."
最後,調用close()以後,進程池再也不建立新的進程;
調用join()以後,wait進程池中的所有進程。必須對Pool先調用close()方法才能join。
參考博客:http://www.lxway.com/4488626156.htm