線程相關 GIL queue event 死鎖與遞歸鎖 信號量l

#1、GIL全局解釋器鎖

GIl是一個互斥鎖:保證數據的安全(以犧牲效率來換取數據的安全)
阻止同一個進程內多個線程同時執行(不能並行但能實現併發)
併發:看起來像同時進行的)
GIL全局解釋器存在的緣由是由於Cpython解釋器的內存管理不是線程安全的

垃圾回收機制:  (能夠做爲一種線程)
    一、引用計數
    二、標記清除
    三、分帶回收

同一個進程下的多個線程不能實現並行可是可以實現併發,多個進程下的線程可以實現並行

一、存在四個任務:計算密集型的任務 每一個耗時10s
    單核狀況下:多線程好一點,消耗的資源較少
    多核狀況下:
        開四個進程:10s多一些
        一個進程下開啓四個線程:40多秒
二、存在四個任務:IO密集型的任務 每一個任務IO 10s
    單核狀況下:多線程好一些
    多核狀況下:多線程好一些

  

#計算密集與IO密集狀況下線程與進程的耗時比較

#計算密集型

from multiprocessing import Process
from threading import Thread
import os,time

def work():
    res = 0
    for i in range(12345678):
        res*=i

if __name__ == '__main__':
    l = []
    print(os.cpu_count())  #查看cpu核數
    start_time = time.time()
    for i in range(4):   #4個進程或者4個線程去計算
        # p = Process(target=work) #run time is 3.224184513092041
        p = Thread(target=work)   #run time is 6.93839693069458
        l.append(p)
        p.start()

    for p in l:
        p.join()
    stop_time = time.time()
    print('run time is %s'%(stop_time-start_time))


#IO密集型

from multiprocessing import Process
from threading import Thread
import time

def work():
    time.sleep(2)

if __name__ == '__main__':
    l = []
    start_time = time.time()
    for i in range(4):
        # p = Process(target=work)  #2.416138172149658
        p = Thread(target=work) #2.0021145343780518
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop_time = time.time()
    print(stop_time-start_time)


#結果驗證正確

  

#2、GIL與普通鎖的對比

#不加其餘鎖

from threading import Thread
import time

n = 100
def task():
    global n
    tmp = n
    time.sleep(0.1)
    n = tmp - 1

t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(n)  #99
#解釋:當啓動線程後,因爲GIL的存在,一次只能運行一個線程,在其拿到n=100的值後,遇到IO,進入阻塞態,這時
#系統強制結束該線程,換其餘線程來執行,如此循環,致使全部線程拿到的都是n=100的值,最後一個線程結束後,返回的值是
# n=100-1,則爲99。 這個結果沒有達到咱們的初衷,所以須要依靠其餘鎖來保證數據的安全


#加上其餘鎖

from threading import Thread,Lock
import time

n=100
mutex = Lock()
def task():
    global n
    mutex.acquire()
    tmp = n
    time.sleep(0.1)
    n = tmp - 1
    mutex.release()

t_list = []
for i in range(100):
    t = Thread(target=task)
    t_list.append(t)
    t.start()
for t in t_list:
    t.join()
print(n) #0

#解釋:加上了鎖之後,第一個線程先拿到了鎖,拿到了n=100這個數據,而後遇到了IO阻塞,系統強制結束該進程,其餘線程開始運行,可是
#發現數據被加鎖了,沒法進一步運行,到時間了,系統又讓其強制結束,如此往復,直到第一個進程再次拿到了執行權限,此時的阻塞已通過去,
# 該進程進行相關運算,再釋放鎖,交給其餘線程去競爭,如此循環往復,最後獲得n = 0



所以對於不一樣的數據,要想保證安全,須要加上不一樣的鎖去處理
GIL並不能保證數據的安全,他是對Cpython解釋器加鎖,針對的是線程
保證的是在同一個進程中多個線程一個時間內只能運行一個線程

  

#3、死鎖與遞歸鎖

#死鎖

from threading import Thread,Lock
import time

mutex1 = Lock()
mutex2 = Lock()

