多線程
python
1 線程:mysql
線程顧名思義,就是一條流水線工做的過程,一條流水線必須屬於一個車間,一個車間的工做過程是一個進程,車間負責把資源整合到一塊兒,是一個資源單位,而一個車間內至少有一個流水線sql
進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位
安全
多線程:
服務器
多線程(即多個控制線程)的概念是,在一個進程中存在多個控制線程,多個控制線程共享該進程的地址空間,至關於一個車間內有多條流水線,都共用一個車間的資源多線程
1. 多線程共享一個進程的地址空間併發
2. 線程比進程更輕量級,線程比進程更容易建立可撤銷,在許多操做系統中,建立一個線程比建立一個進程要快10-100倍,在有大量線程須要動態和快速修改時,這一特性頗有用app
3. 若多個線程都是cpu密集型的,那麼並不能得到性能上的加強,可是若是存在大量的計算和大量的I/O處理,擁有多個線程容許這些活動彼此重疊運行,從而會加快程序執行的速度。dom
4. 在多cpu系統中,爲了最大限度的利用多核,能夠開啓多個線程,比開進程開銷要小的多。(這一條並不適用於python)ide
2 開啓線程的兩種方式
#開啓線程的方式一:使用替換threading模塊提供的Thread from threading import Thread from multiprocessing import Process def task(): print('is running') if __name__ == '__main__': t=Thread(target=task,) # t=Process(target=task,) t.start() print('主') #開啓線程的方式二:自定義類,繼承Thread from threading import Thread from multiprocessing import Process class MyThread(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s is running' %self.name) if __name__ == '__main__': t=MyThread('egon') # t=Process(target=task,) t.start() print('主')
3 在一個進程下開啓多個線程與在一個進程下開啓多個子進程的區別
from threading import Thread from multiprocessing import Process import os def task(): print('%s is running' %os.getpid()) if __name__ == '__main__': t1=Thread(target=task,) t2=Thread(target=task,) t1.start() t2.start() # 在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣 print('主',os.getpid()) t3=Process(target=task,) t4=Process(target=task,) t3.start() t4.start() # 開多個進程,每一個進程都有不一樣的pid print('主',os.getpid())
# 多線程共享統一進程裏的資源 from threading import Thread from multiprocessing import Process n=100 def work(): global n n=0 if __name__ == '__main__': p=Process(target=work,) p.start() p.join() print('主',n) t=Thread(target=work,) t.start() t.join() print('主',n)
4 守護進程
不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬
須要強調的是:運行完畢並不是終止運行
1.對主進程來講,運行完畢指的是主進程代碼運行完畢 2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
詳細解釋:
1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束 2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束
#先看:守護進程 # from multiprocessing import Process import time def task1(): print('123') time.sleep(1) print('123done') def task2(): print('456') time.sleep(10) print('456done') if __name__ == '__main__': p1=Process(target=task1) p2=Process(target=task2) p1.daemon = True p1.start() p2.start() print('主') #再看:守護線程 from threading import Thread import time def task1(): print('123') time.sleep(10) print('123done') def task2(): print('456') time.sleep(1) print('456done') if __name__ == '__main__': t1=Thread(target=task1) t2=Thread(target=task2) t1.daemon=True t1.start() t2.start() print('主')
5 同步鎖(GIL鎖)
須要注意的點: 1.線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來 2.join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高
機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock?
首先咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據
而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。
最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock
過程分析:全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限
線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果
既然是串行,那咱們執行
t1.start()
t1.join
t2.start()
t2.join()
這也是串行執行啊,爲什麼還要加Lock呢,需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。
由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題
from threading import Thread n=100 def task(): print('is running') if __name__ == '__main__': t1=Thread(target=task,) t2=Thread(target=task,) t3=Thread(target=task,) # t=Process(target=task,) t1.start() t2.start() t3.start() print('主')
鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖
互斥鎖
from threading import Thread,Lock import time n=100 def work(): global n mutex.acquire() temp=n time.sleep(0.1) n=temp-1 mutex.release() if __name__ == '__main__': mutex=Lock() l=[] start=time.time() for i in range(100): t=Thread(target=work) l.append(t) t.start() for t in l: t.join() print('run time:%s value:%s' %(time.time()-start,n))
同步鎖與互斥鎖:
1.100個線程去搶GIL鎖,即搶執行權限 2. 確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire() 3. 極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL 4.直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘的線程再重複2 3 4的過程
互斥鎖與join的區別:
#不加鎖:併發執行,速度快,數據不安全from threading import current_thread,Thread,Lockimport os,timedef task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n))'''Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99'''#不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全from threading import current_thread,Thread,Lockimport os,timedef task(): #未加鎖的代碼併發運行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release()if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n))'''Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0'''#有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊#沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是#start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的#單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高.from threading import current_thread,Thread,Lockimport os,timedef task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n))'''Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖'''
#多進程: #優勢:能夠利用多核優點 #缺點:開銷大 #多線程: #優勢:開銷小 #缺點:不能利用多核優點 from threading import Thread from multiprocessing import Process import time #計算密集型 def work(): res=1 for i in range(100000000): res+=i if __name__ == '__main__': p_l=[] start=time.time() for i in range(4): # p=Process(target=work) #6.7473859786987305 p=Thread(target=work) #24.466399431228638 p_l.append(p) p.start() for p in p_l: p.join() print(time.time()-start) from threading import Thread from multiprocessing import Process import time #IO密集型 def work(): time.sleep(2) if __name__ == '__main__': p_l=[] start=time.time() for i in range(400): # p=Process(target=work) #12.104692220687866 p=Thread(target=work) #2.038116455078125 p_l.append(p) p.start() for p in p_l: p.join() print(time.time()-start)
6 死鎖與遞歸鎖
死鎖:指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程
# 死鎖現象 from threading import Thread,Lock,RLock import time mutexA=Lock() mutexB=Lock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('\033[45m%s 搶到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[44m%s 搶到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('\033[44m%s 搶到B鎖\033[0m' %self.name) time.sleep(1) mutexA.acquire() print('\033[45m%s 搶到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(20): t=Mythread() t.start() #遞歸鎖 from threading import Thread,Lock,RLock import time mutex=RLock() class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutex.acquire() print('\033[45m%s 搶到A鎖\033[0m' %self.name) mutex.acquire() print('\033[44m%s 搶到B鎖\033[0m' %self.name) mutex.release() mutex.release() def f2(self): mutex.acquire() print('\033[44m%s 搶到B鎖\033[0m' %self.name) time.sleep(1) mutex.acquire() print('\033[45m%s 搶到A鎖\033[0m' %self.name) mutex.release() mutex.release() if __name__ == '__main__': for i in range(20): t=Mythread() t.start()
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
7 信號量Semaphore
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()
from threading import Thread,current_thread,Semaphore import time,random sm=Semaphore(5) def work(): sm.acquire() print('%s 上廁所' %current_thread().getName()) time.sleep(random.randint(1,3)) sm.release() if __name__ == '__main__': for i in range(20): t=Thread(target=work) t.start()
與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
8 事件event
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做
from threading import Thread,current_thread,Event import time event=Event() def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise ConnectionError('連接失敗') print('%s 等待第%s次連接mysql' %(current_thread().getName(),count)) event.wait(0.5) count+=1 print('%s 連接ok' % current_thread().getName()) def check_mysql(): print('%s 正在檢查mysql狀態' %current_thread().getName()) time.sleep(1) event.set() if __name__ == '__main__': t1=Thread(target=conn_mysql) t2=Thread(target=conn_mysql) check=Thread(target=check_mysql) t1.start() t2.start() check.start()
9 定時器
from threading import Timer def hello(n): print("hello, world",n) t = Timer(3, hello,args=(11,)) t.start() # after 1 seconds, "hello, world" will be printed
10 線程queue
import queue q=queue.Queue(3) #隊列:先進先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q=queue.LifoQueue(3) #堆棧:後進先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) q=queue.PriorityQueue(3) #數字越小優先級越高 q.put((10,'data1')) q.put((11,'data2')) q.put((9,'data3')) print(q.get()) print(q.get()) print(q.get())
待整理。。。