GIL , 線程池 , 同步 , 異步 , 隊列 , 事件

一.什麼是GIL

官方解釋:
'''
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
''' 釋義:
在CPython中,這個全局解釋器鎖,也稱爲GIL,是一個互斥鎖,防止多個線程在同一時間執行Python字節碼,這個鎖是很是重要的,
由於CPython的內存管理非線程安全的,不少其餘的特性依賴於GIL,因此即便它影響了程序效率也沒法將其直接去除 總結: 在CPython中,GIL會把線程的並行變成串行,致使效率下降

 

須要知道的是,解釋器並不僅有CPython,還有PyPy,JPython等等。GIL也僅存在與CPython中,這並非Python這門語言的問題,而是CPython解釋器的問題!python

 

 

二.GIL帶來的問題

首先必須明確執行一個py文件,分爲三個步驟mysql

  1. 從硬盤加載Python解釋器到內存程序員

  2. 從硬盤加載py文件到內存算法

  3. 解釋器解析py文件內容,交給CPU執行sql

其次須要明確的是每當執行一個py文件,就會當即啓動一個python解釋器,編程

當執行test.py時其內存結構以下:瀏覽器

GIL,叫作全局解釋器鎖,加到了解釋器上,而且是一把互斥鎖,那麼這把鎖對應用程序到底有什麼影響?安全

這就須要知道解釋器的做用,以及解釋器與應用程序代碼之間的關係服務器

py文件中的內容本質都是字符串,只有在被解釋器解釋時,才具有語法意義,解釋器會將py代碼翻譯爲當前系統支持的指令交給系統執行。微信

當進程中僅存在一條線程時,GIL鎖的存在沒有不會有任何影響,可是若是進程中有多個線程時,GIL鎖就開始發揮做用了。以下圖:

開啓子線程時,給子線程指定了一個target表示該子線程要處理的任務即要執行的代碼。代碼要執行則必須交由解釋器,即多個線程之間就須要共享解釋器,爲了不共享帶來的數據競爭問題,因而就給解釋器加上了互斥鎖!

因爲互斥鎖的特性,程序串行,保證數據安全,下降執行效率,GIL將使得程序總體效率下降!

 

 

三.爲何須要GIL

GIL與GC的孽緣 :

在使用Python中進行編程時,程序員無需參與內存的管理工做,這是由於Python有自帶的內存管理機制,簡稱GC。那麼GC與GIL有什麼關聯?

要搞清楚這個問題,需先了解GC的工做原理,Python中內存管理使用的是引用計數,每一個數會被加上一個整型的計數器,表示這個數據被引用的次數,當這個整數變爲0時則表示該數據已經沒有人使用,成了垃圾數據。

當內存佔用達到某個閾值時,GC會將其餘線程掛起,而後執行垃圾清理操做,垃圾清理也是一串代碼,也就須要一條線程來執行。

 

示例代碼:

from threading import  Thread
def task():
    a = 10
    print(a)

# 開啓三個子線程執行task函數
Thread(target=task).start()
Thread(target=task).start()
Thread(target=task).start()

 

上述代碼內存結構以下:

經過上圖能夠看出,GC與其餘線程都在競爭解釋器的執行權,而CPU什麼時候切換,以及切換到哪一個線程都是沒法預支的,這樣一來就形成了競爭問題 !

假設線程1正在定義變量a=10,而定義變量第一步會先到到內存中申請空間把10存進去,第二步將10的內存地址與變量名a進行綁定,若是在執行完第一步後,CPU切換到了GC線程,GC線程發現10的地址引用計數爲0則將其當成垃圾進行了清理,等CPU再次切換到線程1時,剛剛保存的數據10已經被清理掉了,致使沒法正常定義變量。

固然其餘一些涉及到內存的操做一樣可能產生問題,爲了不GC與其餘線程競爭解釋器帶來的問題,CPython簡單粗暴的給解釋器加了互斥鎖

 

以下圖所示:

有了GIL後,多個線程將不可能在同一時間使用解釋器,從而保證瞭解釋器的數據安全。

 

 

GIL的加鎖與解鎖時機

加鎖的時機:

  在調用解釋器時當即加鎖

解鎖時機:

  • 當前線程遇到了IO時釋放

  • 當前線程執行時間超過設定值時釋放

 

 

