說明:本文是基於Py2.X環境,3.X在我電腦上出了些問題。二者差異並不大。安全
Python實現多進程的方式主要有兩種:一種方法是使用os模塊中的fork方法; 另外一種是使用multiprocessing模塊。這兩種方法的區別在於前者僅適用於Unix/Linux操做操做。對win是不支持的,然後者則是跨平臺的實現方式。app
Unix/Linux操做系統提供了一個fork()系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。dom
子進程永遠返回0,而父進程返回子進程的ID。這樣作的理由是,一個父進程能夠fork出不少子進程,因此,父進程要記下每一個子進程的ID,而子進程只須要調用getppid()就能夠拿到父進程的ID。async
Python的os模塊封裝了常見的系統調用,其中就包括fork,能夠在Python程序中輕鬆建立子進程:ide
import os print 'Process (%s) start...' % os.getpid() pid = os.fork() if pid == 0: print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()) else: print 'I (%s) just created a child process (%s).' % (os.getpid(), pid) 獲得: Process (2450) start... I (2450) just created a child process (2451). I am child process (2451) and my parent is 2450.
multiprocessing模塊提供了一個Process類來描述一個進程對象,建立子進程時,只須要傳入一個執行函數和函數的參數便可完成一個Process實例的建立,用start()方法啓動進程,用join()方法實現進程間的同步。join()方法能夠等待子進程結束後再繼續往下運行,一般用於進程間的同步。函數
# -*- coding:utf-8 -*- from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__ == '__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.' 獲得: Parent process 2533. Process will start. Run child process test (2534)... Process end.
####multiprocessing模塊提供了一個pool類來表明進程池對象url
Pool能夠提供指定數量的進程供用戶調用,默認大小是cpu的核數,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求,但若是池的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束纔會建立新的進程來處理它。操作系統
# -*- coding:utf-8 -*- from multiprocessing import Pool import os, time, random def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(random.random() * 3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__ == '__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.' 獲得: Parent process 2541. Waiting for all subprocesses done... Run task 0 (2543)... Run task 1 (2544)... Run task 2 (2545)... Run task 3 (2546)... Task 0 runs 0.02 seconds. Run task 4 (2543)... Task 2 runs 0.60 seconds. Task 4 runs 1.18 seconds. Task 3 runs 1.26 seconds. Task 1 runs 1.66 seconds. All subprocesses done.
對Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process了。code
Process之間確定是須要通訊的,操做系統提供了不少機制來實現進程間的通訊。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。二者的區別在於Pipe經常使用於兩個進程間的通信而Queue用於多個進程間實現通信。對象
Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳輸,有兩個方法:put和get進行Queue操做。
put方法用以插入數據隊列中它能夠有兩個可選參數:blocked和timeout,若是blocked爲True(默認值)而且timeout是正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘空間,若是超時,會拋出Queue.Full異常,若是blocked爲False,但該Queue已滿,則會當即拋出Queue.Full異常。
# -*- coding:utf-8 -*- from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): while True: value = q.get(True) print 'Get %s from queue.' % value if __name__ == '__main__': # 父進程建立Queue,並傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啓動子進程pw,寫入: pw.start() # 啓動子進程pr,讀取: pr.start() # 等待pw結束: pw.join() # pr進程裏是死循環,沒法等待其結束,只能強行終止: pr.terminate() 獲得: Put A to queue... Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
Pipe經常使用來在兩個進程間進行通訊,兩個進程分別位於管道的兩端。
Pipe方法返回(conn1,conn2)表明一個管道的兩個端,Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發,若duplex爲False,conn1只負責接收消息,conn2只負責發送消息。send和recv方法分別是發送和接收消息的方法。例如,在全雙工模式下,能夠調用conn1.send發送消息,conn1.recv接收消息。若是沒有消息可接收,recv方法會一直阻塞。若是管道已經被關閉,那麼recv方法會拋出EOFError.
import multiprocessing import random import time, os def proc_send(pipe, urls): for url in urls: print "process(%s) send:%s" % (os.getpid(), url) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print "Process(%s) rev:%s" % (os.getpid(), pipe.recv()) time.sleep(random.random()) if __name__ == "__main__": pipe = multiprocessing.Pipe() p1 = multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10)])) p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join() 獲得: process(1134) send:url_0 Process(1135) rev:url_0 process(1134) send:url_1 Process(1135) rev:url_1 process(1134) send:url_2 Process(1135) rev:url_2 process(1134) send:url_3 Process(1135) rev:url_3 process(1134) send:url_4 Process(1135) rev:url_4 process(1134) send:url_5 Process(1135) rev:url_5 process(1134) send:url_6 Process(1135) rev:url_6 process(1134) send:url_7 Process(1135) rev:url_7 process(1134) send:url_8 Process(1135) rev:url_8 process(1134) send:url_9 Process(1135) rev:url_9