併發編程之線程進階

1、互斥鎖算法

多進程中數據不安全,因此要加鎖。數據庫

多線程雖然有GIL鎖,可是因爲GIL鎖輪轉的策略(多線程之間時間片的輪轉),仍存在數據不安全的狀況,可是相對概率較低。安全

GIL鎖輪轉的策略:早期執行700條指令(不是700行,+= 操做至關於4條指令),如今是執行一個時間片時間,當前線程會讓出cpu給其餘線程使用。多線程

dis模塊中的方法能夠查看某個操做對應的cpu指令併發

 

解決線程之間的數據安全的問題:app

①多線程中,不在線程中操做全局變量dom

②涉及+=,-=,lis[0]+1,相似的操做必定要加鎖異步

③列表、字典自帶的方法都是線程安全的分佈式

④隊列也是數據安全的ide

 

線程不安全的案例

from threading import Thread count = 0 def fun_add(): global count for i in range(100000): count += 1

def fun_sub(): global count for i in range(100000): count -= 1 t_lis = [] for i in range(10): t1 = Thread(target=fun_add) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub) t2.start() t_lis.append(t2) for t in t_lis: t.join() print(count)            # -98445

 

使用互斥鎖解決線程安全問題(操做的指令都加上鎖)

from threading import Thread,Lock count = 0 def fun_add(lock): global count for i in range(100000): lock.acquire() count += 1 lock.release() def fun_sub(lock): global count for i in range(100000): lock.acquire() count -= 1 lock.release() t_lis = [] lock = Lock()                   # 建立鎖對象
for i in range(10): t1 = Thread(target=fun_add,args=(lock,)) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub,args=(lock,)) t2.start() t_lis.append(t2) for t in t_lis: t.join() # 等待全部的子線程執行完
print(count)                    # 0

 

2、遞歸鎖

當在併發的狀況下使用兩把鎖,會形成死鎖的現象。一個線程搶佔到一把鎖,另外一個線程搶佔到另外一把鎖,而操做須要同時搶佔兩把鎖才能執行操做。

解決方案:遞歸鎖

多少個acquire上鎖,就要有多少個release釋放鎖,一個線程先acquire後,其餘的線程只能等着。這個鎖比如像一串鑰匙。

 

遞歸鎖和互斥鎖的區別:

互斥鎖是兩把鎖多個線程搶佔,而遞歸鎖是一把鎖多個線程搶佔

在一個線程裏,用多個鎖的時候,用遞歸鎖實例化一個鎖,acquire屢次

在一個線程裏,只用一個所的時候,用互斥鎖爲了提升效率,在鎖多個資源的時候,應該酌情選用互斥鎖,用完一個資源應該立刻釋放

 

遞歸鎖可以快速的解決死鎖問題,可是遞歸鎖並非一個好的解決方案,死鎖現象的發生不是互斥鎖的問題,而是代碼的邏輯問題,遞歸鎖只是臨時快速解決死鎖的有效方案,解決時只需將遞歸鎖替換互斥鎖。後續須要將遞歸鎖從新替換成互斥鎖,完善代碼的邏輯,而且提升代碼的效率

多線程之間,用完一個資源再用另外一個資源,應該先釋放一個資源再去獲取一個資源的鎖

 

經典死鎖案例:科學家吃麪(互斥鎖)