但咱們並不能所以就否定Python這門語言,其緣由以下:

  1. GIL僅僅在CPython解釋器中存在,在其餘的解釋器中沒有,並非Python這門語言的缺點

  2. 在單核處理器下,多線程之間原本就沒法真正的並行執行

  3. 在多核處理下,運算效率的確是比單核處理器高,可是要知道現代應用程序多數都是基於網絡的(qq,微信,爬蟲,瀏覽器等等),CPU的運行效率是沒法決定網絡速度的,而網絡的速度是遠遠比不上處理器的運算速度,則意味着每次處理器在執行運算前都須要等待網絡IO,這樣一來多核優點也就沒有那麼明顯了

    舉個例子:

    任務1 從網絡上下載一個網頁,等待網絡IO的時間爲1分鐘,解析網頁數據花費,1秒鐘

    任務2 將用戶輸入數據並將其轉換爲大寫,等待用戶輸入時間爲1分鐘,轉換爲大寫花費,1秒鐘

    單核CPU下:1.開啓第一個任務後進入等待。2.切換到第二個任務也進入了等待。一分鐘後解析網頁數據花費1秒解析完成切換到第二個任務,轉換爲大寫花費1秒,那麼總耗時爲:1分+1秒+1秒 = 1分鐘2秒

    多核CPU下:1.CPU1處理第一個任務等待1分鐘,解析花費1秒鐘。1.CPU2處理第二個任務等待1分鐘,轉換大寫花費1秒鐘。因爲兩個任務是並行執行的因此總的執行時間爲1分鐘+1秒鐘 = 1分鐘1秒

    能夠發現,多核CPU對於總的執行時間提高只有1秒,可是這邊的1秒其實是誇張了,轉換大寫操做不可能須要1秒,時間很是短!

    上面的兩個任務都是須要大量IO時間的,這樣的任務稱之爲IO密集型,與之對應的是計算密集型即IO操做較少大部分都是計算任務。

    對於計算密集型任務,Python多線程的確比不上其餘語言!爲了解決這個弊端,Python推出了多進程技術,能夠良好的利用多核處理器來完成計算密集任務。

    總結:

    1.單核下不管是IO密集仍是計算密集GIL都不會產生任何影響

    2.多核下對於IO密集任務,GIL會有細微的影響,基本能夠忽略

    3.Cpython中IO密集任務應該採用多線程,計算密集型應該採用多進程

另外:之因此普遍採用CPython解釋器,就是由於大量的應用程序都是IO密集型的,還有另外一個很重要的緣由是CPython能夠無縫對接各類C語言實現的庫,這對於一些數學計算相關的應用程序而言很是的happy,直接就能使用各類現成的算法

 

計算密集型的效率測試:

from multiprocessing import Process
from threading import Thread
import time


def task():
    for i in range(10000000):
        i += 1


if __name__ == '__main__':
    start_time = time.time()
# 多進程 p1 = Process(target=task) # 2.053471565246582 p2 = Process(target=task) p3 = Process(target=task) p4 = Process(target=task) # 多線程 # p1 = Thread(target=task) # 3.169567823410034 # p2 = Thread(target=task) # p3 = Thread(target=task) # p4 = Thread(target=task) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(time.time() - start_time)

 

IO密集型的效率測試 :

from multiprocessing import Process
from threading import Thread
import time
def task():
    with open("test.txt",encoding="utf-8") as f:
        f.read()
if __name__ == '__main__':
    start_time = time.time()
    # 多進程
    # p1 = Process(target=task)
    # p2 = Process(target=task)
    # p3 = Process(target=task)
    # p4 = Process(target=task)

    # 多線程
    p1 = Thread(target=task)
    p2 = Thread(target=task)
    p3 = Thread(target=task)
    p4 = Thread(target=task)

    p1.start()
    p2.start()
    p3.start()
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print(time.time()-start_time)

 

 

五.自定義的線程鎖與GIL的區別

GIL保護的是解釋器級別的數據安全,好比對象的引用計數,垃圾分代數據等等,具體參考垃圾回收機制詳解。

對於程序中本身定義的數據則沒有任何的保護效果,這一點在沒有介紹GIL前咱們就已經知道了,因此當程序中出現了共享自定義的數據時就要本身加鎖

