python多線程和多進程

1 概念梳理:

1.1 線程

1.1.1 什麼是線程

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。一個線程是一個execution context(執行上下文),即一個cpu執行時所須要的一串指令。html

1.1.2 線程的工做方式

假設你正在讀一本書,沒有讀完,你想休息一下,可是你想在回來時恢復到當時讀的具體進度。有一個方法就是記下頁數、行數與字數這三個數值,這些數值就是execution context。若是你的室友在你休息的時候,使用相同的方法讀這本書。你和她只須要這三個數字記下來就能夠在交替的時間共同閱讀這本書了。python

線程的工做方式與此相似。CPU會給你一個在同一時間可以作多個運算的幻覺,實際上它在每一個運算上只花了極少的時間,本質上CPU同一時刻只幹了一件事。它能這樣作就是由於它有每一個運算的execution context。就像你可以和你朋友共享同一本書同樣,多任務也能共享同一塊CPU。linux

1.2 進程

一個程序的執行實例就是一個進程。每個進程提供執行程序所需的全部資源。(進程本質上是資源的集合)git

一個進程有一個虛擬的地址空間、可執行的代碼、操做系統的接口、安全的上下文(記錄啓動該進程的用戶和權限等等)、惟一的進程ID、環境變量、優先級類、最小和最大的工做空間(內存空間),還要有至少一個線程。程序員

每個進程啓動時都會最早產生一個線程,即主線程。而後主線程會再建立其餘的子線程。github