class MyThead(Thread):
    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):
        mutex1.acquire()
        print('%s 搶到A鎖'%self.name)
        mutex2.acquire()
        print('%s 搶到了B鎖'%self.name)
        mutex2.release()
        print('%s釋放了B鎖'%self.name)
        mutex1.release()
        print('%s 釋放了A鎖'%self.name)

    def fun2(self):
        mutex2.acquire()
        print('%s搶到了B鎖'%self.name)
        time.sleep(1)
        mutex1.acquire()
        print('%s 搶到了A鎖'%self.name)
        mutex1.release()
        print('%s 釋放了A鎖'%self.name)
        mutex2.release()
        print('%s 釋放了B鎖'%self.name)

for i in range(100):
    t = MyThead()
    t.start()

#結果:

Thread-1 搶到A鎖
Thread-1 搶到了B鎖
Thread-1釋放了B鎖
Thread-1 釋放了A鎖
Thread-1搶到了B鎖
Thread-2 搶到A鎖
而後程序就會卡着,此時就陷入了死鎖 
由於線程1拿到了B鎖,線程2拿到了A鎖,此時彼此拿着對方的命脈,不給對方活路,除非有外力,否則就會一直這樣


#遞歸鎖

from threading import RLock,Thread
import time

mutexA = mutexB =RLock()

class MyThead(Thread):
    def run(self):
        self.fun1()
        self.fun2()
    def fun1(self):
        mutexA.acquire()
        print('%s搶到了A鎖'%self.name)
        mutexB.acquire()
        print('%s搶到了B鎖'%self.name)
        mutexB.release()
        print('%s釋放了B鎖'%self.name)
        mutexA.release()
        print('%s釋放了A鎖'%self.name)
    def fun2(self):
        mutexB.acquire()
        print('%s搶到了B鎖'%self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s搶到了A鎖'%self.name)
        mutexA.release()
        print('%s釋放了A鎖'%self.name)
        mutexB.release()
        print('%s釋放了B鎖'%self.name)

for i in range(10):
    t = MyThead()
    t.start()


#總結

自定義鎖一次acquire必須對應一次release,不能連續acquire
遞歸鎖Rlock:這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。
直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,
這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止

  

#4、信號量

from threading import Thread,Semaphore
import time,random

sm = Semaphore(5)  #至關於廁全部5個茅坑

def task(name):
    sm.acquire()
    print('%s正在蹲坑'%name)
    time.sleep(random.randint(1,5)) #模擬蹲坑耗時
    sm.release()

for i in range(20):
    t = Thread(target=task,args=('巴豆%s號'%i,))
    t.start()
#和普通的互斥鎖區別在於,普通的是獨立衛生間,全部人搶一個坑位
#信號量 是公共衛生間,有多個茅坑,全部人搶多個坑位

  

#5、線程 queue   使用import queue ,用法與進程的Queue同樣
#先進先出 queue.Queue
#先進後出 queue.LifoQueue
#優先級   queue.PriorityQueue


import queue

q = queue.Queue(3) #隊列裏最多放置的數據個數
q.put(1)
q.put(2)
print(q.get()) #1
print(q.get()) #2

q = queue.LifoQueue(3)
q.put(1)
q.put(2)
print(q.get()) #2
print(q.get()) #1

q = queue.PriorityQueue(3)
q.put((10,'a'))
q.put((-1,'a'))
q.put((0,'a'))
print(q.get()) #(-1, 'a')
print(q.get()) #(0, 'a')
print(q.get()) #(10, 'a')
#對於優先級,元組裏的第一個元素一般是數字,也能夠是非數字之間去比較大小
#比較的結果中,該元素越小,優先級越高

  

#6、event事件

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步
的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程
設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象
, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,
它將喚醒全部等待這個Event對象的線程。
若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():若是 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。

#多線程嘗試鏈接MySQL
from threading import Thread,Event
import threading
import time,random

# event = Event()
def conn_mysql():
    count = 1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連接超時')
        print('<%s>第%s嘗試連接'%(threading.current_thread().getName(),count))
        event.wait(0.5)
        count += 1
    print('<%s>連接成功'%threading.current_thread().getName())

def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m'%threading.current_thread().getName())
    time.sleep(random.random())
    event.set()
event = Event()
conn1 = Thread(target=conn_mysql)
conn2 = Thread(target=conn_mysql)
check = Thread(target=check_mysql)

conn1.start()
conn2.start()
check.start()
相關文章
相關標籤/搜索