以下例:

from threading import Thread,Lock
import time

a = 0
def task():
    global a
    temp = a
    time.sleep(0.01) 
    a = temp + 1
    
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()


t1.join()
t2.join()
print(a)

 

過程分析:

1.線程1得到CPU執行權,並獲取GIL鎖執行代碼 ,獲得a的值爲0後進入睡眠,釋放CPU並釋放GIL

2.線程2得到CPU執行權,並獲取GIL鎖執行代碼 ,獲得a的值爲0後進入睡眠,釋放CPU並釋放GIL

3.線程1睡醒後得到CPU執行權,並獲取GIL執行代碼 ,將temp的值0+1後賦給a,執行完畢釋放CPU並釋放GIL

4.線程2睡醒後得到CPU執行權,並獲取GIL執行代碼 ,將temp的值0+1後賦給a,執行完畢釋放CPU並釋放GIL,最後a的值也就是1

之因此出現問題是由於兩個線程在併發的執行同一段代碼,解決方案就是加鎖!

 

from threading import Thread,Lock
import time

lock = Lock()
a = 0
def task():
    global a
    lock.acquire()
    temp = a
    time.sleep(0.01)
    a = temp + 1
    lock.release() 

    
t1 = Thread(target=task)
t2 = Thread(target=task)

t1.start()
t2.start()

t1.join()
t2.join()
print(a)

 

過程分析:

1.線程1得到CPU執行權,並獲取GIL鎖執行代碼 ,獲得a的值爲0後進入睡眠,釋放CPU並釋放GIL,不釋放lock

2.線程2得到CPU執行權,並獲取GIL鎖,嘗試獲取lock失敗,沒法執行,釋放CPU並釋放GIL

3.線程1睡醒後得到CPU執行權,並獲取GIL繼續執行代碼 ,將temp的值0+1後賦給a,執行完畢釋放CPU釋放GIL,釋放lock,此時a的值爲1

4.線程2得到CPU執行權,獲取GIL鎖,嘗試獲取lock成功,執行代碼,獲得a的值爲1後進入睡眠,釋放CPU並釋放GIL,不釋放lock

5.線程2睡醒後得到CPU執行權,獲取GIL繼續執行代碼 ,將temp的值1+1後賦給a,執行完畢釋放CPU釋放GIL,釋放lock,此時a的值爲2

 

 

 

 

六:進程池與線程池

什麼是進程/線程池?

池表示一個容器,本質上就是一個存儲進程或線程的列表

 

池子中存儲線程仍是進程?

若是是IO密集型任務使用線程池,若是是計算密集任務則使用進程池

 

爲何須要進程/線程池?

在不少狀況下須要控制進程或線程的數量在一個合理的範圍,例如TCP程序中,一個客戶端對應一個線程,雖然線程的開銷小,但確定不能無限的開,不然系統資源早晚被耗盡,解決的辦法就是控制線程的數量。

線程/進程池不只幫咱們控制線程/進程的數量,還幫咱們完成了線程/進程的建立,銷燬,以及任務的分配

 

進程池的使用:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,os

# 建立進程池,指定最大進程數爲3,此時不會建立進程,不指定數量時,默認爲CPU和核數
pool = ProcessPoolExecutor(3)

def task():
    time.sleep(1)
    print(os.getpid(),"working..")

if __name__ == '__main__':
    for i in range(10):
        pool.submit(task) # 提交任務時當即建立進程

    # 任務執行完成後也不會當即銷燬進程
    time.sleep(2)

    for i in range(10):
        pool.submit(task) #再有新任務是 直接使用以前已經建立好的進程來執行

 

線程池的使用:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread,active_count
import time,os

# 建立進程池,指定最大線程數爲3,此時不會建立線程,不指定數量時,默認爲CPU和核數*5
pool = ThreadPoolExecutor(3)
print(active_count()) # 只有一個主線

def task():
    time.sleep(1)
    print(current_thread().name,"working..")

if __name__ == '__main__':
    for i in range(10):
        pool.submit(task) # 第一次提交任務時當即建立線程

    # 任務執行完成後也不會當即銷燬
    time.sleep(2)

    for i in range(10):
        pool.submit(task) #再有新任務時 直接使用以前已經建立好的線程來執行

 

案例:TCP中的應用

