這一篇是Python併發的第四篇,主要介紹進程和線程的定義,Python線程和全局解釋器鎖以及Python如何使用thread模塊處理併發html
考慮一下這個場景,咱們有10000條數據須要處理,處理每條數據須要花費1秒,但讀取數據只須要0.1秒,每條數據互不干擾。該如何執行才能花費時間最短呢?python
在多線程(MT)編程出現以前,電腦程序的運行由一個執行序列組成,執行序列按順序在主機的中央處理器(CPU)中運行。不管是任務自己要求順序執行仍是整個程序是由多個子任務組成,程序都是按這種方式執行的。即便子任務相互獨立,互相無關(即,一個子任務的結果不影響其它子 任務的結果)時也是這樣。linux
HUGOMORE42git
對於上邊的問題,若是使用一個執行序列來完成,咱們大約須要花費 10000*0.1 + 10000 = 11000 秒。這個時間顯然是太長了。程序員
那咱們有沒有可能在執行計算的同時取數據呢?或者是同時處理幾條數據呢?若是能夠,這樣就能大幅提升任務的效率。這就是多線程編程的目的。github
對於本質上就是異步的, 須要有多個併發事務,各個事務的運行順序能夠是不肯定的,隨機的,不可預測的問題,多線程是最理想的解決方案。這樣的任務能夠被分紅多個執行流,每一個流都有一個要完成的目標,而後將獲得的結果合併,獲得最終的結果。算法
進程(有時被稱爲重量級進程)是程序的一次 執行。每一個進程都有本身的地址空間,內存,數據棧以及其它記錄其運行軌跡的輔助數據。操做系 統管理在其上運行的全部進程,併爲這些進程公平地分配時間。進程也能夠經過 fork 和 spawn 操做 來完成其它的任務。不過各個進程有本身的內存空間,數據棧等,因此只能使用進程間通信(IPC), 而不能直接共享信息。編程
線程(有時被稱爲輕量級進程)跟進程有些類似,不一樣的是,全部的線程運行在同一個進程中, 共享相同的運行環境。它們能夠想像成是在主進程或「主線程」中並行運行的「迷你進程」。安全
線程狀態如圖服務器
線程有開始,順序執行和結束三部分。它有一個本身的指令指針,記錄本身運行到什麼地方。 線程的運行可能被搶佔(中斷),或暫時的被掛起(也叫睡眠),讓其它的線程運行,這叫作讓步。 一個進程中的各個線程之間共享同一片數據空間,因此線程之間能夠比進程之間更方便地共享數據以及相互通信。
固然,這樣的共享並非徹底沒有危險的。若是多個線程共同訪問同一片數據,則因爲數據訪 問的順序不同,有可能致使數據結果的不一致的問題。這叫作競態條件(race condition)。
線程通常都是併發執行的,不過在單 CPU 的系統中,真正的併發是不可能的,每一個線程會被安排成每次只運行一小會,而後就把 CPU 讓出來,讓其它的線程去運行。因爲有的函數會在完成以前阻塞住,在沒有特別爲多線程作修改的情 況下,這種「貪婪」的函數會讓 CPU 的時間分配有所傾斜。致使各個線程分配到的運行時間可能不 盡相同,不盡公平。
首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行(其中的JPython就沒有GIL)。
那麼CPython實現中的GIL又是什麼呢?GIL全稱Global Interpreter Lock爲了不誤導,咱們仍是來看一下官方給出的解釋:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
儘管Python徹底支持多線程編程, 可是解釋器的C語言實現部分在徹底並行執行時並非線程安全的。 實際上,解釋器被一個全局解釋器鎖保護着,它確保任什麼時候候都只有一個Python線程執行。
在多線程環境中,Python 虛擬機按如下方式執行:
對全部面向 I/O 的(會調用內建的操做系統 C 代碼的)程序來講,GIL 會在這個 I/O 調用之 前被釋放,以容許其它的線程在這個線程等待 I/O 的時候運行。若是某線程並未使用不少 I/O 操做, 它會在本身的時間片內一直佔用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環境的好處。
當一個線程結束計算,它就退出了。線程能夠調用 thread.exit()之類的退出函數,也可使用 Python 退出進程的標準方法,如 sys.exit()或拋出一個 SystemExit 異常等。不過,你不能夠直接 「殺掉」("kill")一個線程。
在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多數類 Unix 系統上運行時,Python 支持多線程 編程。Python 使用 POSIX 兼容的線程,即 pthreads。
默認狀況下,只要在解釋器中
>> import thread複製代碼
若是沒有報錯,則說明線程可用。
Python 供了幾個用於多線程編程的模塊,包括 thread, threading 和 Queue 等。thread 和 threading 模塊容許程序員建立和管理線程。thread 模塊 供了基本的線程和鎖的支持,而 threading 供了更高級別,功能更強的線程管理的功能。Queue 模塊容許用戶建立一個能夠用於多個線程之間 共享數據的隊列數據結構。
出於如下幾點考慮,咱們不建議您使用 thread 模塊。
除了產生線程外,thread 模塊也提供了基本的同步數 據結構鎖對象(lock object,也叫原語鎖,簡單鎖,互斥鎖,互斥量,二值信號量)。
thread 模塊函數
下面是一個使用 thread 的例子:
import thread
from time import sleep, time
def loop(num):
print('start loop at:', time())
sleep(num)
print('loop done at:', time())
def loop1(num):
print('start loop 1 at:', time())
sleep(num)
print('loop 1 done at:', time())
def main():
print('starting at:', time())
thread.start_new_thread(loop, (4,))
thread.start_new_thread(loop1, (5,))
sleep(6)
print('all DONE at:', time())
if __name__ == '__main__':
main()
('starting at:', 1489387024.886667)
('start loop at:', 1489387024.88705)
('start loop 1 at:', 1489387024.887277)
('loop done at:', 1489387028.888182)
('loop 1 done at:', 1489387029.888904)
('all DONE at:', 1489387030.889918)複製代碼
start_new_thread()要求必定要有前兩個參數。因此,就算咱們想要運行的函數不要參數,也要傳一個空的元組。
爲何要加上sleep(6)這一句呢? 由於,若是咱們沒有讓主線程停下來,那主線程就會運行下一條語句,顯示 「all done」,而後就關閉運行着 loop()和 loop1()的兩個線程,退出了。
咱們有沒有更好的辦法替換使用sleep() 這種不靠譜的同步方式呢?答案是使用鎖,使用了鎖,咱們就能夠在兩個線程都退出以後立刻退出。
#! -*- coding: utf-8 -*-
import thread
from time import sleep, time
loops = [4, 2]
def loop(nloop, nsec, lock):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
# 每一個線程都會被分配一個事先已經得到的鎖,在 sleep()的時間到了以後就釋放 相應的鎖以通知主線程,這個線程已經結束了。
lock.release()
def main():
print('starting at:', time())
locks = []
nloops = range(len(loops))
for i in nloops:
# 調用 thread.allocate_lock()函數建立一個鎖的列表
lock = thread.allocate_lock()
# 分別調用各個鎖的 acquire()函數得到, 得到鎖表示「把鎖鎖上」
lock.acquire()
locks.append(lock)
for i in nloops:
# 建立線程,每一個線程都用各自的循環號,睡眠時間和鎖爲參數去調用 loop()函數
thread.start_new_thread(loop, (i, loops[i], locks[i]))
for i in nloops:
# 在線程結束的時候,線程要本身去作解鎖操做
# 當前循環只是坐在那一直等(達到暫停主 線程的目的),直到兩個鎖都被解鎖爲止才繼續運行。
while locks[i].locked(): pass
print('all DONE at:', time())
if __name__ == '__main__':
main()複製代碼
爲何咱們不在建立鎖的循環裏建立線程呢?有如下幾個緣由:
threading 模塊不只提供了 Thread 類,還提供了各類很是好用的同步機制。
下面是threading 模塊裏全部的對象:
另外一個避免使用 thread 模塊的緣由是,它不支持守護線程。當主線程退出時,全部的子線程不 論它們是否還在工做,都會被強行退出。有時,咱們並不指望這種行爲,這時,就引入了守護線程 的概念
threading 模塊支持守護線程,它們是這樣工做的:守護線程通常是一個等待客戶請求的服務器, 若是沒有客戶 出請求,它就在那等着。若是你設定一個線程爲守護線程,就表示你在說這個線程 是不重要的,在進程退出的時候,不用等待這個線程退出。
若是你的主線程要退出的時候,不用等待那些子線程完成,那就設定這些線程的 daemon 屬性。 即,在線程開始(調用 thread.start())以前,調用 setDaemon()函數設定線程的 daemon 標誌 (thread.setDaemon(True))就表示這個線程「不重要」
若是你想要等待子線程完成再退出,那就什麼都不用作,或者顯式地調用 thread.setDaemon(False)以保證其 daemon 標誌爲 False。你能夠調用 thread.isDaemon()函數來判 斷其 daemon 標誌的值。新的子線程會繼承其父線程的 daemon 標誌。整個 Python 會在全部的非守護 線程退出後纔會結束,即進程中沒有非守護線程存在的時候才結束。
Thread類提供瞭如下方法:
用 Thread 類,你能夠用多種方法來建立線程。咱們在這裏介紹三種比較相像的方法。
下邊是三種不一樣方式的建立線程的示例:
#! -*- coding: utf-8 -*-
# 建立一個Thread的實例,傳給它一個函數
import threading
from time import sleep, time
loops = [4, 2]
def loop(nloop, nsec, lock):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
# 每一個線程都會被分配一個事先已經得到的鎖,在 sleep()的時間到了以後就釋放 相應的鎖以通知主線程,這個線程已經結束了。
def main():
print('starting at:', time())
threads = []
nloops = range(len(loops))
for i in nloops:
t = threading.Thread(target=loop, args=(i, loops[i]))
threads.append(t)
for i in nloops:
# start threads
threads[i].start()
for i in nloops:
# wait for all
# join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
# 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
threads[i].join() # threads to finish
print('all DONE at:', time())
if __name__ == '__main__':
main()複製代碼
與傳一個函數很類似的另外一個方法是在建立線程的時候,傳一個可調用的類的實例供線程啓動 的時候執行——這是多線程編程的一個更爲面向對象的方法。相對於一個或幾個函數來講,因爲類 對象裏可使用類的強大的功能,能夠保存更多的信息,這種方法更爲靈活
#! -*- coding: utf-8 -*-
# 建立一個 Thread 的實例,傳給它一個可調用的類對象
from threading import Thread
from time import sleep, time
loops = [4, 2]
class ThreadFunc(object):
def __init__(self, func, args, name=""):
self.name = name
self.func = func
self.args = args
def __call__(self):
# 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
self.func(*self.args)
def loop(nloop, nsec):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
def main():
print('starting at:', time())
threads = []
nloops = range(len(loops))
for i in nloops:
t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
threads.append(t)
for i in nloops:
# start threads
threads[i].start()
for i in nloops:
# wait for all
# join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
# 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
threads[i].join() # threads to finish
print('all DONE at:', time())
if __name__ == '__main__':
main()複製代碼
最後一個例子介紹如何子類化 Thread 類,這與上一個例子中的建立一個可調用的類很是像。使用子類化建立線程(第 29-30 行)使代碼看上去更清晰明瞭。
#! -*- coding: utf-8 -*-
# 建立一個 Thread 的實例,傳給它一個可調用的類對象
from threading import Thread
from time import sleep, time
loops = [4, 2]
class MyThread(Thread):
def __init__(self, func, args, name=""):
super(MyThread, self).__init__()
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
# 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
print 'starting', self.name, 'at:', time()
self.res = self.func(*self.args)
print self.name, 'finished at:', time()
def loop(nloop, nsec):
print('start loop %s at: %s' % (nloop, time()))
sleep(nsec)
print('loop %s done at: %s' % (nloop, time()))
def main():
print('starting at:', time())
threads = []
nloops = range(len(loops))
for i in nloops:
t = MyThread(loop, (i, loops[i]), loop.__name__)
threads.append(t)
for i in nloops:
# start threads
threads[i].start()
for i in nloops:
# wait for all
# join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
# 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
threads[i].join() # threads to finish
print('all DONE at:', time())
if __name__ == '__main__':
main()複製代碼
下面,咱們接咱們以前按以前併發的套路,用實現一下使用 threading 併發下載國旗
# python3
import threading
from threading import Thread
from flags import save_flag, show, main, get_flag
class MyThread(Thread):
def __init__(self, func, args, name=""):
super(MyThread, self).__init__()
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
# 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
self.res = self.func(*self.args)
def download_one(cc): # <3>
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc
def download_many(cc_list):
threads = []
for cc in cc_list:
thread = MyThread(download_one, (cc, ), download_one.__name__)
threads.append(thread)
for thread in threads:
# 啓動線程
thread.start()
for thread in threads:
# wait for all
# join()會等到線程結束,或者在給了 timeout 參數的時候,等到超時爲止。
# 使用 join()看上去 會比使用一個等待鎖釋放的無限循環清楚一些(這種鎖也被稱爲"spinlock")
thread.join()
return len(list(threads)) # <7>
if __name__ == '__main__':
main(download_many)複製代碼
執行代碼發現和使用協程相比速度基本一致。
除了各類同步對象和線程對象外,threading 模塊還 供了一些函數。
原語鎖定是一個同步原語,狀態是鎖定或未鎖定。兩個方法acquire()和release() 用於加鎖和釋放鎖。
RLock 可重入鎖是一個相似於Lock對象的同步原語,但同一個線程能夠屢次調用。
Lock 不支持遞歸加鎖,也就是說即使在同 線程中,也必須等待鎖釋放。一般建議改 RLock, 它會處理 "owning thread" 和 "recursion level" 狀態,對於同 線程的屢次請求鎖 爲,只累加
計數器。每次調 release() 將遞減該計數器,直到 0 時釋放鎖,所以 acquire() 和 release() 必須 要成對出現。
from time import sleep
from threading import current_thread, Thread
lock = Rlock()
def show():
with lock:
print current_thread().name, i
sleep(0.1)
def test():
with lock:
for i in range(3):
show(i)
for i in range(2):
Thread(target=test).start()複製代碼
事件用於在線程間通訊。一個線程發出一個信號,其餘一個或多個線程等待。
Event 經過經過 個內部標記來協調多線程運 。 法 wait() 阻塞線程執 ,直到標記爲 True。 set() 將標記設爲 True,clear() 更改標記爲 False。isSet() 用於判斷標記狀態。
from threading import Event
def test_event():
e = Event()
def test():
for i in range(5):
print 'start wait'
e.wait()
e.clear() # 若是不調用clear(),那麼標記一直爲 True,wait()就不會發生阻塞行爲
print i
Thread(target=test).start()
return e
e = test_event()複製代碼
條件變量和 Lock 參數同樣,也是一個,也是一個同步原語,當須要線程關注特定的狀態變化或事件的發生時使用這個鎖定。
能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於狀態圖中的等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。
構造方法:
Condition([lock/rlock])
Condition 有如下這些方法:
from threading import Condition, current_thread, Thread
con = Condition()
def tc1():
with con:
for i in range(5):
print current_thread().name, i
sleep(0.3)
if i == 3:
con.wait()
def tc2():
with con:
for i in range(5):
print current_thread().name, i
sleep(0.1)
con.notify()
Thread(target=tc1).start()
Thread(target=tc2).start()
Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3 # 讓出鎖
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4 # 從新獲取鎖,繼續執複製代碼
只有獲取鎖的線程才能調用 wait() 和 notify(),所以必須在鎖釋放前調用。
當 wait() 釋放鎖後,其餘線程也可進入 wait 狀態。notifyAll() 激活全部等待線程,讓它們去搶鎖而後完成後續執行。
如今咱們用一個經典的(生產者消費者)例子來介紹一下 Queue模塊。
生產者消費者的場景是: 生產者生產貨物,而後把貨物放到一個隊列之類的數據結構中,生產貨物所要花費的時間沒法預先肯定。消費者消耗生產者生產的貨物的時間也是不肯定的。
經常使用的 Queue 模塊的屬性:
Queue 模塊能夠用來進行線程間通信,讓各個線程之間共享數據。
如今,咱們建立一個隊列,讓 生產者(線程)把新生產的貨物放進去供消費者(線程)使用。
# python2
#! -*- coding: utf-8 -*-
from Queue import Queue
from random import randint
from time import sleep, time
from threading import Thread
class MyThread(Thread):
def __init__(self, func, args, name=""):
super(MyThread, self).__init__()
self.name = name
self.func = func
self.args = args
def getResult(self):
return self.res
def run(self):
# 建立新線程的時候,Thread 對象會調用咱們的 ThreadFunc 對象,這時會用到一個特殊函數 __call__()。
print 'starting', self.name, 'at:', time()
self.res = self.func(*self.args)
print self.name, 'finished at:', time()
# writeQ()和 readQ()函數分別用來把對象放入隊列和消耗隊列中的一個對象。在這裏咱們使用 字符串'xxx'來表示隊列中的對象。
def writeQ(queue):
print 'producing object for Q...'
queue.put('xxx', 1)
print "size now", queue.qsize()
def readQ(queue):
queue.get(1)
print("consumed object from Q... size now", queue.qsize())
def writer(queue, loops):
# writer()函數只作一件事,就是一次往隊列中放入一個對象,等待一會,而後再作一樣的事
for i in range(loops):
writeQ(queue)
sleep(1)
def reader(queue, loops):
# reader()函數只作一件事,就是一次從隊列中取出一個對象,等待一會,而後再作一樣的事
for i in range(loops):
readQ(queue)
sleep(randint(2, 5))
# 設置有多少個線程要被運行
funcs = [writer, reader]
nfuncs = range(len(funcs))
def main():
nloops = randint(10, 20)
q = Queue(32)
threads = []
for i in nfuncs:
t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
threads.append(t)
for i in nfuncs:
threads[i].start()
for i in nfuncs:
threads[i].join()
print threads[i].getResult()
print 'all DONE'
if __name__ == '__main__':
main()複製代碼
進程(有時被稱爲重量級進程)是程序的一次 執行。每一個進程都有本身的地址空間,內存,數據棧以及其它記錄其運行軌跡的輔助數據。
線程(有時被稱爲輕量級進程)跟進程有些類似,不一樣的是,全部的線程運行在同一個進程中, 共享相同的運行環境。它們能夠想像成是在主進程或「主線程」中並行運行的「迷你進程」。
這篇文章很好的解釋了 線程和進程的區別,推薦閱讀: www.ruanyifeng.com/blog/2013/0…
因爲GIL的緣故,對全部面向 I/O 的(會調用內建的操做系統 C 代碼的)程序來講,GIL 會在這個 I/O 調用之 前被釋放,以容許其它的線程在這個線程等待 I/O 的時候運行。若是某線程並未使用不少 I/O 操做, 它會在本身的時間片內一直佔用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計算密集 型的程序更能充分利用多線程環境的好處。
Python的線程就是C語言的一個pthread,並經過操做系統調度算法進行調度(例如linux是CFS)。爲了讓各個線程可以平均利用CPU時間,python會計算當前已執行的微代碼數量,達到必定閾值後就強制釋放GIL。而這時也會觸發一次操做系統的線程調度(固然是否真正進行上下文切換由操做系統自主決定)。
僞代碼
while True:
acquire GIL
for i in 1000:
do something
release GIL
/* Give Operating System a chance to do thread scheduling */複製代碼
這種模式在只有一個CPU核心的狀況下毫無問題。任何一個線程被喚起時都能成功得到到GIL(由於只有釋放了GIL纔會引起線程調度)。
但當CPU有多個核心的時候,問題就來了。從僞代碼能夠看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。因此當其餘在其餘核心上的線程被喚醒時,大部分狀況下主線程已經又再一次獲取到GIL了。這個時候被喚醒執行的線程只能白白的浪費CPU時間,看着另外一個線程拿着GIL歡快的執行着。而後達到切換時間後進入待調度狀態,再被喚醒,再等待,以此往復惡性循環。
簡單的總結下就是:Python的多線程在多核CPU上,只對於IO密集型計算產生正面效果;而當有至少有一個CPU密集型線程存在,那麼多線程效率會因爲GIL而大幅降低。
>歡迎關注 | >請我喝芬達 |
---|---|