最近在Azkaban的測試工做中,須要在測試環境下模擬線上的調度場景進行穩定性測試。故而重操python舊業,經過python編寫腳原本構造相似線上的調度場景。在腳本編寫過程當中,碰到這樣一個需求:要在測試環境建立10000個做業流。前端
最開始的想法是在一個azkaban project下循環調用10000次create job接口(每一個Flow只包含一個job)。因爲azkaban它自己沒有增長/刪除做業流的接口,全部的做業流修改、增長、刪除其實都是經過從新上傳項目zip包實現的,相應地每次調猛獁前端的create job接口,其實是在猛獁端對zip包的內容進行了從新的整合後再從新上傳zip包到azkaban,整個過程能夠拆解成以下過程:解壓zip包得到zip包內容,變動zip包內的文件內容,從新打包zip包,上傳到azkaban。所以,隨着循環次數越日後,zip包包含的內容會越多,接口執行一次的時間就越長。實踐發現,第一次調該接口的時間大體不到1秒,到循環1000次的時候接口調用一次的時間就達到了將近3秒。所以,若是期望一個循環10000次來構造該場景,顯然要耗費巨大的時間。python
在此背景下, 天然而然地就想到用多進程/多線程的方式來處理該問題。編程
你們都知道,操做系統能夠同時運行多個任務。好比你一邊聽音樂,一邊聊IM,一邊寫博客等。如今的cpu大都是多核的,但即便是過去的單核cpu也是支持多任務並行執行。瀏覽器
單核cpu執行多任務的原理:操做系統交替輪流地執行各個任務。先讓任務1執行0.01秒,而後切換到任務2執行0.01秒,再切換到任務3執行0.01秒...這樣往復地執行下去。因爲cpu的執行速度很是快,因此使用者的主觀感覺就是這些任務在並行地執行。數據結構
多核cpu執行多任務的原理:因爲實際應用中,任務的數量每每遠超過cpu的核數,因此操做系統其實是把這些多任務輪流地調度到每一個核心上執行。多線程
對於操做系統來講,一個應用就是一個進程。好比打開一個瀏覽器,它是一個進程;打開一個記事本,它是一個進程。每一個進程有它特定的進程號。他們共享系統的內存資源。進程是操做系統分配資源的最小單位。併發
而對於每個進程而言,好比一個視頻播放器,它必須同時播放視頻和音頻,就至少須要同時運行兩個「子任務」,進程內的這些子任務就是經過線程來完成。線程是最小的執行單元。一個進程它能夠包含多個線程,這些線程相互獨立,同時又共享進程所擁有的資源。app
multiprocessing是Python提供的一個跨平臺的多進程模塊,經過它能夠很方便地編寫多進程程序,在不一樣的平臺(Unix/Linux, Windows)均可以執行。dom
下面就是使用multiprocessing編寫多進程程序的代碼: async
#!/usr/bin/python # -*- coding: utf-8 -* __author__ = 'zni.feng' import sys reload (sys) sys.setdefaultencoding('utf-8') from multiprocessing import Process import os import time #子進程fun def child_projcess_fun(name): print 'Child process %s with processId %s starts.' % (name, os.getpid()) time.sleep(3) print 'Child process %s with processId %s ends.' % (name, os.getpid()) if __name__ == "__main__": print 'Parent processId is: %s.' % os.getpid() p = Process(target = child_projcess_fun, args=('zni',)) print 'Process starts' p.start() #開始進程 p.join() #等待子進程結束後再繼續往下執行 print 'Process ends.'
程序的輸出:
Parent processId is: 11076.
Process starts
Child process zni with processId 11077 starts.
Child process zni with processId 11077 ends.
Process ends.
[Finished in 3.1s]
某些狀況下,咱們但願批量建立多個子進程,或者給定子進程數的上限,避免無限地消耗系統的資源。經過Pool(進程池)的方式,就能夠完成這項工做,下面是使用Pool的代碼:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 __author__ = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 8 from multiprocessing import Pool 9 import os, time 10 11 def child_process_test(name, sleep_time): 12 print 'Child process %s with processId %s starts.' % (name, os.getpid()) 13 time.sleep(sleep_time) 14 print 'Child process %s with processId %s ends.' % (name, os.getpid()) 15 16 if __name__ == "__main__": 17 print 'Parent processId is: %s.' % os.getpid() 18 p = Pool() #進程池默認大小是cpu的核數 19 #p = Pool(10) #生成一個容量爲10的進程池,即最大同時執行10個子進程 20 for i in range(5): 21 p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向進程池提交目標請求 22 23 print 'Child processes are running.' 24 p.close() 25 p.join() #用來等待進程池中的全部子進程結束再向下執行代碼,必須在p.close()或者p.terminate()以後執行 26 print 'All Processes end.'
程序的輸出:
Parent processId is: 5050.
Child processes are running.
Child process zni_0 with processId 5052 starts.
Child process zni_1 with processId 5053 starts.
Child process zni_2 with processId 5054 starts.
Child process zni_3 with processId 5055 starts.
Child process zni_0 with processId 5052 ends.
Child process zni_4 with processId 5052 starts.
Child process zni_1 with processId 5053 ends.
Child process zni_2 with processId 5054 ends.
Child process zni_3 with processId 5055 ends.
Child process zni_4 with processId 5052 ends.
All Processes end.
[Finished in 6.2s]
close()方法和terminate()方法的區別:
close:關閉進程池,使之不能再添加新的進程。已經執行的進程會等待繼續執行直到結束。
terminate:強制終止線程池,正在執行的進程也會被強制終止。
Python的multiprocessing模塊提供了多種進程間通訊的方式,如Queue、Pipe等。
3.1 Queue、Lock
Queue是multiprocessing提供的一個模塊,它的數據結構就是"FIFO——first in first out"的隊列,經常使用的方法有:put(object)入隊;get()出隊;empty()判斷隊列是否爲空。
Lock:當多個子進程對同一個queue執行寫操做時,爲了不併發操做產生衝突,能夠經過加鎖的方式使得某個子進程對queue擁有惟一的寫權限,其餘子進程必須等待該鎖釋放後才能再開始執行寫操做。
下面就是使用Queue進行進程間通訊的代碼:在父進程裏建立兩個子進程,分別實現對queue的讀和寫操做
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 __author__ = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Queue, Lock 8 import os, time, random 9 #寫數據進程 10 def write(q, lock, name): 11 print 'Child Process %s starts' % name 12 #得到鎖 13 lock.acquire() 14 for value in ['A' , 'B', 'C']: 15 print 'Put %s to queue...' % value 16 q.put(value) 17 time.sleep(random.random()) 18 #釋放鎖 19 lock.release() 20 print 'Child Process %s ends' % name 21 22 #讀數據進程 23 def read(q, lock, name): 24 print 'Child Process %s starts' % name 25 while True: #持續地讀取q中的數據 26 value =q.get() 27 print 'Get %s from queue.' % value 28 print 'Child Process %s ends' % name 29 30 if __name__ == "__main__": 31 #父進程建立queue,並共享給各個子進程 32 q= Queue() 33 #建立鎖 34 lock = Lock() 35 #建立第一個「寫」子進程 36 pw = Process(target = write , args=(q, lock, 'WRITE', )) 37 #建立「讀」進程 38 pr = Process(target = read, args=(q,lock, 'READ',)) 39 #啓動子進程pw,寫入: 40 pw.start() 41 #啓動子進程pr,讀取: 42 pr.start() 43 #等待pw結束: 44 pw.join() 45 #pr是個死循環,經過terminate殺死: 46 pr.terminate() 47 print 'Test finish.'
程序的輸出結果爲:
Child Process WRITE starts
Put A to queue...
Child Process READ starts
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Child Process WRITE ends
Test finish.
[Finished in 2.0s]
3.2 Pipe
Pipe是另外一種進程間通訊的方式,俗稱「管道」。它由兩端組成,一端往管道里寫入數據,另外一端從管道里讀取數據。
下面就是使用Pipe通訊的代碼:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -* 3 __author__ = 'zni.feng' 4 import sys 5 reload (sys) 6 sys.setdefaultencoding('utf-8') 7 from multiprocessing import Process, Pipe 8 import os, time, random 9 10 #發送數據進程 11 def send(child_pipe, name): 12 print 'Child Process %s starts' % name 13 child_pipe.send('This is Mr.Ni') 14 child_pipe.close() 15 time.sleep(random.random()) 16 print 'Child Process %s ends' % name 17 18 #接收數據進程 19 def recv(parent_pipe, name): 20 print 'Child Process %s starts' % name 21 print parent_pipe.recv() 22 time.sleep(random.random()) 23 print 'Child Process %s ends' % name 24 25 if __name__ == "__main__": 26 #建立管道 27 parent,child = Pipe() 28 #建立send進程 29 ps = Process(target=send, args=(child, 'SEND')) 30 #建立recv進程 31 pr = Process(target=recv, args=(parent, 'RECEIVE')) 32 #啓動send進程 33 ps.start() 34 #等待send進程結束 35 ps.join() 36 #啓動recv進程 37 pr.start() 38 #等待recv進程結束 39 pr.join() 40 print 'Test finish.'
程序的輸出結果以下:
Child Process SEND starts
Child Process SEND ends
Child Process RECEIVE starts
This is Mr.Ni
Child Process RECEIVE ends
Test finish.
[Finished in 1.8s]