首先要明確,TCP是IO密集型,應該使用線程池

 

 

 

七.同步異步-阻塞非阻塞

同步異步-阻塞非阻塞,常常會被程序員說起,而且概念很是容易混淆!

 

阻塞非阻塞 ------指的是程序的運行狀態

阻塞:當程序執行過程當中遇到了IO操做,在執行IO操做時,程序沒法繼續執行其餘代碼,稱爲阻塞!

非阻塞:程序在正常運行沒有遇到IO操做,或者經過某種方式使程序即便遇到了也不會停在原地,還能夠執行其餘操做,以提升CPU的佔用率

 

同步-異步-------- 指的是提交任務的方式

同步:指調用發起任務後必須在原地等待任務執行完成,才能繼續執行

異步:指調用發起任務後不用等待任務執行,能夠當即開啓執行其餘操做

 

同步會有等待的效果可是這和阻塞是徹底不一樣的,阻塞時程序會被剝奪CPU執行權,而同步調用則不會!

 

很明顯異步調用效率更高,可是任務的執行結果如何獲取呢?

 

程序中的異步調用並獲取結果方式1:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

pool = ThreadPoolExecutor(3)
def task(i):
    time.sleep(0.01)
    print(current_thread().name,"working..")
    return i ** i

if __name__ == '__main__':
    objs = []
    for i in range(3):
        res_obj = pool.submit(task,i) # 異步方式提交任務# 會返回一個對象用於表示任務結果
        objs.append(res_obj)

# 該函數默認是阻塞的 會等待池子中全部任務執行結束後執行
pool.shutdown(wait=True)

# 從結果對象中取出執行結果
for res_obj in objs:
    print(res_obj.result())
print("over")

 

程序中的異步調用並獲取結果方式2:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time

pool = ThreadPoolExecutor(3)
def task(i):
    time.sleep(0.01)
    print(current_thread().name,"working..")
    return i ** i

if __name__ == '__main__':
    objs = []
    for i in range(3):
        res_obj = pool.submit(task,i) # 會返回一個對象用於表示任務結果
        print(res_obj.result()) #result是同步的一旦調用就必須等待 任務執行完成拿到結果
print("over")

 

 

 

8.異步回調

什麼是異步回調

異步回調指的是:在發起一個異步任務的同時指定一個函數,在異步任務完成時會自動的調用這個函數

 

爲何須要異步回調

以前在使用線程池或進程池提交任務時,若是想要處理任務的執行結果則必須調用result函數或是shutdown函數,而它們都是是阻塞的,會等到任務執行完畢後才能繼續執行,這樣一來在這個等待過程當中就沒法執行其餘任務,下降了效率,因此須要一種方案,即保證解析結果的線程不用等待,又能保證數據可以及時被解析,該方案就是異步回調

 

異步回調的使用

先來看一個案例:

在編寫爬蟲程序時,一般都是兩個步驟:

1.從服務器下載一個網頁文件

2.讀取而且解析文件內容,提取有用的數據

按照以上流程能夠編寫一個簡單的爬蟲程序

  要請求網頁數據則須要使用到第三方的請求庫requests能夠經過pip或是pycharm來安裝,在pycharm中點擊settings->解釋器->點擊+號->搜索requests->安裝

 

import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor

def get_data(url):
    print("%s 正在請求%s" % (os.getpid(),url))
    time.sleep(random.randint(1,2))
    response = requests.get(url)
    print(os.getpid(),"請求成功 數據長度",len(response.content))
    #parser(response) # 3.直接調用解析方法  哪一個進程請求完成就那個進程解析數據  強行使兩個操做耦合到一塊兒了
    return response

def parser(obj):
    data = obj.result()
    htm = data.content.decode("utf-8")
    ls = re.findall("href=.*?com",htm)
    print(os.getpid(),"解析成功",len(ls),"個連接")

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    urls = ["https://www.baidu.com",
            "https://www.sina.com",
            "https://www.python.org",
            "https://www.tmall.com",
            "https://www.mysql.com",
            "https://www.apple.com.cn"]
    # objs = []
    for url in urls:
        # res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將致使全部請求任務不能併發
        # parser(res)

        obj = pool.submit(get_data,url) # 
        obj.add_done_callback(parser) # 4.使用異步回調,保證了數據能夠被及時處理,而且請求和解析解開了耦合
        # objs.append(obj)
        
    # pool.shutdown() # 2.等待全部任務執行結束在統一的解析
    # for obj in objs:
    #     res = obj.result()
    #     parser(res)
    # 1.請求任務能夠併發 可是結果不能被及時解析 必須等全部請求完成才能解析
    # 2.解析任務變成了串行

 

