Python中的多進程與多線程(一)

1、背景

  最近在Azkaban的測試工做中,須要在測試環境下模擬線上的調度場景進行穩定性測試。故而重操python舊業,經過python編寫腳原本構造相似線上的調度場景。在腳本編寫過程當中,碰到這樣一個需求:要在測試環境建立10000個做業流。前端

  最開始的想法是在一個azkaban project下循環調用10000次create job接口(每一個Flow只包含一個job)。因爲azkaban它自己沒有增長/刪除做業流的接口,全部的做業流修改、增長、刪除其實都是經過從新上傳項目zip包實現的,相應地每次調猛獁前端的create job接口,其實是在猛獁端對zip包的內容進行了從新的整合後再從新上傳zip包到azkaban,整個過程能夠拆解成以下過程:解壓zip包得到zip包內容,變動zip包內的文件內容,從新打包zip包,上傳到azkaban。所以,隨着循環次數越日後,zip包包含的內容會越多,接口執行一次的時間就越長。實踐發現,第一次調該接口的時間大體不到1秒,到循環1000次的時候接口調用一次的時間就達到了將近3秒。所以,若是期望一個循環10000次來構造該場景,顯然要耗費巨大的時間。python

  在此背景下, 天然而然地就想到用多進程/多線程的方式來處理該問題。編程

2、「多任務」的操做系統基礎

  你們都知道,操做系統能夠同時運行多個任務。好比你一邊聽音樂,一邊聊IM,一邊寫博客等。如今的cpu大都是多核的,但即便是過去的單核cpu也是支持多任務並行執行。瀏覽器

  單核cpu執行多任務的原理:操做系統交替輪流地執行各個任務。先讓任務1執行0.01秒,而後切換到任務2執行0.01秒,再切換到任務3執行0.01秒...這樣往復地執行下去。因爲cpu的執行速度很是快,因此使用者的主觀感覺就是這些任務在並行地執行。數據結構

  多核cpu執行多任務的原理:因爲實際應用中,任務的數量每每遠超過cpu的核數,因此操做系統其實是把這些多任務輪流地調度到每一個核心上執行。多線程

  對於操做系統來講,一個應用就是一個進程。好比打開一個瀏覽器,它是一個進程;打開一個記事本,它是一個進程。每一個進程有它特定的進程號。他們共享系統的內存資源。進程是操做系統分配資源的最小單位併發

  而對於每個進程而言,好比一個視頻播放器,它必須同時播放視頻和音頻,就至少須要同時運行兩個「子任務」,進程內的這些子任務就是經過線程來完成。線程是最小的執行單元。一個進程它能夠包含多個線程,這些線程相互獨立,同時又共享進程所擁有的資源。app

3、Python多進程編程

  1. multiprocessing

  multiprocessing是Python提供的一個跨平臺的多進程模塊,經過它能夠很方便地編寫多進程程序,在不一樣的平臺(Unix/Linux, Windows)均可以執行。dom

  下面就是使用multiprocessing編寫多進程程序的代碼:  async

#!/usr/bin/python
# -*- coding: utf-8 -*
__author__ = 'zni.feng'
import  sys
reload (sys)
sys.setdefaultencoding('utf-8')

from multiprocessing import Process
import os
import time

#子進程fun
def child_projcess_fun(name):
    print 'Child process %s with processId %s starts.' % (name, os.getpid())
    time.sleep(3)
    print 'Child process %s with processId %s ends.' % (name, os.getpid())

if __name__ == "__main__":
    print 'Parent processId is: %s.' % os.getpid()
    p = Process(target = child_projcess_fun, args=('zni',))
    print 'Process starts'
    p.start() #開始進程
    p.join() #等待子進程結束後再繼續往下執行
    print 'Process ends.'

程序的輸出:

Parent processId is: 11076.
Process starts
Child process zni with processId 11077 starts.
Child process zni with processId 11077 ends.
Process ends.
[Finished in 3.1s]

  2. Pool 

  某些狀況下,咱們但願批量建立多個子進程,或者給定子進程數的上限,避免無限地消耗系統的資源。經過Pool(進程池)的方式,就能夠完成這項工做,下面是使用Pool的代碼:

 1 #!/usr/bin/python
 2 # -*- coding: utf-8 -*
 3 __author__ = 'zni.feng'
 4 import  sys
 5 reload (sys)
 6 sys.setdefaultencoding('utf-8')
 7 
 8 from multiprocessing import Pool
 9 import os, time
10 
11 def child_process_test(name, sleep_time):
12     print 'Child process %s with processId %s starts.' % (name, os.getpid())
13     time.sleep(sleep_time)
14     print 'Child process %s with processId %s ends.' % (name, os.getpid())
15 
16 if __name__ == "__main__":
17     print 'Parent processId is: %s.' % os.getpid()
18     p = Pool()  #進程池默認大小是cpu的核數
19     #p = Pool(10) #生成一個容量爲10的進程池,即最大同時執行10個子進程
20     for i in range(5):
21         p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向進程池提交目標請求
22 
23     print 'Child processes are running.'
24     p.close()
25     p.join() #用來等待進程池中的全部子進程結束再向下執行代碼,必須在p.close()或者p.terminate()以後執行
26     print 'All Processes end.'

