在操做系統看來,一個任務就是一個進程,而一個進程內部若是要作多個任務就是有多個線程。一個進程至少有一個線程。python
真正的並行執行任務是由多個CUP分別執行任務,實際中是由,操做系統輪流讓各個任務交替執行,任務1執行0.01秒,任務2執行0.01秒,以後再依次切換。linux
Python中支持兩種模式:windows
多進程模式服務器
多線程模式網絡
Linux操做系統下,提供了一個fork()系統調用。調用一次fork(),返回兩次,由於操做系統自動把當前的進程(做爲父進程)複製了一份(稱爲子進程),而後子進程返回0,父進程返回子進程的ID。多線程
# multiprocessing.py import os print 'Process (%s) start...' % os.getpid() pid = os.fork() if pid==0: print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()) else: print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876.
因爲windows下沒有fork()調用,提供了multiprocessing模塊進行跨平臺版本的多進程模塊。併發
用Process類表明建立進程對象,傳入一個執行函數和函數的參數。以後再用start()方法啓動,jion()方法能夠等待子進程結束後再繼續往下進行,一般用於進程間的同步。app
from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
Parent process 928. Process will start. Run child process test (929)... Process end.
Pool進程池建立多個子進程dom
對Pool對象建立多個子進程後,用close()方法結束建立,再用join()方法等待全部子進程執行完畢。在每一個子進程中會隨機休眠一段時間,其餘的子進程在這段休眠時間裏就會調用。異步
from multiprocessing import Pool import os, time, random def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(random.random() * 3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
Parent process 669. Waiting for all subprocesses done... Run task 0 (671)... Run task 1 (672)... Run task 2 (673)... Run task 3 (674)... Task 2 runs 0.14 seconds. Run task 4 (673)... Task 1 runs 0.27 seconds. Task 3 runs 0.86 seconds. Task 0 runs 1.41 seconds. Task 4 runs 1.91 seconds. All subprocesses done.
進程間通訊
Python的miltiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。
在父進程中建立兩個子進程,一個往Queue中寫數據,一個從Queue裏讀數據。
from multiprocessing import Process, Queue import os, time, random # 寫數據進程執行的代碼: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 讀數據進程執行的代碼: def read(q): while True: value = q.get(True) print 'Get %s from queue.' % value if __name__=='__main__': # 父進程建立Queue,並傳給各個子進程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啓動子進程pw,寫入: pw.start() # 啓動子進程pr,讀取: pr.start() # 等待pw結束: pw.join() # pr進程裏是死循環,沒法等待其結束,只能強行終止: pr.terminate()
Put A to queue... Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
Python中提供兩個模塊,thread是低級模塊,threading是高級模塊,對thread進行了封裝。
import time, threading # 新線程執行的代碼: def loop(): print 'thread %s is running...' % threading.current_thread().name n = 0 while n < 5: n = n + 1 print 'thread %s >>> %s' % (threading.current_thread().name, n) time.sleep(1) print 'thread %s ended.' % threading.current_thread().name print 'thread %s is running...' % threading.current_thread().name t = threading.Thread(target=loop, name='LoopThread') t.start() t.join() print 'thread %s ended.' % threading.current_thread().name
thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread >>> 4 thread LoopThread >>> 5 thread LoopThread ended. thread MainThread ended.
多進程和多線程最大的不一樣在於,多進程中,同一個變量,各自有一份拷貝存在於每一個進程中,互不影響,而多線程中,全部變量都由全部線程共享,因此,任何一個變量均可以被任何一個線程修改。所以,線程之間共享數據最大的危險在於多個線程同時該變一個變量,把內容給改亂了。
所以得加上一把鎖lock
balance = 0 lock = threading.Lock() def run_thread(n): for i in range(100000): # 先要獲取鎖: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了必定要釋放鎖: lock.release()
當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,而後繼續執行代碼,其餘線程就繼續等待直到得到鎖爲止。
可是,這樣實際上就不是並行處理了。
Python的多進程因爲存在GIL鎖的問題,因此多線程實際上不能有效利用多核。多線程的併發在Python中是無用的。
全局變量local_school就是一個ThreadLoacl對象,每一個Thread對它均可以讀寫student屬性,可是互不影響。能夠把local_school當作全局變量,但每一個屬性如local_school.student都是線程的局部變量,能夠任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。
import threading # 建立全局ThreadLocal對象: local_school = threading.local() def process_student(): print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name) def process_thread(name): # 綁定ThreadLocal的student: local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join()
Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)
多進程的優勢是穩定性高,一個崩潰,不會影響其餘的進程,可是,代價大,在linux下,調用fork還能夠,可是windows下進程開銷巨大。
多線程模式比多進程快一點,可是也快不了多少,缺點十分明顯,因爲共享進程的內存,一個線程崩了,就都崩了。
計算密集型和IO密集型:
計算密集型會消耗大量的CPU資源,代碼的運行效率就相當重要,Python等腳本語言運行效率低,不適合。
IO密集型涉及到網絡、磁盤IO的任務,它們的CUP消耗較少,任務的主要時間在等待IO操做完成,CUP效率沒法徹底使用,因此適合開發效率高的語言。
現代操做系統對IO操做進行了巨大的改進,支持異步IO。利用異步IO,就能夠用單進程模型來執行多任務,這種全新的模型稱爲事件驅動型。
多臺電腦協助工做,一臺電腦做爲調度者,依靠網絡通訊,將任務分佈到其餘電腦的進程中。
經過manager模塊把Queue經過網絡暴露出去,讓其餘機器的進程能夠訪問Queue
服務器繼承中,負責啓動Queue,把Queue註冊到網絡上,而後往Queue裏面寫入任務:
# taskmanager.py import random, time, Queue from multiprocessing.managers import BaseManager # 發送任務的隊列: task_queue = Queue.Queue() # 接收結果的隊列: result_queue = Queue.Queue() # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass # 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 綁定端口5000, 設置驗證碼'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 啓動Queue: manager.start() # 得到經過網絡訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result隊列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown()
在另外一臺機器上啓動任務進程:
# taskworker.py import time, sys, Queue from multiprocessing.managers import BaseManager # 建立相似的QueueManager: class QueueManager(BaseManager): pass # 因爲這個QueueManager只從網絡上獲取Queue,因此註冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 鏈接到服務器,也就是運行taskmanager.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證碼注意保持與taskmanager.py設置的徹底一致: m = QueueManager(address=(server_addr, 5000), authkey='abc') # 從網絡鏈接: m.connect() # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊列取任務,並把結果寫入result隊列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 處理結束: print('worker exit.')
服務進程啓動以下:
$ python taskmanager.py Put task 3411... Put task 1605... Put task 1398... Put task 4729... Put task 5300... Put task 7471... Put task 68... Put task 4219... Put task 339... Put task 7866... Try get results...
工做進程啓動以下:
$ python taskworker.py 127.0.0.1 Connect to server 127.0.0.1... run task 3411 * 3411... run task 1605 * 1605... run task 1398 * 1398... run task 4729 * 4729... run task 5300 * 5300... run task 7471 * 7471... run task 68 * 68... run task 4219 * 4219... run task 339 * 339... run task 7866 * 7866... worker exit.
等到工做進程結束後,服務進程以下:
Result: 3411 * 3411 = 11634921 Result: 1605 * 1605 = 2576025 Result: 1398 * 1398 = 1954404 Result: 4729 * 4729 = 22363441 Result: 5300 * 5300 = 28090000 Result: 7471 * 7471 = 55815841 Result: 68 * 68 = 4624 Result: 4219 * 4219 = 17799961 Result: 339 * 339 = 114921 Result: 7866 * 7866 = 61873956
注意Queue的做用是來傳遞任務和接受結果的,每一個任務的描述量要儘可能小。好比發送一個處理日誌文件的任務,不要發送幾百兆的日誌文件自己,而是發送日誌文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。
注:本文爲學習廖雪峯Python入門整理後的筆記