總結:異步回調使用方法就是在提交任務後獲得一個Futures對象,調用對象的add_done_callback來指定一個回調函數,

若是把任務比喻爲燒水,沒有回調時就只能守着水壺等待水開,有了回調至關於換了一個會響的水壺,燒水期間可用做其餘的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回調。

注意:

  1. 使用進程池時,回調函數都是主進程中執行執行

  2. 使用線程池時,回調函數的執行線程是不肯定的,哪一個線程空閒就交給哪一個線程

  3. 回調函數默認接收一個參數就是這個任務對象本身,再經過對象的result函數來獲取任務的處理結果

 

 

 

9.線程隊列

1.Queue 先進先出隊列

與多進程中的Queue使用方式徹底相同,區別僅僅是不能被多進程共享。

q =  Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get(timeout=1))
print(q.get(timeout=1))
print(q.get(timeout=1))

 

2.LifoQueue 後進先出隊列

該隊列能夠模擬堆棧,實現先進後出,後進先出

lq = LifoQueue()

lq.put(1)
lq.put(2)
lq.put(3)

print(lq.get())
print(lq.get())
print(lq.get())

 

3.PriorityQueue 優先級隊列

該隊列能夠爲每一個元素指定一個優先級,這個優先級能夠是數字,字符串或其餘類型,可是必須是能夠比較大小的類型,取出數據時會按照從小到大的順序取出

pq = PriorityQueue()
# 數字優先級
pq.put((10,"a"))
pq.put((11,"a"))
pq.put((-11111,"a"))

print(pq.get())
print(pq.get())
print(pq.get())
# 字符串優先級
pq.put(("b","a"))
pq.put(("c","a"))
pq.put(("a","a"))

print(pq.get())
print(pq.get())
print(pq.get())

 

 

10.線程事件Event

什麼是事件

事件表示在某個時間發生了某個事情的通知信號,用於線程間協同工做。

由於不一樣線程之間是獨立運行的狀態不可預測,因此一個線程與另外一個線程間的數據是不一樣步的,當一個線程須要利用另外一個線程的狀態來肯定本身的下一步操做時,就必須保持線程間數據的同步,Event就能夠實現線程間同步

Event介紹

Event象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

 

可用方法:

event.isSet()   #:返回event的狀態值;
event.wait()   #:將阻塞線程;直到event的狀態爲True
event.set()    #:設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear()    #:恢復event的狀態值爲False

 

使用案例:


#
在連接mysql服務器前必須保證mysql已經啓動,而啓動須要花費一些時間,因此客戶端不能當即發起連接 須要等待msyql啓動完成後當即發起連接 from threading import Event,Thread import time boot = False def start(): global boot print("正正在啓動服務器.....") time.sleep(5) print("服務器啓動完成!") boot = True def connect(): while True: if boot: print("連接成功") break else: print("連接失敗") time.sleep(1) Thread(target=start).start() Thread(target=connect).start() Thread(target=connect).start()

 

使用Event改造後:

from threading import Event,Thread
import time

e = Event()
def start():print("正正在啓動服務器.....")
    time.sleep(3)
    print("服務器啓動完成!")
    e.set()

def connect():
    e.wait()
    print("連接成功")
    
Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()

 

增長需求,每次嘗試連接等待1秒,嘗試次數爲3次

from threading import Event,Thread
import time

e = Event()
def start():
    global boot
    print("正正在啓動服務器.....")
    time.sleep(5)
    print("服務器啓動完成!")
    e.set()

def connect():
    for i in range(1,4):
        print("第%s次嘗試連接" % i)
        e.wait(1)
        if e.isSet():
            print("連接成功")
            break
        else:
            print("第%s次連接失敗" % i)
    else:
        print("服務器未啓動!")

Thread(target=start).start()
Thread(target=connect).start()
# Thread(target=connect).start()
相關文章
相關標籤/搜索