程序的輸出:

Parent processId is: 5050.
Child processes are running.
Child process zni_0 with processId 5052 starts.
Child process zni_1 with processId 5053 starts.
Child process zni_2 with processId 5054 starts.
Child process zni_3 with processId 5055 starts.
Child process zni_0 with processId 5052 ends.
Child process zni_4 with processId 5052 starts.
Child process zni_1 with processId 5053 ends.
Child process zni_2 with processId 5054 ends.
Child process zni_3 with processId 5055 ends.
Child process zni_4 with processId 5052 ends.
All Processes end.
[Finished in 6.2s]

close()方法和terminate()方法的區別:

  close:關閉進程池,使之不能再添加新的進程。已經執行的進程會等待繼續執行直到結束。

  terminate:強制終止線程池,正在執行的進程也會被強制終止。

  3. 進程間通訊 

  Python的multiprocessing模塊提供了多種進程間通訊的方式,如Queue、Pipe等。

  3.1 Queue、Lock

  Queue是multiprocessing提供的一個模塊,它的數據結構就是"FIFO——first in first out"的隊列,經常使用的方法有:put(object)入隊;get()出隊;empty()判斷隊列是否爲空。

  Lock:當多個子進程對同一個queue執行寫操做時,爲了不併發操做產生衝突,能夠經過加鎖的方式使得某個子進程對queue擁有惟一的寫權限,其餘子進程必須等待該鎖釋放後才能再開始執行寫操做。

  下面就是使用Queue進行進程間通訊的代碼:在父進程裏建立兩個子進程,分別實現對queue的讀和寫操做

 1 #!/usr/bin/python
 2 # -*- coding: utf-8 -*
 3 __author__ = 'zni.feng'
 4 import  sys
 5 reload (sys)
 6 sys.setdefaultencoding('utf-8')
 7 from multiprocessing import Process, Queue, Lock
 8 import os, time, random
 9 #寫數據進程
10 def write(q, lock, name):
11     print 'Child Process %s starts' % name
12     #得到鎖
13     lock.acquire()
14     for value in ['A' , 'B', 'C']:
15         print 'Put %s to queue...' % value
16         q.put(value)
17         time.sleep(random.random())
18     #釋放鎖
19     lock.release()
20     print 'Child Process %s ends' % name
21 
22 #讀數據進程
23 def read(q, lock, name):
24     print 'Child Process %s starts' % name
25     while True: #持續地讀取q中的數據
26         value =q.get()
27         print 'Get %s from queue.' % value
28     print 'Child Process %s ends' % name
29 
30 if __name__ == "__main__":
31     #父進程建立queue,並共享給各個子進程
32     q= Queue()
33     #建立鎖
34     lock = Lock()
35     #建立第一個「寫」子進程
36     pw = Process(target = write , args=(q, lock, 'WRITE', ))
37     #建立「讀」進程
38     pr = Process(target = read, args=(q,lock, 'READ',))
39     #啓動子進程pw,寫入:
40     pw.start()
41     #啓動子進程pr,讀取:
42     pr.start()
43     #等待pw結束:
44     pw.join()
45     #pr是個死循環,經過terminate殺死:
46     pr.terminate()
47     print 'Test finish.'

  程序的輸出結果爲:

Child Process WRITE starts
Put A to queue...
Child Process READ starts
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Child Process WRITE ends
Test finish.
[Finished in 2.0s]

  3.2 Pipe

  Pipe是另外一種進程間通訊的方式,俗稱「管道」。它由兩端組成,一端往管道里寫入數據,另外一端從管道里讀取數據。
  下面就是使用Pipe通訊的代碼:

 1 #!/usr/bin/python
 2 # -*- coding: utf-8 -*
 3 __author__ = 'zni.feng'
 4 import  sys
 5 reload (sys)
 6 sys.setdefaultencoding('utf-8')
 7 from multiprocessing import Process, Pipe
 8 import os, time, random
 9 
10 #發送數據進程
11 def send(child_pipe, name):
12     print 'Child Process %s starts' % name
13     child_pipe.send('This is Mr.Ni')
14     child_pipe.close()
15     time.sleep(random.random())
16     print 'Child Process %s ends' % name
17 
18 #接收數據進程
19 def recv(parent_pipe, name):
20     print 'Child Process %s starts' % name
21     print parent_pipe.recv()
22     time.sleep(random.random())
23     print 'Child Process %s ends' % name
24 
25 if __name__ == "__main__":
26     #建立管道
27     parent,child = Pipe()
28     #建立send進程
29     ps = Process(target=send, args=(child, 'SEND'))
30     #建立recv進程
31     pr = Process(target=recv, args=(parent, 'RECEIVE'))
32     #啓動send進程
33     ps.start()
34     #等待send進程結束
35     ps.join()
36     #啓動recv進程
37     pr.start()
38     #等待recv進程結束
39     pr.join()
40     print 'Test finish.'

  程序的輸出結果以下:

Child Process SEND starts
Child Process SEND ends
Child Process RECEIVE starts
This is Mr.Ni
Child Process RECEIVE ends
Test finish.
[Finished in 1.8s]
相關文章
相關標籤/搜索