from threading import Thread,Lock import time noodles_lock = Lock() fork_lock = Lock() def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面條' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面條' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科學家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科學家',i)).start() ''' 科學家1拿到叉子 科學家1拿到麪條 科學家1吃麪 科學家1放下叉子 科學家1放下面條 科學家3拿到麪條 科學家2拿到叉子 '''

 

遞歸鎖解決方案

from threading import Thread,RLock import time fork_lock = noodles_lock = RLock()          # 遞歸鎖
def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面條' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面條' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科學家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科學家',i)).start() ''' 科學家1拿到叉子 科學家1拿到麪條 科學家1吃麪 科學家1放下叉子 科學家1放下面條 科學家2拿到叉子 科學家2拿到麪條 科學家2吃麪 科學家2放下叉子 科學家2放下面條 科學家3拿到麪條 科學家3拿到叉子 科學家3吃麪 科學家3放下面條 科學家3放下叉子 科學家4拿到麪條 科學家4拿到叉子 科學家4吃麪 科學家4放下面條 科學家4放下叉子 '''

 

3、信號量

信號量是基於鎖+計數器實現的,使用方式跟進程的信號量同樣使用

from threading import Semaphore,Thread

from threading import Semaphore,Thread import time def func(index,sem): sem.acquire() print(index) time.sleep(2) sem.release() sem = Semaphore(4) for i in range(12): Thread(target=func,args=(i,sem)).start()

 

4、事件

事件的應用:檢測數據庫鏈接

from threading import Event,Thread

方法:

wait()    # 能夠設置阻塞的時間

set()     # 將信號設置爲true

clear()       # 將信號設置爲False

is_set()   # 查看信號的狀態

事件的默認狀態時False

# 檢測數據庫鏈接

from threading import Event,Thread import time def check(e): time.sleep(2) e.set() # 將信號設置爲True

def connect(e): for i in range(3): e.wait(1)       # 阻塞一秒
        if e.is_set():  # 查看信號的狀態 
            print('鏈接成功') break
    else:print('鏈接失敗') e = Event() Thread(target=check,args=(e,)).start() Thread(target=connect,args=(e,)).start()

 

5、條件

方法:

notify()      # 控制流量,通知多少個能夠經過,有參數。

wait()        # 阻塞全部進程

notify_all()      # 所有放行,通常配合notify()使用

這兩個方法都是線程不安全的,每一個方法使用的先後都須要加鎖,條件裏面有鎖的方法。

# 條件

from threading import Condition,Thread import time def get_through(name,c): print('%s在等待'%name) c.acquire() c.wait() # 阻塞,等待經過線程的命令
    print('%s經過'%name) c.release() name_list = ['劉一','陳二','張三','李四','王五','趙六','孫七','周八'] c = Condition() for i in name_list: t = Thread(target=get_through,args=(i,c)) t.start() for k in range(4): c.acquire() c.notify(2)         # 設置每次經過的線程數
 c.release() time.sleep(5)

 

6、定時器

 使用場景:定時任務

Timer(n,函數)    實例化時接收兩個參數,(執行的m秒數,執行的函數)

不影響主線程

# 定時器

from threading import Timer def func(): print('action') t = Timer(5,func)           # 建立子線程,而且設置開啓子線程的時間
t.start()

 

7、隊列

qps概念:每秒鐘接收到的請求數

隊列的線程是安全的,隊列用於作排隊相關的邏輯,幫助維持相應的順序

特色:先進先出

方法:

get()

put()

get_nowait()

put_nowait()

import queue q = queue.Queue() q.put(1) print(q.get())

 

8、新的隊列

from queue import LifoQueue

相似於棧,特色是後進先出,而且不容許插隊

應用:算法的完成,有點相似分佈式的思想,例如:三級菜單

from queue import LifoQueue q = LifoQueue() for i in range(1,6): q.put(i) for i in range(1,6): print(q.get(),end=' ')      # 5 4 3 2 1 

 

9、優先級隊列

只能放同一種相似的值

應用場景:會員服務

 

①若是是數值,按照數值從小到大取值

from queue import PriorityQueue q = PriorityQueue() q.put(10) q.put(5) q.put(20) for i in range(3): print(q.get(),end=' ')      # 5 10 20 

 

②若是是字符串,按照ASICC編碼來取值

from queue import PriorityQueue q = PriorityQueue() q.put('c') q.put('a') q.put('b') for i in range(3): print(q.get(), end=' ')     # a b c 

 

③若是是數字、字母組成的元組,按第一個元素來取值,從小到大取值

from queue import PriorityQueue q = PriorityQueue() q.put((3,'zxc')) q.put((3,'abc')) q.put((1,'asd')) q.put((2,'qwe')) for i in range(4): print(q.get(),end=' ')      # (1, 'asd') (2, 'qwe') (3, 'abc') (3, 'zxc') 

 

10、線程池

concurrent.futures 模塊不只提供線程池,還提供進程池。

from concurrent.futures import ThreadPoolExecutor         # 線程池
from concurrent.futures import ProcessPoolExecutor            # 進程池

實例化的線程池數量 = 5 * cpu_count

 

方法:

submit(函數,參數)    異步提交任務,只能按位置傳參,不用加args=

ret = submit()         獲取返回值,須要經過result()方法取值

ret.result()        獲取值

map(函數,iterable)     取代for循環submit操做

shutdown()         等於進程池的close()和join()方法,阻塞直到任務完成

 

① 有返回值

from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('子線程號:',currentThread().ident)        # 打印子線程的線程號
    return i * '*' tp = ThreadPoolExecutor(5)          # 建立線程池,建立5個線程
ret_lis = [] for i in range(15): ret = tp.submit(func,i)         # 異步提交任務
    ret_lis.append(ret)             # 將返回值存到列表
for ret in ret_lis: print(ret.result())               # 經過result()方法獲取返回值的值
print('主線程',currentThread().ident)   # 打印主線程的線程號

 

 

② 無返回值

from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(): print('子進程',currentThread().ident)      # 打印子線程的線程號
 tp = ThreadPoolExecutor(3)                    # 建立線程池,開啓3個線程
for i in range(9): tp.submit(func) # 異步提交任務
tp.shutdown()                                 # 阻塞主線程,待全部的子線程運行完
print('主線程',currentThread().ident)

 

③ map方法

使用may方法必須傳入參數

from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(n):                            # 使用map必須有一個參數
    print('子線程號:',currentThread().ident) time.sleep(1) tp = ThreadPoolExecutor(3) ret = tp.map(func,range(15))            # map函數會傳入一個參數 # for i in range(15): # tp.submit(func,i) # 異步提交任務
print('主線程:',currentThread().ident)

 

 

 11、回調函數

線程池和進程池的回調函數經過submit實現的,

add_done_callback調用回調函數,不須要傳參,回調函數須要經過result()取值

 

線程池的回調函數由子線程完成

from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(i): print('子線程:',currentThread().ident)         # 獲取子線程的線程號
    time.sleep(1) return i def call_back(ret): print('ret>',ret.result())                      # 經過result()方法取值
    print('callback線程號:',currentThread().ident)   # 獲取回調函數的線程號
 tp = ThreadPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back)  # add_done_callback回調函數的方法,函數不須要傳入參數
tp.shutdown()                                           # 阻塞主線程,等待全部子線程執行完
print('主線程號:',currentThread().ident)

 

進程池的回調函數由主進程完成

from concurrent.futures import ProcessPoolExecutor import time,os def func(i): print('子進程:',os.getpid())         # 獲取子進程的進程號
    time.sleep(1) return i def call_back(ret): print('ret>',ret.result())            # 經過result()方法取值
    print('callback進程號:',os.getpid())   # 獲取回調函數的進程號

if __name__ == '__main__': tp = ProcessPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back)  # add_done_callback回調函數的方法,函數不須要傳入參數
    tp.shutdown()                           # 阻塞主進程
    print('主線程號:',os.getpid())

 

12、local模塊

from threading import local

不一樣線程的ID存儲的值和取到的值是不一樣的

多個線程之間使用threading.local對象,能夠實現多個線程之間的數據隔離

import time
import random
from threading import local,Thread

loc = local()
def func2():
    global loc
    print(loc.name,loc.age)

def func1(name,age):
    global loc
    loc.name = name
    loc.age = age
    time.sleep(random.random())
    func2()

Thread(target=func1,args=('xiaobai',20)).start()
Thread(target=func1,args=('xiaohei',25)).start()

'''
xiaobai 20
xiaohei 25
'''
相關文章
相關標籤/搜索