與進程相關的資源包括:python3.x

  • 內存頁(同一個進程中的全部線程共享同一個內存空間
  • 文件描述符(e.g. open sockets)
  • 安全憑證(e.g.啓動該進程的用戶ID)

1.3 進程與線程區別

1.同一個進程中的線程共享同一內存空間,可是進程之間是獨立的。
2.同一個進程中的全部線程的數據是共享的(進程通信),進程之間的數據是獨立的。
3.對主線程的修改可能會影響其餘線程的行爲,可是父進程的修改(除了刪除之外)不會影響其餘子進程。
4.線程是一個上下文的執行指令,而進程則是與運算相關的一簇資源。
5.同一個進程的線程之間能夠直接通訊,可是進程之間的交流須要藉助中間代理來實現。
6.建立新的線程很容易,可是建立新的進程須要對父進程作一次複製。
7.一個線程能夠操做同一進程的其餘線程,可是進程只能操做其子進程。
8.線程啓動速度快,進程啓動速度慢(可是二者運行速度沒有可比性)。安全

2 多線程

2.1 線程經常使用方法

方法 註釋
start() 線程準備就緒,等待CPU調度
setName() 爲線程設置名稱
getName() 獲取線程名稱
setDaemon(True) 設置爲守護線程
join() 逐個執行每一個線程,執行完畢後繼續往下執行
run() 線程被cpu調度後自動執行線程對象的run方法,若是想自定義線程類,直接重寫run方法就好了
2.1.1 Thread類

1.普通建立方式網絡

import threading import time def run(n): print("task", n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) t1 = threading.Thread(target=run, args=("t1",)) t2 = threading.Thread(target=run, args=("t2",)) t1.start() t2.start() """ task t1 task t2 2s 2s 1s 1s 0s 0s """

2.繼承threading.Thread來自定義線程類
其本質是重構Thread類中的run方法多線程

import threading import time class MyThread(threading.Thread): def __init__(self, n): super(MyThread, self).__init__() # 重構run函數必需要寫 self.n = n def run(self): print("task", self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == "__main__": t1 = MyThread("t1") t2 = MyThread("t2") t1.start() t2.start()
2.1.2 計算子線程執行的時間

注:sleep的時候是不會佔用cpu的,在sleep的時候操做系統會把線程暫時掛起。

join() #等此線程執行完後,再執行其餘線程或主線程 threading.current_thread() #輸出當前線程
import threading import time def run(n): print("task", n,threading.current_thread()) #輸出當前的線程 time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') strat_time = time.time() t_obj = [] #定義列表用於存放子線程實例 for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) """ 由主線程生成的三個子線程 task t-0 <Thread(Thread-1, started 44828)> task t-1 <Thread(Thread-2, started 42804)> task t-2 <Thread(Thread-3, started 41384)> """ for tmp in t_obj: t.join() #爲每一個子線程添加join以後,主線程就會等這些子線程執行完以後再執行。 print("cost:", time.time() - strat_time) #主線程 print(threading.current_thread()) #輸出當前線程 """ <_MainThread(MainThread, started 43740)> """

2.1.3 統計當前活躍的線程數

因爲主線程比子線程快不少,當主線程執行active_count()時,其餘子線程都還沒執行完畢,所以利用主線程統計的活躍的線程數num = sub_num(子線程數量)+1(主線程自己)

import threading import time def run(n): print("task", n) time.sleep(1) #此時子線程停1s for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(0.5) #主線程停0.5秒 print(threading.active_count()) #輸出當前活躍的線程數 """ task t-0 task t-1 task t-2 4 """

因爲主線程比子線程慢不少,當主線程執行active_count()時,其餘子線程都已經執行完畢,所以利用主線程統計的活躍的線程數num = 1(主線程自己)

import threading import time def run(n): print("task", n) time.sleep(0.5) #此時子線程停0.5s for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(1) #主線程停1秒 print(threading.active_count()) #輸出活躍的線程數 """ task t-0 task t-1 task t-2 1 """

此外咱們還能發如今python內部默認會等待最後一個進程執行完後再執行exit(),或者說python內部在此時有一個隱藏的join()。

2.2 守護進程

咱們看下面這個例子,這裏使用setDaemon(True)把全部的子線程都變成了主線程的守護線程,所以當主進程結束後,子線程也會隨之結束。因此當主線程結束後,整個程序就退出了。

import threading import time def run(n): print("task", n) time.sleep(1) #此時子線程停1s print('3') time.sleep(1) print('2') time.sleep(1) print('1') for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.setDaemon(True) #把子進程設置爲守護線程,必須在start()以前設置 t.start() time.sleep(0.5) #主線程停0.5秒 print(threading.active_count()) #輸出活躍的線程數 """ task t-0 task t-1 task t-2 4 Process finished with exit code 0 """

2.3 GIL

在非python環境中,單核狀況下,同時只能有一個任務執行。多核時能夠支持多個線程同時執行。可是在python中,不管有多少核,同時只能執行一個線程。究其緣由,這就是因爲GIL的存在致使的。

GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,爲了數據安全所作的決定。某個線程想要執行,必須先拿到GIL,咱們能夠把GIL看做是「通行證」,而且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不容許進入CPU執行。GIL只在cpython中才有,由於cpython調用的是c語言的原生線程,因此他不能直接操做cpu,只能利用GIL保證同一時間只能有一個線程拿到數據。而在pypy和jpython中是沒有GIL的。

Python多線程的工做過程:
python在使用多線程的時候,調用的是c語言的原生線程。

  1. 拿到公共數據
  2. 申請gil
  3. python解釋器調用os原生線程
  4. os操做cpu執行運算
  5. 當該線程執行時間到後,不管運算是否已經執行完,gil都被要求釋放
  6. 進而由其餘進程重複上面的過程
  7. 等其餘進程執行完後,又會切換到以前的線程(從他記錄的上下文繼續執行)
    整個過程是每一個線程執行本身的運算,當執行時間到就進行切換(context switch)。
  • python針對不一樣類型的代碼執行效率也是不一樣的:

    一、CPU密集型代碼(各類循環處理、計算等等),在這種狀況下,因爲計算工做多,ticks計數很快就會達到閾值,而後觸發GIL的釋放與再競爭(多個線程來回切換固然是須要消耗資源的),因此python下的多線程對CPU密集型代碼並不友好。
    二、IO密集型代碼(文件處理、網絡爬蟲等涉及文件讀寫的操做),多線程可以有效提高效率(單線程下有IO操做會進行IO等待,形成沒必要要的時間浪費,而開啓多線程能在線程A等待時,自動切換到線程B,能夠不浪費CPU的資源,從而能提高程序執行效率)。因此python的多線程對IO密集型代碼比較友好。

  • 使用建議?

    python下想要充分利用多核CPU,就用多進程。由於每一個進程有各自獨立的GIL,互不干擾,這樣就能夠真正意義上的並行執行,在python中,多進程的執行效率優於多線程(僅僅針對多核CPU而言)。

  • GIL在python中的版本差別:

    一、在python2.x裏,GIL的釋放邏輯是當前線程碰見IO操做或者ticks計數達到100時進行釋放。(ticks能夠看做是python自身的一個計數器,專門作用於GIL,每次釋放後歸零,這個計數能夠經過sys.setcheckinterval 來調整)。而每次釋放GIL鎖,線程進行鎖競爭、切換線程,會消耗資源。而且因爲GIL鎖存在,python裏一個進程永遠只能同時執行一個線程(拿到GIL的線程才能執行),這就是爲何在多核CPU上,python的多線程效率並不高。
    二、在python3.x中,GIL不使用ticks計數,改成使用計時器(執行時間達到閾值後,當前線程釋放GIL),這樣對CPU密集型程序更加友好,但依然沒有解決GIL致使的同一時間只能執行一個線程的問題,因此效率依然不盡如人意。

2.4 線程鎖

因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,當多個線程同時修改同一條數據時可能會出現髒數據,因此,出現了線程鎖,即同一時刻容許一個線程執行操做。線程鎖用於鎖定資源,你能夠定義多個鎖, 像下面的代碼, 當你須要獨佔某一資源時,任何一個鎖均可以鎖這個資源,就比如你用不一樣的鎖均可以把相同的一個門鎖住是一個道理。

因爲線程之間是進行隨機調度,若是有多個線程同時操做一個對象,若是沒有很好地保護該對象,會形成程序結果的不可預期,咱們也稱此爲「線程不安全」。

#實測:在python2.七、mac os下,運行如下代碼可能會產生髒數據。可是在python3中就不必定會出現下面的問題。 import threading import time def run(n): global num num += 1 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print "num:", num """ 產生髒數據後的運行結果: num: 19999 """

2.5 互斥鎖(mutex)

爲了方式上面狀況的發生,就出現了互斥鎖(Lock)

import threading import time def run(n): lock.acquire() #獲取鎖 global num num += 1 lock.release() #釋放鎖 lock = threading.Lock() #實例化一個鎖對象 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print "num:", num

2.6 遞歸鎖

RLcok類的用法和Lock類如出一轍,但它支持嵌套,,在多個鎖沒有釋放的時候通常會使用使用RLcok類。

import threading import time gl_num = 0 lock = threading.RLock() def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()

2.7 信號量(BoundedSemaphore類)

互斥鎖同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading import time def run(n): semaphore.acquire() #加鎖 time.sleep(1) print("run the thread:%s\n" % n) semaphore.release() #釋放 num = 0 semaphore = threading.BoundedSemaphore(5) # 最多容許5個線程同時運行 for i in range(22): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print('-----all threads done-----')

2.8 事件(Event類)

python線程的事件用於主線程控制其餘線程的執行,事件是一個簡單的線程同步對象,其主要提供如下幾個方法:

方法 註釋
clear 將flag設置爲「False」
set 將flag設置爲「True」
is_set 判斷是否設置了flag
wait 會一直監聽flag,若是沒有檢測到flag就一直處於阻塞狀態

事件處理的機制:全局定義了一個「Flag」,當flag值爲「False」,那麼event.wait()就會阻塞,當flag值爲「True」,那麼event.wait()便再也不阻塞。

#利用Event類模擬紅綠燈 import threading import time event = threading.Event() def lighter(): count = 0 event.set() #初始值爲綠燈 while True: if 5 < count <=10 : event.clear() # 紅燈,清除標誌位 print("\33[41;1mred light is on...\033[0m") elif count > 10: event.set() # 綠燈,設置標誌位 count = 0 else: print("\33[42;1mgreen light is on...\033[0m") time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #判斷是否設置了標誌位 print("[%s] running..."%name) time.sleep(1) else: print("[%s] sees red light,waiting..."%name) event.wait() print("[%s] green light is on,start going..."%name) light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=("MINI",)) car.start()

2.9 條件(Condition類)

使得線程等待,只有知足某條件時,才釋放n個線程

2.10 定時器(Timer類)

定時器,指定n秒後執行某操做

from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed

3 多進程

在linux中,每一個進程都是由父進程提供的。每啓動一個子進程就從父進程克隆一份數據,可是進程之間的數據自己是不能共享的。

from multiprocessing import Process import time def f(name): time.sleep(2) print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join() 
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) #獲取父進程id print('process id:', os.getpid()) #獲取本身的進程id print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()

