進程:在已經瞭解了操做系統中進程的概念後,咱們對進程有了必定的瞭解:php
有了進程爲什麼還要線程?html
(1)什麼是線程?node
a.線程指的是流水線式的工做過程,一個進程內最少自帶一個線程,其實進程根本不能執行,進程不是執行單位,而是資源單位,分配資源的單位.線程纔是執行單位.python
(2)進程與線程的對比:程序員
a.同一個進程內的多個線程是共享該進程的資源的,不一樣進程內的線程資源是隔離的.web
b.建立線程對資源的消耗遠遠小於建立進程的消耗面試
(3)進程有不少優勢,它提供了多道編程,提升了計算機的利用率,讓每一個人感受本身獨享着CPU和其餘資源.然而,進程也是有缺點的,主要體如今兩點上:數據庫
a.進程只能在同一時間執行一個任務,若是想要同時執行兩個或多個任務,進程就無能爲力了.編程
b.進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行.windows
線程的出現
60年代,在OS中能擁有資源和獨立運行的基本單位是進程,然而隨着計算機技術的發展,進程出現了不少弊端,一是因爲進程是資源擁有者,建立,撤銷與切換存在較大的時空開銷,所以須要引入輕型進程;二是因爲對稱多處理機(SMP)出現,能夠知足多個運行單位,而多個進程運行開銷過大.
所以,在80年代,出現了能獨立運行的基本單位--線程(Threads).
須要注意的是: 進程是資源分配的最小單位,線程是CPU調度的最小單位.每一個進程中至少有一個線程.
在傳統操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程.咱們能夠把一個車間的工做過程看做是一個進程,把一條流水線工做的過程看做是一個線程.車間不只要負責把資源整合到一塊兒,並且一個車間內至少要有一條流水線.
因此總結來講: 進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是CPU上的執行單位.
多線程的概念是: 在一個進程中存在多個控制線程,多個控制線程共享該進程的地址空間.
線程和進程的關係
線程與進程的區別可概括爲如下4點:
地址空間和其餘資源(如打開文件): 進程間相互獨立,同一進程的各線程間共享. 某進程內的線程在其餘進程不可見.
通訊: 進程間通訊IPC,線程間能夠直接讀寫進程數據段(如全局變量)來進行通訊----須要進程同步和互斥手段的輔助,以保證數據的一致性(相似於進程中的鎖的做用).
調度和切換: 線程上下文切換比進程上下文切換要快得多.
在多線程操做系統中,進程不是一個可執行的實體,真正去執行程序的不是進程,而是線程.能夠理解爲進程就是一個線程的容器.
併發 :同一時刻能同時接收多個客戶端的請求
線程
輕型進程 輕量級的進程
在同一個進程中的多個線程是能夠共享一部分數據的
線程的開啓\銷燬\切換都比進程要高效不少
1.多個進程可不能夠利用多核(多個CPU) 能夠
2.多個線程可不能夠利用多核(多個CPU) 能夠
多進程和多線程之間的區別
進程 數據隔離 開銷大
線程 數據共享 開銷小
python當中的多線程
不能訪問多個cpu
是Cpython解釋器致使的,GIL鎖
GIL鎖 = 全局解釋器鎖,致使了同一時刻只能有一個線程訪問CPU
jpython pypy解釋器中的多線程是能夠訪問多核的
在多線程的操做系統中,一般是在一個進程中包括多個線程,每一個線程都是做爲利用CPU的基本單位,是花費最小開銷的實體. 線程具備如下屬性:
1. 輕型實體:線程中的實體基本上不擁有系統資源,只是有一些必不可少的,能保證獨立運行的資源,線程的實體包括程序,數據和TCB.線程是動態概念,它的動態特性有線程控制塊TCB(Thread Control Block)描述。
xxxxxxxxxx
#TCB包括如下信息:
(1)線程狀態.
(2)當線程不運行時,被保存的現場資源.
(3)一組執行堆棧.
(4)存放每一個線程的局部變量主存區.
(5)訪問同一個進程中的主存和其它資源.
用於指示被執行指令序列的程序計數器、保留局部變量、少數狀態參數和返回地址等的一組寄存器和堆棧.
2. 獨立調度和分派的基本單位:在多線程OS中,線程是能獨立運行的基本單位,於是也是獨立調度和分派的基本單位.因爲線程很「輕」,故線程的切換很是迅速且開銷小(在同一進程中的線程).
3. 共享進程資源:線程在同一進程中的各個線程, 均可以共享該進程所擁有的資源, 這首先表如今: 全部線程都具備相同的進程id, 這意味着, 線程能夠訪問該進程的每個內存資源; 此外, 還能夠訪問進程所擁有的已打開文件、定時器、信號量機構等. 因爲同一個進程內的線程共享內存和文件, 因此線程之間互相通訊沒必要調用內核
可併發執行:在一個進程中的多個線程之間, 能夠併發執行, 甚至容許在一個進程中全部線程都能併發執行; 一樣, 不一樣進程中的線程也能併發執行, 充分利用和發揮了處理機與外圍設備並行工做的能力.
Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制. Pyhton在設計之初就考慮到: 在主循環中同時只有一個線程在執行. 雖然Python解釋器中能夠"運行"多個線程,但在任意時刻只有一個線程在解釋器中運行. 對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行.在多線程環境中,Python虛擬機按如下方式執行:
a. 設置GIL
b. 切換到一個線程去執行
c. 運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用time.sleep(0))
d. 把線程設置爲睡眠狀態
e. 解鎖GIL
f. 再次重複以上全部步驟
在調用外部代碼(如C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束爲止(因爲在這期間沒有Python的字節碼被運行,因此不會作線程切換)編寫擴展的程序員能夠主動解鎖GIL.
Python線程模塊的選擇
Pyhton提供了幾個用於多線程編程的模塊,包括thread,threading和Queue等. thread和threading模塊容許程序員建立和管理線程. thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別,功能更強的線程管理的功能. Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構.
避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突; 其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程序結束時,全部的線程都會被強制結束掉,沒有警告也沒有正常的清除工做,至少threading模塊能確保重要的子線程退出後程序才退出.
就像咱們熟悉的time模塊,它比其餘模塊更接近底層,越是接近底層,用起來也越麻煩,就像時間日期轉換之類的就比較麻煩,可是後面咱們學到一個datetime模塊,提供了更爲簡便的時間日期處理方法,它是創建在time模塊的基礎上來的. 又如socket和socketserver(底層仍是用的socket)等等,這裏的threading就是thread的高級模塊.
thread模塊不支持守護進程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出.而threading模塊支持守護進程,守護進程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就會在那裏等待,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出.
multiprocess模塊徹底模仿了threading模塊的接口,兩者在使用層面有很大的類似性.
線程建立的兩種方式
xxxxxxxxxx
#方式一
import time
from threading import Thread # 引入線程模塊
def func(n): # 自定義一個函數
time.sleep(1)
print(n)
if __name__ == '__main__':
t = Thread(target=func, args=("hello,world",)) # 建立線程對象
t.start() # 開啓線程
print("主線程結束")
#方式二
import time
from threading import Thread # 引入線程模塊
class MyThread(Thread): # 自定義一個類
def __init__(self, n): # 傳參n
super().__init__() # 本身想要傳參,必須先super()執行父類的init方法,再寫本身的實例變量
self.n = n
def run(self): # 自定義一個run()方法,內容不定,可是run名稱不能變
time.sleep(1)
print(self.n)
if __name__ == '__main__':
t = MyThread("hello,world") # 建立線程對象
t.start() # 開啓線程
print("主線程結束")
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
#判斷線程的存活
from threading import Thread
def func():
print(123)
t = Thread(target=func)
t.start()
print(t.is_alive()) # 返回線程是不是活動的
#結果
#123
#False
import time
from threading import Thread
def func():
time.sleep(0.1)
print(123)
t = Thread(target=func)
t.start()
print(t.is_alive())
#結果
#True
#123
#設置修改線程名
import time
from threading import Thread
def func():
time.sleep(0.1)
print(123)
t = Thread(target=func)
t.start()
print(t.is_alive()) # 返回線程是否活動的
print(t.getName()) #返回線程名
t.setName('t1') # 設置線程名
print(t.getName())
#執行輸出:
#True
#Thread-1
#t1
#123
#輸出現有存活線程
import time
from threading import Thread,currentThread,enumerate,activeCount
def func():
time.sleep(0.1)
#print(123)
t = Thread(target=func)
t.start()
print(currentThread) # 返回當前的線程變量
print(enumerate()) #返回一個包含正在運行的線程的list
print(activeCount()) # 返回正在運行的線程數量
#結果
<function current_thread at 0x000001C35EF6A268>
[<_MainThread(MainThread, started 20008)>, <Thread(Thread-1, started 20232)>]
2
MainThread, started 20008表示主線程,Thread-1, started 20232表示子線程。它會打印出2個。
因此activeCount的結果爲2
進程與線程開啓效率比較
xxxxxxxxxx
import time
from threading import Thread
from multiprocessing import Process
def func(n):
sum = 0
for i in range(n):
sum += i
if __name__ == '__main__':
t_start_time = time.time() #開始時間
t_list = []
for i in range(10):
t = Thread(target=func, args=(100,))
t.start()
t_list.append(t)
[tt.join() for tt in t_list]
t_end_time = time.time() #結束時間
t_dif_time = t_end_time - t_start_time #時間差
p_start_time = time.time() #開始時間
p_list = []
for ii in range(10):
p = Process(target=func, args=(100,))
p.start()
p_list.append(p)
[pp.join() for pp in p_list]
p_end_time = time.time() #結束時間
p_dif_time = p_end_time - p_start_time #時間差
print("線程的執行時間是>>>", t_dif_time)
print("進程的執行時間是>>>", p_dif_time)
print("主線程結束")
# 執行結果:
# 線程的執行時間是>>> 0.0010013580322265625
# 進程的執行時間是>>> 0.37227368354797363
# 主線程結束,
#從上面代碼的結果中能夠看出: 執行同一個任務,線程的執行時間遠遠小於進程的執行時間.所以,線程的效率是比較高的.
同一進程下線程是資源共享的
xxxxxxxxxx
from threading import Thread
tn = 0
def func():
global tn
tn += 1
t_l = []
for i in range(100):
t = Thread(target=func)
t.start()
t_l.append(t)
for t in t_l:t.join()
print(tn) #100
#可是進程之間的數據是隔離的
from multiprocessing import Process
pn = 0
def func():
global pn
pn += 1
if __name__ == '__main__':
p_l = []
for i in range(100):
p = Process(target=func)
p.start()
p_l.append(p)
for p in p_l:p.join()
print(pn) #0 由於全部的子進程並無操做主進程的pn
線程共享數據時,數據是不安全的
xxxxxxxxxx
#數據共享時數據不安全
import time
from threading import Thread
num = 100 #全局變量
def func():
global num
# 模擬num-=1的"取值->計算->賦值"過程
mid = num #取值
mid = mid - 1 #計算
time.sleep(0.0001)
num = mid #賦值
if __name__ == '__main__':
t_list = []
for i in range(10): #建立10個子線程
t = Thread(target=func,)
t.start()
t_list.append(t)
[tt.join() for tt in t_list] # 主線程等待子線程執行結束
print('主線程結束,此時全局變量爲>>>', num)
# 執行結果:
# 主線程結束,此時全局變量爲>>> 99
#經過引入線程模塊裏的鎖來解決不安全問題
線程中的其餘方法
xxxxxxxxxx
#currentTread 當前線程的屬性,不要忘了,主進程也有個主線程
import os
from threading import Thread,currentThread
def func():
t = currentThread()
print(t.name,t.ident,os.getpid()) #Thread-1 2940 10716
tobj = Thread(target=func)
tobj.start()
print('tobj :',tobj) #tobj : <Thread(Thread-1, started 2940)>
t2 = currentThread()
print(t2.name,t2.ident,os.getpid()) #MainThread 12360 10716
#例,# lst = [1,2,3,4,5,6,7,8,9,10]按照順序把列表中的每個元素都計算一個平方,使用多線程的方式,而且將結果按照順序返回
import time
import random
from threading import Thread,currentThread
dic = {}
def func(i):
t = currentThread()
time.sleep(random.random())
dic[t.ident] = i**2
t_lst = []
for i in range(1,11):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t) #按照順序加進去
for t in t_lst:
t.join()
print(dic[t.ident]) #根據進程號來取
#面試題
from threading import active_count # 返回當前有多少個正在工做的線程
import time
import random
from threading import Thread,currentThread
dic = {}
def func(i):
t = currentThread()
time.sleep(random.random())
dic[t.ident] = i**2
for i in range(10):
Thread(target=func,args=(i,)).start()
print(active_count()) # ??? 結果1,由於主線程也算一個
#線程有terminate麼?
#沒有terminate 不能強制結束
#全部的子線程都會在執行完全部的任務以後自動結束
線程(threading)的一些其餘方法總結
xxxxxxxxxx
from threading import Thread
import threading
import time
def work():
time.sleep(1)
print("子線程對象>>>", threading.current_thread()) # 子線程對象
print("子線程名稱>>>", threading.current_thread().getName()) # 子線程名稱
print("子線程ID>>>", threading.get_ident()) # 子線程ID
if __name__ == '__main__':
t = Thread(target=work) # 建立子線程
t.start() # 開啓子線程
print("主線程對象>>>", threading.current_thread()) # 主線程對象
print("主線程名稱>>>", threading.current_thread().getName()) # 主線程名稱
print("主線程ID>>>", threading.current_thread().ident) # 主線程ID
print("主線程ID>>>", threading.get_ident()) # 主線程ID
time.sleep(1) # 阻塞住,此時主線程代碼運行的同時子線程代碼也在運行
print(threading.enumerate()) # 拿到全部正在運行的線程對象(包括主線程)
print(threading.active_count()) # 拿到全部正在運行的線程對象的數量
print("主線程/主進程執行完畢")
多線程的其餘用法
threading.local()這個方法的特色用來保存一個全局變量,可是這個全局變量只有在當前線程才能訪問,若是你在開發多線程應用的時候 須要每一個線程保存一個單獨的數據供當前線程操做,能夠考慮使用這個方法,簡單有效。舉例:每一個子線程使用全局對象a,但每一個線程定義的屬性a.xx是該線程獨有的,Python提供了 threading.local 類,將這個類實例化獲得一個全局對象,可是不一樣的線程使用這個對象存儲的數據其它線程不可見(本質上就是不一樣的線程使用這個對象時爲其建立一個獨立的字典)。 #代碼示例 import time import threading v = threading.local() def func(arg): # 內部會爲當前線程建立一個空間用於存儲:phone=本身的值 v.phone = arg**2 time.sleep(2) print(v.phone,arg) # 去當前線程本身空間取值 for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start() #結果 4 2 0 0 1 1 49 7 9 3 …… #做用:內部自動爲每一個線程維護一個空間(字典),用於當前存取屬於本身的值。保證線程之間的數據隔離。 { 線程ID: {...} 線程ID: {...} 線程ID: {...} 線程ID: {...} } #拓展 import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key: value} # 實例化 obj = Local() def func(arg): # 調用對象的 __setattr__方法(「phone」,1) obj.phone = arg time.sleep(2) print(obj.phone, arg) if __name__ == '__main__': for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() # 效果相似
和進程相似。
同進程的同樣, Semaphore管理一個內置的計數器:
每當調用acquire()時內置計數器-1;
調用release()時內置計數器+1;
計數器不能小於0;當計數器爲0時, acquire()將阻塞線程直到其餘線程調用release().
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。 假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。 實現: 信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念 from threading import Thread,Semaphore import threading import time def func(): if sm.acquire(): # 加鎖 print(threading.currentThread().getName() + " get semaphore") time.sleep(3) sm.release() # 解鎖 if __name__ == '__main__': sm = Semaphore(5) # 建立信號量對象,限制鎖內每次只能進入5個線程 for i in range(25): # 建立25個線程 t = Thread(target=func,) t.start() #與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程,semaphore 容許同一時刻n個線程執行這段代碼
不管是進程仍是線程,都遵循: 守護進程(線程)會等待主進程(線程)運行完畢後被銷燬. 須要強調的是: 運行完畢並不是終止運行
#1. 對於主進程來講,運行完畢指的是主進程代碼運行完畢,很好理解守護進程實際上是依賴於主進程的,那麼子進程也能夠理解爲一堆線程的集合體,那麼主進程的代碼結束,必須同時對子進程的資源進行回收,因此守護進程必須在主進程代碼結束時結束 #2. 對於主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程所有執行完畢,主線程纔算運行完畢.守護線程徹底能夠最後一個消失 解析: 1. 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束. 2. 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收). 由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束,由於進程執行結束是要回收資源的.
實力證實
import time from threading import Thread def func1(n): time.sleep(4) print(n) def func2(n): time.sleep(2) print(n) if __name__ == '__main__': t1 = Thread(target=func1, args=("我是子線程1號",)) t1.daemon = True #設置守護線程 t1.start() t2 = Thread(target=func2, args=("我是子線程2號",)) t2.start() print("主線程結束") # 執行結果: 顯然守護線程還沒執行,所有線程結束,因此被動結束 # 主線程結束 # 我是子線程2號 import time from threading import Thread def func1(n): time.sleep(1) print(n) def func2(n): time.sleep(2) print(n) if __name__ == '__main__': t1 = Thread(target=func1, args=("我是子線程1號",)) t1.deamon = True # 設置守護線程,必須放到start前面 t1.start() t2 = Thread(target=func2, args=("我是子線程2號",)) t2.start() print("主線程結束") # 執行結果: # 主線程結束 # 我是子線程1號 # 我是子線程2號 #對比兩個例子的執行結果,能夠看出,主線程等待全部非守護線程的結束才結束.當主線程的代碼運行結束後,還要等待非守護線程執行完畢,在這個等待的過程當中,守護線程並無消亡,還在繼續執行.
對比守護進程和守護線程:
互斥鎖:在同一個線程中,不能連續acquire屢次,而且能夠作到多個線程中被鎖的代碼同時只有一個線程執行
#1.線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來 #2.join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高 import time from threading import Thread def func(): global n temp = n time.sleep(1) n = temp -1 n = 100 t_lst = [] for i in range(100): t = Thread(target=func) t.start() t_lst.append(t) for t in t_lst:t.join() print(n) #99,由於多個線程搶奪同一個資源,沒有秩序 #其實只要去掉賦值這個過程,直接每一個進程在原有的數據上進行修改就不會出現問題,(不用枷鎖) from threading import Thread def func(): global n n -= 1 n = 100 for i in range(100): t = Thread(target=func) t.start() print(n)
緣由很簡單,由於異步執行全部線程,幾乎同時對全局的n進行操做,換句話說100可線程都取到n=100,獲得的結果也都是99,因此形成了數據不安全。
from threading import Lock lock = Lock() # 在同一個線程中,可以被一個鎖的多個acquire阻住,這種鎖就叫作互斥鎖 lock.acquire() # 鎖被拿走 lock.acquire() lock.acquire() #科學家吃麪問題:要完成一件事情 須要兩個必要因素,要想吃到面,須要: 叉子,面。資源的互相搶佔的問題 —— 死鎖 import time from threading import Thread,Lock noodle_lock=Lock() fork_lock=Lock() def eat1(noodle_lock,fork_lock,name): # 吃麪 noodle_lock.acquire() # 麪條加鎖 print('%s 搶到了麪條'%name) fork_lock.acquire() # 叉子加鎖 print('%s 搶到了叉子'%name) print('%s 正在吃麪'%name) fork_lock.release() # 叉子解鎖 print('%s歸還了叉子'%name) noodle_lock.release() # 麪條解鎖 print('%s歸還了麪條' % name) def eat2(noodle_lock,fork_lock,name): #也是吃麪,不一樣的是:先搶叉子,再搶面 fork_lock.acquire() print('%s搶到了叉子' % name) time.sleep(0.5) noodle_lock.acquire() print('%s搶到了面'%name) print('%s正在吃麪'%name) noodle_lock.release() print('%s歸還了面' % name) fork_lock.release() print('%s歸還了叉子' % name) noodle_lock = Lock() # 麪條鎖 fork_lock = Lock() # 叉子鎖 t1 = Thread(target=eat1,args=(noodle_lock,fork_lock,'nazha')).start() # 最開始他有很大概率獲得兩把鑰匙 t2 = Thread(target=eat2,args=(noodle_lock,fork_lock,'egon')).start() #他要叉,一有就拿走 t3 = Thread(target=eat1,args=(noodle_lock,fork_lock,'yuan')).start() #他要面,一有就拿走,兩兩卡死 t4 = Thread(target=eat2,args=(noodle_lock,fork_lock,'alex')).start()
在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
從必定程度上能夠避免死鎖現象,使用遞歸鎖也會產生死鎖現象,也就是說在交叉使用鎖時也會出現死鎖
#簡單例子 from threading import Thread,RLock rlock = RLock() # 建立遞歸鎖 rlock.acquire() # 加第一個鎖 print('***') rlock.acquire() # 加第二個鎖 print('***') #結果: *** *** #結論:遞歸鎖在同一個線程中對同一個鎖屢次acquire不會產生阻塞 from threading import Thread,RLock def func(rlock,flag): rlock.acquire() # 第一道鎖 print(flag*10) rlock.acquire() # 第二道鎖 print(flag * 10) rlock.acquire() # 第三道鎖 print(flag * 10) rlock.acquire() # 第四道鎖 print(flag * 10) rlock = RLock() # 建立遞歸鎖 Thread(target=func,args=(rlock,'*')).start() # 傳入遞歸鎖和* Thread(target=func,args=(rlock,'-')).start() #結果 ********** ********** ********** ********** # 這裏沒有輸出---------,由於線程進去了沒有出來 #解決辦法 from threading import Thread,RLock def func(rlock,flag): rlock.acquire() # 第一道鎖 print(flag*10) rlock.acquire() # 第二道鎖 print(flag * 10) rlock.acquire() # 第三道鎖 print(flag * 10) rlock.acquire() # 第四道鎖 print(flag * 10) rlock.release() # 解鎖 rlock.release() rlock.release() rlock.release() rlock = RLock() # 建立遞歸鎖 Thread(target=func,args=(rlock,'*')).start() Thread(target=func,args=(rlock,'-')).start() #使用遞歸鎖解決科學家吃麪問題 import time from threading import Thread,RLock def eat1(noodle_lock,fork_lock,name): # 吃麪 noodle_lock.acquire() # 拿到麪條的整串鑰匙 print('%s 搶到了麪條'%name) fork_lock.acquire() # 拿到叉子的整串鑰匙 print('%s 搶到了叉子'%name) print('%s 正在吃麪'%name) fork_lock.release() # 叉子解鎖 print('%s歸還了叉子'%name) noodle_lock.release() # 麪條解鎖 print('%s歸還了麪條' % name) def eat2(noodle_lock,fork_lock,name): #也是吃麪,不一樣的是:先搶叉子,再搶面 fork_lock.acquire() print('%s搶到了叉子' % name) time.sleep(0.5) noodle_lock.acquire() print('%s搶到了面'%name) print('%s正在吃麪'%name) noodle_lock.release() print('%s歸還了面' % name) fork_lock.release() print('%s歸還了叉子' % name) noodle_lock = fork_lock = RLock() # 麪條鎖和叉子鎖,表示一串鑰匙 t1 = Thread(target=eat1,args=(noodle_lock,fork_lock,'nazha')).start() t2 = Thread(target=eat2,args=(noodle_lock,fork_lock,'egon')).start() t3 = Thread(target=eat1,args=(noodle_lock,fork_lock,'yuan')).start() t4 = Thread(target=eat2,args=(noodle_lock,fork_lock,'alex')).start()
在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點,可是其餘python解釋器能實現
首先須要明確的一點是GIL
並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL.
若是多個線程的target=work,那麼執行流程是:
多個線程先訪問到解釋器的代碼,即拿到執行權限,而後將target的代碼交給解釋器的代碼去執行
解釋器的代碼是全部線程共享的,因此垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就致使了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操做,解決這種問題沒有什麼高明的方法,就是加鎖處理,以下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼
GIL vs Lock
GIL保護的是解釋器級的數據,保護用戶本身的數據則須要本身加鎖處理,以下圖
既然Python已經有了一個GIL來保證同一時間只能有一個線程Laura執行,那麼爲何還須要Lock呢? 首先咱們須要達成共識: 鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據. 而後,咱們能夠得出結論: 保護不一樣的數據就應該加不一樣的鎖. 因而,問題就比較明朗了,GIL與Lock是兩把鎖,保護的數據不同:前者是解釋器級別的(保護的是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock.
GIL與多線程
有了GIL的存在,同一時刻同一進程中只有一個線程被執行
聽到這裏,有的同窗立馬質問:進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點,也就是說python沒用了,php纔是最牛逼的語言,彆着急啊,老孃還沒講完呢,要解決這個問題,咱們須要在幾個點上達成一致:
Queue就是一個線程隊列的類,自帶lock鎖,實現了線程安全的數據類型,隊列是一個線程安全的數據類型
注意:
queue.Queue(maxsize=0) -- 先進先出
q=queue.Queue() q.put('first') q.put('second') q.put('third') # q.put_nowait() #沒有數據就報錯,能夠經過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有數據就報錯,能夠經過try來搞 # 執行結果: (先進先出) # first # second # third
queue.LifoQueue(maxsize=0) -- last in first out 後進先出
import queue q=queue.LifoQueue() #隊列,相似於棧,後進先出的順序 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() # 執行結果:(後進先出) # third # second # first
queue.PriorityQueue(maxsize=0) -- 存儲數據時能夠設置優先級隊列
from queue import PriorityQueue q = queue.PriorityQueue() # 建立棧 # put()方法放置一個元組進入棧,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((-10, "a")) q.put((-5, "a")) # 負數也能夠 # q.put((20,"ws")) # 若是兩個值的優先級同樣,那麼按照後面的值的acsii碼順序來排序,若是字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,"wd")) # q.put((20,{"a": 11})) # TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,("w", 1))) # 優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,能夠是元祖,也是經過元素的ascii碼順序來排序 q.put((20, "b")) q.put((20, "a")) q.put((0, "b")) q.put((30, "c")) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # 數字越小優先級越高,優先級高的優先出隊
早期的時候並無線程池,如今Python提供了一個新的標準或者說內置的模塊,這個模塊裏面提供了新的線程池和進程池.
concurrent.futures的用法
#1 介紹 concurrent.futures 模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做 #shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout=None) 取得結果,這是個阻塞的,一直會等相應的返回值 #add_done_callback(fn) 回調函數 # ProcessPoolExecutor的用法 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) #這是限制了使用進程,能夠改成ThreadPoolEcecutor,效果同樣 futures=[] for i in range(11): future=executor.submit(task,i) #多線程執行函數,第二個參數必須是一個參數,能夠封裝成元組 futures.append(future) executor.shutdown(True) print('+++>') for future in futures: #最後輸出結果 print(future.result()) #map的用法 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) ret_l=executor.map(task,range(1,12)) #map取代了for+submit,效果一致 print('+++>') for future in ret_l: #最後輸出結果 print(future.result()) #池的誤區 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(i*'*') time.sleep(1) thread_pool = ThreadPoolExecutor(5) #建立一個最大可容納2個任務的線程池 for i in range(6): thread_pool.submit(func,i) # 異步提交任務,往線程池裏面加入一個任務 print('wahaha') #結果: * ** *** **** wahaha ***** #回調函數 import time from concurrent.futures import ThreadPoolExecutor def func(i): print(i*'*') time.sleep(1) return i**2 def callback(arg): #回調函數 print(arg.result()*'-') # 取得結果,並乘以- thread_pool = ThreadPoolExecutor(5) #建立一個最大可容納2個任務的線程池 for i in range(6): ret = thread_pool.submit(func,i).add_done_callback(callback) # 異步提交任務,執行回調函數 thread_pool.shutdown() # 至關於進程池的pool.close()+pool.join()操做 print('wahaha') #執行輸出 * ** *** **** ---------------- ***** ---- - --------- ------------------------- wahaha
同進程的同樣,線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象
。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行.
event的用法
event.isSet():#返回event的狀態值; event.wait(): #若是 event.isSet()==False將阻塞線程;可設置時間 event.set(): #設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():#恢復event的狀態值爲False。
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做。
import time import random from threading import Event,Thread # 鏈接數據庫 def connect_db(e): count = 1 # 初始計數器 while count < 4: print('嘗試第%s次檢測鏈接'%count) e.wait(0.5) # 等待0.5,再去執行下面的代碼,若是不寫參數則會也一直在這等待 # 若是不傳參數會一直等到事件爲True爲止 # 若是傳參數 傳一個時間參數,到時間後, count += 1 # 加1 if e.is_set(): # 判斷狀態是否爲True print('鏈接成功') break else: print('鏈接失敗') def check_conn(e): '''檢測數據庫是否能夠鏈接''' time.sleep(random.randint(1,2)) # 等待1~2秒,顯然這裏時間太長 e.set() # 設置狀態爲True e = Event() Thread(target=check_conn,args=(e,)).start() Thread(target=connect_db,args=(e,)).start() #隨機數,至少鏈接兩次,若是等待2秒,e.wait(0.5)執行4次,才能達到2秒。可是whlie循環,不可能達到4。因此3次所有失敗。第1種狀況,等待1秒,e.wait(0.5)必然執行2次,才能達到1秒,狀態變成True,輸出鏈接成功。第2種狀況,是偶然事件。由於CPU同一時刻只能執行一個線程。因此失敗了3次,才輸出鏈接成功。
你要作一件事情 是有前提的 你就先去處理前提的問題 —— 前提處理好了 把狀態設置成True,來控制即將要作的事情能夠開始
使得線程等待,只有知足某條件時,才釋放n個線程,Python提供的Condition對象提供了對複雜線程同步問題的支持。Condition被稱爲條件變量,除了提供與Lock相似的acquire和release方法外,還提供了wait和notify方法。線程首先acquire一個條件變量,而後判斷一些條件。若是條件不知足則wait;若是條件知足,進行一些處理改變條件後,經過notify方法通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
from threading import Condition,Thread # acquire release # notify -- wait的行爲 # 10線程 wait # notify(1) def func(i,con): con.acquire() # 進入鎖定池 con.wait() # 等待通知,先後必需要有acquire和release print(i*'*') con.release() # 解鎖 con = Condition() #條件變量 for i in range(10): Thread(target=func,args=(i,con)).start() while True: n = int(input('>>>')) #n 表示激活幾個線程 con.acquire() # 加鎖 con.notify(n) # 通知其餘線程,其餘處於wait狀態的線程接到通知後會從新判斷條件,解除wait狀態。先後必需要有acquire和release con.release() # 解鎖 #從結果上來看,發生數據混亂 #con.notify(n) 表示按照要求放開線程 #con.notify_all() 表示一次性放開線程 總結: semaphore 容許同一時刻n個線程執行這段代碼 event 有一個內部的事件來控制wait的行爲,控制的是全部的線程 condition 有一個內部的條件來控制wait的行爲,它能夠逐個或者分批次的控制線程的走向
定時器,指定n秒後執行某個操做,你可能說爲何不用sleep,由於那是同步阻塞的,雖然能夠用time.sleep(5)完成5秒的過程,可是它會阻塞主線程,若是用timer,就不會,它是異步的。
import time from threading import Timer def func(): print('*'*10) print('子線程',time.strftime("%Y-%m-%d %H:%M:%S")) t =Timer(5,func) # 要開始一個線程,等到5秒以後纔開啓並執行 t.start() print('主進程',time.strftime("%Y-%m-%d %H:%M:%S")) #執行輸出: #主進程 2018-05-17 19:41:08 #********** #子線程 2018-05-17 19:41:13