PS:咱們知道現代操做系統好比Mac OS X,UNIX,Linux,Windows等,都是支持「多任務」的操做系統。多任務的實現共有3種方式:多進程模式;多線程模式;多進程+多線程模式。Python既支持多進程又支持多線程,下面咱們將會討論如何編寫這兩種多任務程序。
爲了讓Python程序實現多進程(multiprocessing),咱們先來了解操做系統在這方面的相關知識。python
Unix/Linx操做系統提供了一個fork()系統調用,它很是特殊,不一樣於普通的函數(調用一次,返回一次),fork()調用一次,返回兩次。這是由於操做系統自動把當前進程(父進程)複製了一份(子進程),而後分別在父進程和子進程中返回。數據庫
在子進程中永遠返回0,而在父進程中返回子進程的ID。這樣作是由於一個父進程能夠fork出不少子進程,因此父進程要記下每一個子進程的ID,而子進程只須要調用getppid()就能夠拿到父進程的ID。編程
在Python中的os模塊中封裝了常見的系統調用,其中就包括了fork,能夠在Python程序中輕鬆建立出子進程:windows
import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
結果:服務器
Process (4423) start... I (4423) just created a child process (4424). I am child process (4424) and my parent is 4423.
注意:windows沒有fork調用。有了fork調用一個進程在接到新任務時就能夠複製出一個子進程來處理新任務。
既然Windows沒有fork調用,那怎麼在Windows上用Python編寫多進程的程序?由於Python是跨平臺的,其中的multiprocessing模塊就是跨平臺版本的多進程模塊。網絡
在multiprocessing模塊中提供了一個Process類來表明一個進程對象。下面的例子演示了啓動一個子進程並等待其結束:多線程
from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() print('Child process end.') ''' Parent process 20280. Child process will start. Run child process test (5772)... Child process end. '''
Tips:用multiprocessing建立子進程時,Process類表明一個進程,只需傳入子進程需執行的函數和參數,用start方法啓動子進程,join()方法能夠等待子進程結束後再往下運行(一般用於進程間的同步)。
上面的方法都是啓動一個子進程,可是當咱們要啓動大量的子進程時,怎麼辦呢?能夠用進程池的方式批量建立子進程:併發
from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name,os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__ == '__main__': print('Parent process %s.' % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task, args=(i,)) print('Waiting for all subprocess done...') p.close() p.join() print('All subprocesses done.') ''' Parent process 896. Waiting for all subprocess done... Run task 0 (9728)... Run task 1 (22216)... Run task 2 (20572)... Run task 3 (6844)... Task 0 runs 0.36 seconds. Run task 4 (9728)... Task 4 runs 0.43 seconds. Task 3 runs 0.94 seconds. Task 1 runs 1.19 seconds. Task 2 runs 2.72 seconds. All subprocesses done. '''
注意apply_async是異步的,就是說子進程執行的同時,主進程繼續向下執行。因此「Waiting for all subprocesses done...」先打印出來,close方法意味着不能再添加新的Process了。對Pool對象調用join()方法,會暫停主進程,等待全部的子進程執行完,因此「All subprocesses done.」最後打印。app
Tips:task4最後執行,是由於Pool的默認大小是4(CPU的核數),因此最多執行4個進程。固然這是Pool有意設計的限制,並非操做系統的限制,你也能夠本身改變它的默認大小,就能夠跑不止4個進程。
上面的子進程的代碼實現都是在主進程內部的,然而不少時候,子進程都是一個外部進程,咱們須要控制子進程的輸入和輸出。
subprocess(能夠在當前程序中執行其餘程序或命令)模塊可讓咱們很是方便地啓動一個外部子進程,而後控制其輸入和輸出:
import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit cod:', r) ''' $ nslookup www.python.org 服務器: ns.sc.cninfo.net Address: 61.139.2.69 非權威應答: 名稱: dualstack.python.map.fastly.net Addresses: 2a04:4e42:36::223 151.101.72.223 Aliases: www.python.org Exit cod: 0 '''
上面的運行效果至關於在命令行直接輸入「nslookup www.python.org」(至關於開了一個進程)。
若是子進程還須要經過手動輸入一些參數,那麼能夠經過communicate()方法輸入:
import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('gbk')) print('Exit code:', p.returncode) ''' $ nslookup 默認服務器: ns.sc.cninfo.net Address: 61.139.2.69 > > 服務器: ns.sc.cninfo.net Address: 61.139.2.69 python.org MX preference = 50, mail exchanger = mail.python.org mail.python.org internet address = 188.166.95.178 mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1 > Exit code: 0 '''
上面的代碼至關於在命令行直接輸入nslookup,而後手動輸入:
set q=mx
python.org
exit
Process之間確定是須要通訊的,操做系統提供了不少機制來實現進程間的通訊。Python的multiprocessing模塊包裝了底層的機制,提供了Queue(隊列)、Pipes(管道)等多種方式來交換數據。下面咱們就以Queue爲例,在父進程中建立兩個子進程,一個往Queue裏寫數據,一個從Queue中讀數據:
from multiprocessing import Process, Queue import os, time, random #寫數據進程執行的代碼: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) #讀數據進程執行的代碼: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) if __name__=='__main__': #父進程建立出Queue,並傳給各個子進程 q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) #啓動子進程pw,寫入隊列 pw.start() #啓動子進程pr,讀取隊列 pr.start() #等待pw進程結束 pw.join() #pr進程死循環,沒法等待,只能強行終止; pr.terminate() ''' Process to write: 8768 Put A to queue... Process to read: 19700 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue. '''
在Linux/Unix下,multiprocessing模塊分裝了fork調用,而因爲Windows沒有fork調用,所以multiprocessing要想模擬出fork的效果。父進程中的全部Python對象都必須經過pickle序列化到子進程中,所以若是multiprocessing在Windows下調用失敗了,要先考慮是否是pickle失敗了。
前面已經說過了多個任務既能夠用多進程來實現,又能夠用多線程來實現。那麼多線程與多進程相比,有什麼優勢呢?線程共享相同的內存空間,不一樣的線程能夠讀取內存中的同一變量(每一個進程都有各自獨立的空間)。線程帶來的開銷要比進程小。
因爲線程是操做系統直接支持的執行單元,所以許多高級語言都內置了多線程的支持,Python也不例外,Python中的線程是真正的Posix Thread而不是模擬出來的線程。
要實現多線程,Python的標準庫提供了兩個模塊:_thread和threading,前者是低級模塊,後者是高級模塊,後者分裝了前者。絕大多數狀況下,咱們只須要使用threading這個高級的模塊。
啓動一個線程就是把一個函數傳入並建立Thread實例,而後調用start()執行:
import time, threading #新線程執行的代碼 def loop(): print('thread %s is runnging...' % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) print('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopTread') t.start() t.join() print('Thread %s ended.' % threading.current_thread().name) '''thread MainThread is running... thread LoopTread is runnging... thread LoopTread >>> 1 thread LoopTread >>> 2 thread LoopTread >>> 3 thread LoopTread >>> 4 thread LoopTread >>> 5 thread LoopTread ended. Thread MainThread ended. '''
因爲任何進程都會默認啓動一個線程,咱們就把這個線程稱爲主線程,主線程又能夠啓動新的線程。上面的current_thread()函數返回當前線程的實例,主線程的名字就MainThread,子線程的名字是在建立時咱們指定的。
使用多線程仍是有風險的,由於在多線程全部變量被全部線程共享,此時可能會出現多個線程同時改變一個變量,致使出現錯誤。爲了不這個錯誤的出現,咱們應該加鎖lock。
咱們先不使用lock,來看一個錯誤的實例:
import time, threading #假定這是你的銀行存款 balance = 0 def change_it(n):#先存後取結果應該爲0 global balance #共享變量 balance = balance + n balance = balance - n def run_thread(n): for n in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance)
咱們啓動了連個線程,先存後取,理論上結果應該爲0,可是線程對的調度也是由操做系統決定,因此,當t1 和 t2交替執行,循環次數夠多,結果就不必定是0了。由於高級語言的一條語句在CPU執行時是若干條語句。
因此若是咱們要保證balance的計算正確,就應該就上一把鎖,使該變量同一時刻只能被一個線程操做。在這裏咱們就能夠給change_it()加上一把鎖:
balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要獲取鎖: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了必定要釋放鎖: lock.release()
當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,而後繼續執行下面的代碼,其餘線程就只能等待直到或取到鎖爲止。因此獲取到鎖的線程在用完後必定要釋放鎖,不然等待鎖開啓的線程,將永遠等待,因此咱們用try...finally來確保鎖必定會被釋放。
Tips:鎖的壞處就是阻止了多線程的併發執行,效率大大地降低了。當不一樣的線程持有不一樣的鎖,並試圖獲取對方的鎖時,可能會形成死鎖。
小結:多線程編程,模型複雜,容易發生衝突,必須加鎖以隔離,同時又要當心死鎖的發生。Python解釋器因爲設計時有GIL全局鎖。致使了多線程沒法利用多核,這就是模擬出來的併發(線程數量大於處理器數量)。
咱們已經知道多線中變量是能夠共享的,在多線程的環境下,每一個線程都有本身的數據。那麼每個線程應該也能夠擁有本身的局部變量,線程使用本身的局部變量比使用全局變量好,由於局部變量只能本身使用,不會影響其餘的線程,而使用全局變量的話則必須加鎖。
那麼具體怎麼在Python中使用線程的局部變量呢?那就是使用ThreadLocal,先來看一個例子:
import threading #建立全局ThreadLocal對象: local_school = threading.local() def process_student(): #獲取當前線程關聯的student: std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): #綁定當前線程關聯的student: local_school.student = name process_student() t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target=process_thread,args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() ''' Hello, Alice (in Thread-A) Hello, Bob (in Thread-B) '''
全局變量local_school就是一個ThreadLocal對象,每一個線程對她均可以讀寫student屬性,但互不影響。你能夠把local_school當作全局變量,但每一個屬性如local_school.student都是線程的局部變量,能夠任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。
ThreaLocal最經常使用的地方就是爲每一個線程綁定一個數據庫鏈接,HTTP請求用戶信息身份等。這樣一個線程的全部調用到的處理函數均可以很是方便地訪問這些資源。
Tip:一個ThreadLocal變量雖然是全局變量,但每一個線程都只能讀寫本身線程的獨立副本,互不干擾。ThreadLocal解決了參數在一個線程中各個函數之間互相傳遞的問題。
前面咱們已經介紹了多進程和多線程,這是實現多任務最經常使用的兩種方式。如今,咱們來討論下這兩種方式的優缺點。
首先,要實現多任務,一般咱們會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,所以,在多任務環境下,一般是一個Master,多個Worker。
若是咱們用多進程實現Master-Worker,主進程就是Master,其餘進程就是Worker。若是用多線程實現Master-Worker,主線程就是Master,其餘線程就是Worker。
其中多進程模式最大的優勢就是穩定性高,這是由於一個子進程崩潰了,不會影響主進程和其餘子進程(固然主進程crash了,全部的進程就crash了,可是機率很低畢竟Master進程只負責分配任務),著名的Apache最先採用的就是多進程模式。可是多進程的缺點就是建立進程的代價大,在Unix/Linux系統下,用fork調用還行,可是在Windows下建立進程的開銷巨大。另外,操做系統能同時運行的進程也是有限的,在CPU和內存的限制下,若是有幾千個進程同時運行,那麼操做系統連調度都會成問題。
而多線程模式一般比多進程模式快一點,但也快不到哪去。並且,多線程模式致命的缺點就是由於任何一個線程crash了均可能形成整個進程crash,由於全部線程共享進程的內存。
Tips: 在Windows下,多線程的效率比多進程要高,因此微軟的IIS服務器默認採用多線程的模式。因爲多線程存在穩定性問題,IIS的穩定性就不如Apache。可是如今爲了平衡,IIS和Apache如今又有了多進程+多線程的混合模式。
咱們須要考慮任務的類型,咱們能夠把任務分爲計算密集型和IO密集型。
顧名思義,計算密集型任務的特色就是要進行大量的計算,消耗大量的CPU資源,如計算圓周率,對視頻進行高清解碼等,全靠CPU的運算能力。這種計算密集型任務最好不要用多任務完成,由於這樣會切換不少次才能執行完,切換任務花費的時間就很長了,就會致使CPU的效率低下。
Tips:因爲計算密集型任務主要消耗CPU資源,所以代碼運行效率就很是重要了。由於Python這樣的腳本語言運行效率很低,因此對於計算密集型任務,最好用C語言編寫。
再來講IO密集型任務,涉及到網絡、磁盤的IO任務都是IO密集型任務,特色是CPU消耗不多,任務的大部分時間都在等待IO操做的完成。對於IO密集型任務,任務越多,CPU效率越高(但仍是有一個限度)。
Tips:常見的大部分任務都是IO密集型任務,好比Web應用,對於IO密集型任務,最適合的語言就是開發效率最高(代碼量最少)的語言,因此腳本語言是首選,C語言最差。
考慮到CPU和IO之間巨大的速度差別,單進程單線程模式會致使別的任務沒法執行,所以咱們才須要多進程或多線程的模型來支持多任務併發。異步文件IO方式中,線程發送一個IO請求到內核,而後繼續處理其餘的事情,內核完成IO請求後,將會通知線程IO操做完成了。
若是充分利用操做系統提供的異步IO支持,就能夠利用單進程單線程模型來執行多任務,這種全新的模型稱爲事件驅動模型。使用異步IO編程模型來實現多任務是一個主要的趨勢。
在Python中,單進程單線程的異步編程模型稱爲協程,有了協程的支持就能夠基於事件驅動編寫高效的多任務程序了。
Process能夠分佈到多臺機器上,而Thread最多隻能分佈到同一臺機器上上的多個CPU中。
咱們已經知道Python的multiprocessing模塊支持多進程,其中的managers子模塊還支持把多進程分佈到多臺機器上。
例子:若是咱們已經有一個經過Queue通訊的多進程程序在同一臺機器上運行,如今,因爲處理任務的進程任務繁重,但願把服務進程和處理任務的進程分佈到兩臺機器上。怎麼用分佈式進程實現?
咱們先看服務進程,服務進程負責啓動Queue,把Queue註冊到網絡上,而後往Queue裏面寫入任務:
import random, time, queue from multiprocessing.managers import BaseManager # 發送任務的隊列: task_queue = queue.Queue() # 接收結果的隊列: result_queue = queue.Queue() # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass # 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 綁定端口5000, 設置驗證碼'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 啓動Queue: manager.start() # 得到經過網絡訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result隊列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) #暫停10秒等待分佈式進程處理結果並返回 print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.')
接着在另外一臺機器上啓動任務進程也能夠是本機:
# task_worker.py import time, sys, queue from multiprocessing.managers import BaseManager # 建立相似的QueueManager: class QueueManager(BaseManager): pass # 因爲這個QueueManager只從網絡上獲取Queue,因此註冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 鏈接到服務器,也就是運行task_master.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證碼注意保持與task_master.py設置的徹底一致: m = QueueManager(address=(server_addr, 5000), authkey='abc') # 從網絡鏈接: m.connect() # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊列取任務,並把結果寫入result隊列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 處理結束: print('worker exit.')
注意:先啓動master進程,完成兩個隊列的網上註冊,接着發出請求隊列task,等待result隊列的結果;此時啓動worker進程對task隊列進行操做,而後寫入到result隊列中;master獲得響應結果,打印出result。
Tips:Python的分佈式進程的接口簡單,封裝良好,適合須要把繁重任務分佈到多臺機器的環境下。注意Queue的做用是用來傳遞任務和接收結果,每一個任務的描述數據量要儘可能小。好比發送一個處理日誌文件的任務,就不要發送幾百兆的日誌文件自己,而是發送日誌文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。