3.1 進程間通訊

因爲進程之間數據是不共享的,因此不會出現多線程GIL帶來的問題。多進程之間的通訊經過Queue()或Pipe()來實現

3.1.1 Queue()

使用方法跟threading裏的queue差很少

from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
3.1.2 Pipe()

Pipe的本質是進程之間的數據傳遞,而不是數據共享,這和socket有點像。pipe()返回兩個鏈接對象分別表示管道的兩端,每端都有send()和recv()方法。若是兩個進程試圖在同一時間的同一端進行讀取和寫入那麼,這可能會損壞管道中的數據。

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()

3.2 Manager

經過Manager可實現進程間數據的共享。Manager()返回的manager對象會經過一個服務進程,來使其餘進程經過代理的方式操做python對象。manager對象支持 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue ,Array.

from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)

3.3 進程鎖(進程同步)

數據輸出的時候保證不一樣進程的輸出內容在同一塊屏幕正常顯示,防止數據亂序的狀況。
Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()

3.4 進程池

因爲進程啓動的開銷比較大,使用多進程的時候會致使大量內存空間被消耗。爲了防止這種狀況發生可使用進程池,(因爲啓動線程的開銷比較小,因此不須要線程池這種概念,多線程只會頻繁得切換cpu致使系統變慢,並不會佔用過多的內存空間)

