Python高級編程-多進程

要讓Python程序實現多進程(multiprocessing),咱們先了解操做系統的相關知識。python

Unix/Linux操做系統提供了一個fork()系統調用,它很是特殊。普通的函數調用,調用一次,返回一次,可是fork()調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。服務器

子進程永遠返回0,而父進程返回子進程的ID。這樣作的理由是,一個父進程能夠fork出不少子進程,因此,父進程要記下每一個子進程的ID,而子進程只須要調用getppid()就能夠拿到父進程的ID。多線程

Python的os模塊封裝了常見的系統調用,其中就包括fork,能夠在Python程序中輕鬆建立子進程:app

 1 # /usr/bin/python
 2 import os
 3 import time
 4 
 5 
 6 def main():
 7         pid = os.fork() 
 8         if pid == 0: # 子進程中返回0
 9                 print("I am child process %d, my parent is %d" % (os.getpid(), os.getppid()))
10                 time.sleep(1) 
11         else: # 父進程中返回子進程id
12                 print("I %d just created child %d" % (os.getpid(), pid))
13                 time.sleep(1) # 防止父進程提早結束,子進程將由init進程接管,致使子進程中的os。getppid()輸出的進程id是1
14 
15 if __name__ == '__main__':
16         main()

程序運行結果:async

1 I 20981 just created child 20982
2 I am child process 20982, my parent is 20981

因爲Windows沒有fork調用,上面的代碼在Windows上沒法運行。在Linux,Mac,Unix上均可以運行函數

有了fork調用,一個進程在接到新任務時就能夠複製出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。spa

 

multiprocessing

若是你打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。因爲Windows沒有fork調用,難道在Windows上沒法用Python編寫多進程的程序?操作系統

因爲Python是跨平臺的,天然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊。線程

multiprocessing模塊提供了一個Process類來表明一個進程對象,下面的例子演示了啓動一個子進程並等待其結束:設計

(1)Process類:

 1 import multiprocessing
 2 import os
 3 import time
 4 
 5 
 6 # 子進程要執行的代碼
 7 def run_process(i):
 8     print("%d Child %s process run" % (i, multiprocessing.current_process().name, ))
 9     time.sleep(1)
10     print("%d Child %s process end" % (i, multiprocessing.current_process().name,))
11 
12 
13 def main():
14     print("Process %d run" % (os.getpid()))
15     p1 = multiprocessing.Process(target=run_process, args=(1,)) # 和多線程Thread類建立實例類似
16     p1.start()
17     p1.join() # 主進程等待子進程結束
18     print("Process %d stop" % (os.getpid()))
19 
20 if __name__ == '__main__':
21     main()

執行結果以下:

1 Process 22324 run
2 1 Child Process-1 process run
3 1 Child Process-1 process end
4 Process 22324 stop

建立子進程時,只須要傳入一個執行函數和函數的參數,建立一個Process實例,用start()方法啓動,這樣建立進程比fork()還要簡單。

join()方法能夠等待子進程結束後再繼續往下運行,一般用於進程間的同步。

(2)Pool類:

在使用Python進行系統管理時,特別是同時操做多個文件目錄或者遠程控制多臺主機,並行操做能夠節約大量的時間。若是操做的對象數目不大時,還能夠直接使用Process類動態的生成多個進程,十幾個還好,可是若是上百個甚至更多,那手動去限制進程數量就顯得特別的繁瑣,此時Pool類就派上用場了。 
Pool類能夠提供指定數量的進程供用戶調用,當有新的請求提交到Pool中時,若是池尚未滿,就會建立一個新的進程來執行請求。若是池滿,請求就會告知先等待,直到池中有進程結束,纔會建立新的進程來執行這些請求。

 1 import multiprocessing
 2 import os
 3 import time
 4 
 5 
 6 def run_process(i):
 7     print("%d Child %s process run at %s" % (i, multiprocessing.current_process().name, time.time()))
 8     time.sleep(1)
 9     print("%d Child %s process end" % (i, multiprocessing.current_process().name,))
10 
11 
12 def main():
13     print("Process %d run" % (os.getpid()))
14     p2 = multiprocessing.Pool(multiprocessing.cpu_count())
15     for i in range(5):
16         p2.apply_async(run_process, args=(i,))  # 該函數用於啓動進程,傳遞不定參數,主進程是非阻塞且支持結果返回進行回調。
17     p2.close()  # 關閉進程池(pool),使其不在接受新的任務。
18     p2.join()  # 主進程阻塞等待子進程的退出,join方法必須在close或terminate以後使用。
19     print("Process %d stop" % (os.getpid()))
20 
21 if __name__ == '__main__':
22     main()

運行結果:

 1 Process 29676 run
 2 0 Child SpawnPoolWorker-1 process run at 1487744098.910444
 3 1 Child SpawnPoolWorker-3 process run at 1487744098.931447
 4 2 Child SpawnPoolWorker-4 process run at 1487744098.936447
 5 3 Child SpawnPoolWorker-2 process run at 1487744098.96145
 6 0 Child SpawnPoolWorker-1 process end
 7 4 Child SpawnPoolWorker-1 process run at 1487744099.911545
 8 1 Child SpawnPoolWorker-3 process end
 9 2 Child SpawnPoolWorker-4 process end
10 3 Child SpawnPoolWorker-2 process end
11 4 Child SpawnPoolWorker-1 process end
12 Process 29676 stop

代碼解讀:

Pool對象調用join()方法會等待全部子進程執行完畢,調用join()以前必須先調用close(),調用close()以後就不能繼續添加新的Process了。

請注意輸出的結果,task 0123是馬上執行的,而task 4要等待前面某個task完成後才執行,這是由於Pool的默認大小在個人電腦上是4,所以,最多同時執行4個進程。這是Pool有意設計的限制,並非操做系統的限制。若是改爲:

 p2 = multiprocessing.Pool(5) 

就能夠同時跑5個進程。

因爲Pool的默認大小是CPU的核數,若是你的電腦擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果。

pool類有一個map方法:  def map(self, func, iterable, chunksize=None) 與內置的map函數用法行爲基本一致,它會使進程阻塞直到返回結果:

 1 def square(n):  # 計算平方值
 2     time.sleep(1)  #計算一次休眠1s
 3     print(n*n,time.time())
 4     return n*n
 5 
 6 
 7 def main():
 8     print("Process %d run" % (os.getpid()))
 9     number_list = [1, 2, 3, 4, 5, 6]
10     p2 = multiprocessing.Pool(multiprocessing.cpu_count())  # 本機4核CPU 
11     p2.map(square, number_list)
12     print("Process %d stop" % (os.getpid()))
13 
14 if __name__ == '__main__':
15     main()

運行結果:

1 Process 12264 run
2 1 1487744750.823629
3 4 1487744750.82863
4 9 1487744750.860633
5 16 1487744750.873634
6 25 1487744751.823729
7 36 1487744751.82873
8 Process 12264 stop

由於列表中共有6個元素,因爲本機CPU有四核,在4個進程內的map方法同時能夠對4個元素求平方,因此對於6個元素的列表,程序耗時2s。

因爲map方法會使主進程阻塞,直到子進程返回,咱們並無調用p2.join(),主進程仍是等待子進程結束

相關文章
相關標籤/搜索