1、互斥鎖算法
多進程中數據不安全,因此要加鎖。數據庫
多線程雖然有GIL鎖,可是因爲GIL鎖輪轉的策略(多線程之間時間片的輪轉),仍存在數據不安全的狀況,可是相對概率較低。安全
GIL鎖輪轉的策略:早期執行700條指令(不是700行,+= 操做至關於4條指令),如今是執行一個時間片時間,當前線程會讓出cpu給其餘線程使用。多線程
dis模塊中的方法能夠查看某個操做對應的cpu指令併發
解決線程之間的數據安全的問題:app
①多線程中,不在線程中操做全局變量dom
②涉及+=,-=,lis[0]+1,相似的操做必定要加鎖異步
③列表、字典自帶的方法都是線程安全的分佈式
④隊列也是數據安全的ide
線程不安全的案例
from threading import Thread count = 0 def fun_add(): global count for i in range(100000): count += 1
def fun_sub(): global count for i in range(100000): count -= 1 t_lis = [] for i in range(10): t1 = Thread(target=fun_add) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub) t2.start() t_lis.append(t2) for t in t_lis: t.join() print(count) # -98445
使用互斥鎖解決線程安全問題(操做的指令都加上鎖)
from threading import Thread,Lock count = 0 def fun_add(lock): global count for i in range(100000): lock.acquire() count += 1 lock.release() def fun_sub(lock): global count for i in range(100000): lock.acquire() count -= 1 lock.release() t_lis = [] lock = Lock() # 建立鎖對象
for i in range(10): t1 = Thread(target=fun_add,args=(lock,)) t1.start() t_lis.append(t1) t2 = Thread(target=fun_sub,args=(lock,)) t2.start() t_lis.append(t2) for t in t_lis: t.join() # 等待全部的子線程執行完
print(count) # 0
2、遞歸鎖
當在併發的狀況下使用兩把鎖,會形成死鎖的現象。一個線程搶佔到一把鎖,另外一個線程搶佔到另外一把鎖,而操做須要同時搶佔兩把鎖才能執行操做。
解決方案:遞歸鎖
多少個acquire上鎖,就要有多少個release釋放鎖,一個線程先acquire後,其餘的線程只能等着。這個鎖比如像一串鑰匙。
遞歸鎖和互斥鎖的區別:
互斥鎖是兩把鎖多個線程搶佔,而遞歸鎖是一把鎖多個線程搶佔
在一個線程裏,用多個鎖的時候,用遞歸鎖實例化一個鎖,acquire屢次
在一個線程裏,只用一個所的時候,用互斥鎖爲了提升效率,在鎖多個資源的時候,應該酌情選用互斥鎖,用完一個資源應該立刻釋放
遞歸鎖可以快速的解決死鎖問題,可是遞歸鎖並非一個好的解決方案,死鎖現象的發生不是互斥鎖的問題,而是代碼的邏輯問題,遞歸鎖只是臨時快速解決死鎖的有效方案,解決時只需將遞歸鎖替換互斥鎖。後續須要將遞歸鎖從新替換成互斥鎖,完善代碼的邏輯,而且提升代碼的效率
多線程之間,用完一個資源再用另外一個資源,應該先釋放一個資源再去獲取一個資源的鎖
經典死鎖案例:科學家吃麪(互斥鎖)
from threading import Thread,Lock import time noodles_lock = Lock() fork_lock = Lock() def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面條' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面條' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科學家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科學家',i)).start() ''' 科學家1拿到叉子 科學家1拿到麪條 科學家1吃麪 科學家1放下叉子 科學家1放下面條 科學家3拿到麪條 科學家2拿到叉子 '''
遞歸鎖解決方案
from threading import Thread,RLock import time fork_lock = noodles_lock = RLock() # 遞歸鎖
def eat1(name,i,): fork_lock.acquire() print('%s%s拿到叉子'%(name,i)) noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) fork_lock.release() print('%s%s放下叉子' % (name, i)) noodles_lock.release() print('%s%s放下面條' % (name, i)) def eat2(name,i,): noodles_lock.acquire() print('%s%s拿到麪條'%(name,i)) fork_lock.acquire() print('%s%s拿到叉子' % (name, i)) print('%s%s吃麪'%(name,i)) time.sleep(0.5) noodles_lock.release() print('%s%s放下面條' % (name, i)) fork_lock.release() print('%s%s放下叉子' % (name, i)) for i in range(2):Thread(target=eat1,args=('科學家',i+1)).start() for i in range(3,5):Thread(target=eat2,args=('科學家',i)).start() ''' 科學家1拿到叉子 科學家1拿到麪條 科學家1吃麪 科學家1放下叉子 科學家1放下面條 科學家2拿到叉子 科學家2拿到麪條 科學家2吃麪 科學家2放下叉子 科學家2放下面條 科學家3拿到麪條 科學家3拿到叉子 科學家3吃麪 科學家3放下面條 科學家3放下叉子 科學家4拿到麪條 科學家4拿到叉子 科學家4吃麪 科學家4放下面條 科學家4放下叉子 '''
3、信號量
信號量是基於鎖+計數器實現的,使用方式跟進程的信號量同樣使用
from threading import Semaphore,Thread
from threading import Semaphore,Thread import time def func(index,sem): sem.acquire() print(index) time.sleep(2) sem.release() sem = Semaphore(4) for i in range(12): Thread(target=func,args=(i,sem)).start()
4、事件
事件的應用:檢測數據庫鏈接
from threading import Event,Thread
方法:
wait() # 能夠設置阻塞的時間
set() # 將信號設置爲true
clear() # 將信號設置爲False
is_set() # 查看信號的狀態
事件的默認狀態時False
# 檢測數據庫鏈接
from threading import Event,Thread import time def check(e): time.sleep(2) e.set() # 將信號設置爲True
def connect(e): for i in range(3): e.wait(1) # 阻塞一秒
if e.is_set(): # 查看信號的狀態
print('鏈接成功') break
else:print('鏈接失敗') e = Event() Thread(target=check,args=(e,)).start() Thread(target=connect,args=(e,)).start()
5、條件
方法:
notify() # 控制流量,通知多少個能夠經過,有參數。
wait() # 阻塞全部進程
notify_all() # 所有放行,通常配合notify()使用
這兩個方法都是線程不安全的,每一個方法使用的先後都須要加鎖,條件裏面有鎖的方法。
# 條件
from threading import Condition,Thread import time def get_through(name,c): print('%s在等待'%name) c.acquire() c.wait() # 阻塞,等待經過線程的命令
print('%s經過'%name) c.release() name_list = ['劉一','陳二','張三','李四','王五','趙六','孫七','周八'] c = Condition() for i in name_list: t = Thread(target=get_through,args=(i,c)) t.start() for k in range(4): c.acquire() c.notify(2) # 設置每次經過的線程數
c.release() time.sleep(5)
6、定時器
使用場景:定時任務
Timer(n,函數) 實例化時接收兩個參數,(執行的m秒數,執行的函數)
不影響主線程
# 定時器
from threading import Timer def func(): print('action') t = Timer(5,func) # 建立子線程,而且設置開啓子線程的時間
t.start()
7、隊列
qps概念:每秒鐘接收到的請求數
隊列的線程是安全的,隊列用於作排隊相關的邏輯,幫助維持相應的順序
特色:先進先出
方法:
get()
put()
get_nowait()
put_nowait()
import queue q = queue.Queue() q.put(1) print(q.get())
8、新的隊列
from queue import LifoQueue
相似於棧,特色是後進先出,而且不容許插隊
應用:算法的完成,有點相似分佈式的思想,例如:三級菜單
from queue import LifoQueue q = LifoQueue() for i in range(1,6): q.put(i) for i in range(1,6): print(q.get(),end=' ') # 5 4 3 2 1
9、優先級隊列
只能放同一種相似的值
應用場景:會員服務
①若是是數值,按照數值從小到大取值
from queue import PriorityQueue q = PriorityQueue() q.put(10) q.put(5) q.put(20) for i in range(3): print(q.get(),end=' ') # 5 10 20
②若是是字符串,按照ASICC編碼來取值
from queue import PriorityQueue q = PriorityQueue() q.put('c') q.put('a') q.put('b') for i in range(3): print(q.get(), end=' ') # a b c
③若是是數字、字母組成的元組,按第一個元素來取值,從小到大取值
from queue import PriorityQueue q = PriorityQueue() q.put((3,'zxc')) q.put((3,'abc')) q.put((1,'asd')) q.put((2,'qwe')) for i in range(4): print(q.get(),end=' ') # (1, 'asd') (2, 'qwe') (3, 'abc') (3, 'zxc')
10、線程池
concurrent.futures 模塊不只提供線程池,還提供進程池。
from concurrent.futures import ThreadPoolExecutor # 線程池
from concurrent.futures import ProcessPoolExecutor # 進程池
實例化的線程池數量 = 5 * cpu_count
方法:
submit(函數,參數) 異步提交任務,只能按位置傳參,不用加args=
ret = submit() 獲取返回值,須要經過result()方法取值
ret.result() 獲取值
map(函數,iterable) 取代for循環submit操做
shutdown() 等於進程池的close()和join()方法,阻塞直到任務完成
① 有返回值
from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('子線程號:',currentThread().ident) # 打印子線程的線程號
return i * '*' tp = ThreadPoolExecutor(5) # 建立線程池,建立5個線程
ret_lis = [] for i in range(15): ret = tp.submit(func,i) # 異步提交任務
ret_lis.append(ret) # 將返回值存到列表
for ret in ret_lis: print(ret.result()) # 經過result()方法獲取返回值的值
print('主線程',currentThread().ident) # 打印主線程的線程號
② 無返回值
from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(): print('子進程',currentThread().ident) # 打印子線程的線程號
tp = ThreadPoolExecutor(3) # 建立線程池,開啓3個線程
for i in range(9): tp.submit(func) # 異步提交任務
tp.shutdown() # 阻塞主線程,待全部的子線程運行完
print('主線程',currentThread().ident)
③ map方法
使用may方法必須傳入參數
from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(n): # 使用map必須有一個參數
print('子線程號:',currentThread().ident) time.sleep(1) tp = ThreadPoolExecutor(3) ret = tp.map(func,range(15)) # map函數會傳入一個參數 # for i in range(15): # tp.submit(func,i) # 異步提交任務
print('主線程:',currentThread().ident)
11、回調函數
線程池和進程池的回調函數經過submit實現的,
add_done_callback調用回調函數,不須要傳參,回調函數須要經過result()取值
線程池的回調函數由子線程完成
from concurrent.futures import ThreadPoolExecutor from threading import currentThread import time def func(i): print('子線程:',currentThread().ident) # 獲取子線程的線程號
time.sleep(1) return i def call_back(ret): print('ret>',ret.result()) # 經過result()方法取值
print('callback線程號:',currentThread().ident) # 獲取回調函數的線程號
tp = ThreadPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back) # add_done_callback回調函數的方法,函數不須要傳入參數
tp.shutdown() # 阻塞主線程,等待全部子線程執行完
print('主線程號:',currentThread().ident)
進程池的回調函數由主進程完成
from concurrent.futures import ProcessPoolExecutor import time,os def func(i): print('子進程:',os.getpid()) # 獲取子進程的進程號
time.sleep(1) return i def call_back(ret): print('ret>',ret.result()) # 經過result()方法取值
print('callback進程號:',os.getpid()) # 獲取回調函數的進程號
if __name__ == '__main__': tp = ProcessPoolExecutor(3) for i in range(9): tp.submit(func,(i+1)).add_done_callback(call_back) # add_done_callback回調函數的方法,函數不須要傳入參數
tp.shutdown() # 阻塞主進程
print('主線程號:',os.getpid())
12、local模塊
from threading import local
不一樣線程的ID存儲的值和取到的值是不一樣的
多個線程之間使用threading.local對象,能夠實現多個線程之間的數據隔離
import time import random from threading import local,Thread loc = local() def func2(): global loc print(loc.name,loc.age) def func1(name,age): global loc loc.name = name loc.age = age time.sleep(random.random()) func2() Thread(target=func1,args=('xiaobai',20)).start() Thread(target=func1,args=('xiaohei',25)).start() ''' xiaobai 20 xiaohei 25 '''