進程池中經常使用方法:
apply() 同步執行(串行)
apply_async() 異步執行(並行)
terminate() 馬上關閉進程池
join() 主進程等待全部子進程執行完畢。必須在close或terminate()以後。
close() 等待全部進程結束後,才關閉進程池。

from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print('-->exec done:',arg) pool = Pool(5) #容許進程池同時放入5個進程 for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #func子進程執行完後,纔會執行callback,不然callback不執行(並且callback是由父進程來執行了) #pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join() #主進程等待全部子進程執行完畢。必須在close()或terminate()以後。

進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。在上面的程序中產生了10個進程,可是隻能有5同時被放入進程池,剩下的都被暫時掛起,並不佔用內存空間,等前面的五個進程執行完後,再執行剩下5個進程。

4 補充:協程

線程和進程的操做是由程序觸發系統接口,最後的執行者是系統,它本質上是操做系統提供的功能。而協程的操做則是程序員指定的,在python中經過yield,人爲的實現併發處理。

協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時。協程,則只使用一個線程,分解一個線程成爲多個「微線程」,在一個線程中規定某個代碼塊的執行順序。

協程的適用場景:當程序中存在大量不須要CPU的操做時(IO)。
經常使用第三方模塊gevent和greenlet。(本質上,gevent是對greenlet的高級封裝,所以通常用它就行,這是一個至關高效的模塊。)

4.1 greenlet

from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()

實際上,greenlet就是經過switch方法在不一樣的任務之間進行切換。

4.2 gevent

from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])

經過joinall將任務f和它的參數進行統一調度,實現單線程中的協程。代碼封裝層次很高,實際使用只須要了解它的幾個主要方法便可。

原文:https://www.cnblogs.com/whatisfantasy/p/6440585.html

相關文章
相關標籤/搜索