Python做爲一種解釋型語言,因爲使用了全局解釋鎖(GIL)的緣由,其代碼不能同時在多核CPU上併發的運行。這也致使在Python中使用多線程編程並不能實現併發,咱們得使用其餘的方法在Python中實現併發編程。編程
Python中不能經過使用多線程實現併發編程主要是由於全局解釋鎖的機制,因此首先解釋一下全局解釋鎖的概念。安全
首先,咱們知道C++和Java是編譯型語言,而Python則是一種解釋型語言。對於Python程序來講,它是直接被輸入到解釋器中直接運行的。解釋器在程序執行以前對其並不瞭解;它所知道的只是Python的規則,以及在執行過程當中怎樣去動態的應用這些規則。它也有一些優化,可是這基本上只是另外一個級別的優化。因爲解釋器無法很好的對程序進行推導,Python的大部分優化實際上是解釋器自身的優化。更快的解釋器天然意味着程序的運行也能「免費」的更快。也就是說,解釋器優化後,Python程序不用作修改就能夠享受優化後的好處。網絡
爲了利用多核系統,Python必須支持多線程運行。但做爲解釋型語言,Python的解釋器須要作到既安全又高效。解釋器要注意避免在不一樣的線程操做內部共享的數據,同時還要保證在管理用戶線程時保證老是有最大化的計算資源。爲了保證不一樣線程同時訪問數據時的安全性,Python使用了全局解釋器鎖(GIL)的機制。從名字上咱們很容易明白,它是一個加在解釋器上的全局(從解釋器的角度看)鎖(從互斥或者相似角度看)。這種方式固然很安全,但它也意味着:對於任何Python程序,無論有多少的處理器,任什麼時候候都老是隻有一個線程在執行。即:只有得到了全局解釋器鎖的線程才能操做Python對象或者調用Python/C API函數。多線程
因此,在Python中」不要使用多線程,請使用多進程」。具體來講,若是你的代碼是IO密集型的,使用多線程或者多進程都是能夠的,多進程比線程更易用,可是會消耗更多的內存;若是你的代碼是CPU密集型的,多進程(multiprocessing模塊)就明顯是更好的選擇——特別是所使用的機器是多核或多CPU的時候。併發
另外,Python的官方實現CPython帶有GIL,但並非全部的Python實現版本都是這樣的。IronPython,Jython,還有使用.NET框架實現的Python就沒有GIL。因此若是你不能忍受GIL,也能夠嘗試用一下其餘實現版本的Python。app
若是是一個計算型的任務,GIL就會讓多線程變慢。咱們舉個計算斐波那契數列的例子:框架
import time import threading def text(name): def profile(func): def wrapper(*args,**kwargs): start = time.time() res = func(*args,**kwargs) end = time.time() print('{} cost:{}'.format(name,end-start)) return res return wrapper return profile def fib(n): if n <= 2: return 1 return fib(n-1) + fib(n-2) @text('nothread') def nothread(): fib(35) fib(35) @text('hasthread') def hasthread(): for i in range(2): t = threading.Thread(target=fib,args=(35,)) t.start() main_thread = threading.current_thread() for t in threading.enumerate(): if t is main_thread: continue t.join() nothread() hasthread() ##輸出結果### nothread cost:6.141353607177734 hasthread cost:6.15336275100708
這種狀況還不如不用多線程!dom
GIL是必須的,這是Python設計的問題:Python解釋器是非線程安全的。這意味着當從線程內嘗試安全的訪問Python對象的時候將有一個全局的強制鎖。 在任什麼時候候,僅僅一個單一的線程可以獲取Python對象或者C API。每100個字節的Python指令解釋器將從新獲取鎖,這(潛在的)阻塞了I/O操做。由於鎖,CPU密集型的代碼使用線程庫時,不會得到性能的提升。ide
那是否是因爲GIL的存在,多線程庫就是個「雞肋」呢?固然不是。事實上咱們平時會接觸很是多的和網絡通訊或者數據輸入/輸出相關的程序,好比網絡爬蟲、文本處理等等。這時候因爲網絡狀況和I/O的性能的限制,Python解釋器會等待讀寫數據的函數調用返回,這個時候就能夠利用多線程庫提升併發效率了。函數
A. Semaphore(信號量)
在多線程編程中,爲了防止不一樣的線程同時對一個公用的資源(好比所有變量)進行修改,須要進行同時訪問的數量(一般是1)的限制。信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。
import time from random import random from threading import Thread,Semaphore,current_thread,enumerate sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid)) for i in range(5): t = Thread(target=foo,args=(i,)) t.start() main_thread = current_thread() for t in enumerate(): if t is main_thread: continue t.join() ####輸出結果##### 0 acquire sema 1 acquire sema 2 acquire sema 0 release sema 3 acquire sema 1 release sema 4 acquire sema 2 release sema 3 release sema 4 release sema
B. Lock(互斥鎖)
Lock也能夠叫作互斥鎖,其實至關於信號量爲1。咱們先看一個不加鎖的例子:
import time import threading value = 0 def getlock(): global value new = value + 1 time.sleep(0.001) # 讓線程有機會切換 value = new for i in range(100): t = threading.Thread(target=getlock) t.start() main_thread = threading.current_thread() for t in threading.enumerate(): if t == main_thread: continue t.join() print(value) ####輸出結果##### 不肯定(刷新值會發生改變)
如今,咱們來看看加鎖以後的狀況:
import time import threading value = 0 lock = threading.Lock() def getlock(): global value with lock: new = value + 1 time.sleep(0.001) # 讓線程有機會切換 value = new for i in range(100): t = threading.Thread(target=getlock) t.start() main_thread = threading.current_thread() for t in threading.enumerate(): if t == main_thread: continue t.join() print(value) ####輸出結果爲############# 100
咱們對value的自增長了鎖,就能夠保證告終果了。
先來講說死鎖,所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。
import threading import time mutexA = threading.Lock() mutexB = threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutexA.acquire() # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutexB.release() mutexA.release() def fun2(self): mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutexA.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexA.release() mutexB.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
解決方案:
import threading import time mutex = threading.RLock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutex.acquire() # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutex.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutex.release() mutex.release() def fun2(self): mutex.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutex.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutex.release() mutex.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
遞歸鎖內部維護了一個計數器,當有線程拿到了Lock之後,這個計數器會自動加1,只要這計數器的值大於0,那麼其餘線程就不能搶到改鎖,這就保證了,在同一時刻,僅有一個線程使用該鎖,從而避免了死鎖的方法。關於遞歸鎖內部實現,有興趣的能夠看看源碼。
一個線程等待特定條件,而另外一個線程發出特定條件知足的信號。最好說明的例子就是「生產者/消費者」模型:
import time import threading def consumer(cond): t = threading.current_thread() with cond: cond.wait() # 建立了一個鎖,等待producer解鎖 print('{}: Resource is available to consumer'.format(t.name)) def producer(cond): t = threading.current_thread() with cond: print('{}:Making resource available'.format(t.name)) cond.notifyAll() # 釋放鎖,喚醒消費者 condition = threading.Condition() c1 = threading.Thread(name='c1',target=consumer,args=(condition,)) p = threading.Thread(name='p',target=producer,args=(condition,)) c2 = threading.Thread(name='c2',target=consumer,args=(condition,)) c1.start() time.sleep(1) c2.start() time.sleep(1) p.start()
一個線程發送/傳遞事件,另外的線程等待事件的觸發。咱們一樣的用「生產者/消費者」模型的例子:
import time import threading from random import randint TIMEOUT = 2 def consumer(event, l): t = threading.currentThread() while 1: event_is_set = event.wait(TIMEOUT) if event_is_set: try: integer = l.pop() print('{} popped from list by {}'.format(integer,t.name)) event.clear() # 重置狀態 except IndexError: pass def producer(event, l): t = threading.currentThread() while 1: integer = randint(10,100) l.append(integer) print('{} append to list by {}'.format(integer, t.name)) event.set() time.sleep(1) event = threading.Event() l = [] threads = [] p = threading.Thread(name='producer1', target=producer, args=(event, l)) p.start() threads.append(p) for name in ('consumer1','consumer2'): t = threading.Thread(target=consumer, name=name, args=(event, l)) t.start() threads.append(t) for t in threads: t.join() print('ending')
能夠看到事件被2個消費者比較平均的接收並處理了。若是使用了wait方法,線程就會等待咱們設置事件,這也有助於保證任務的完成。
隊列在併發開發中最經常使用的。咱們藉助「生產者/消費者」模式來理解:生產者把生產的「消息」放入隊列,消費者從這個隊列中對去對應的消息執行。
你們主要關心以下4個方法就行了:
put: 向隊列中添加一個消息。
get: 從隊列中刪除並返回一個消息。
task_done: 當某一項任務完成時調用。
join: 阻塞直到全部的項目都被處理完。
import time import threading import random import queue q = queue.Queue() def double(n): return n*2 def producer(): while 1: wt = random.randint(1,10) time.sleep(random.random()) q.put((double, wt)) def consumer(): while 1: task, arg = q.get() print(arg, task(arg)) q.task_done() for target in (producer, consumer): t = threading.Thread(target=target) t.start()
Queue模塊還自帶了PriorityQueue(帶有優先級)和LifoQueue(先進先出)2種特殊隊列。咱們這裏展現下線程安全的優先級隊列的用法,
PriorityQueue要求咱們put的數據的格式是(priority_number, data)
,咱們看看下面的例子:
import time import threading from random import randint import queue q = queue.PriorityQueue() def double(n): return n * 2 def producer(): count = 0 while 1: if count > 5: break prit = randint(0,100) print("put :{}".format(prit)) q.put((prit, double, prit)) # (優先級,函數,參數) count += 1 def consumer(): while 1: if q.empty(): break pri,task,arg = q.get() print('[PRI:{}] {} * 2 = {}'.format(pri,arg,task(arg))) q.task_done() time.sleep(0.1) t = threading.Thread(target=producer) t.start() time.sleep(1) t = threading.Thread(target=consumer) t.start()
面向對象開發中,你們知道建立和銷燬對象是很費時間的,由於建立一個對象要獲取內存資源或者其它更多資源。無節制的建立和銷燬線程是一種極大的浪費。那咱們可不能夠把執行完任務的線程不銷燬而重複利用呢?彷彿就是把這些線程放進一個池子,一方面咱們能夠控制同時工做的線程數量,一方面也避免了建立和銷燬產生的開銷。
import time import threading from random import random import queue def double(n): return n * 2 class Worker(threading.Thread): def __init__(self, queue): super(Worker, self).__init__() self._q = queue self.daemon = True self.start() def run(self): while 1: f, args, kwargs = self._q.get() try: print('USE:{}'.format(self.name)) print(f(*args, **kwargs)) except Exception as e: print(e) self._q.task_done() class ThreadPool(object): def __init__(self, max_num=5): self._q = queue.Queue(max_num) for _ in range(max_num): Worker(self._q) # create worker thread def add_task(self, f, *args, **kwargs): self._q.put((f, args, kwargs)) def wait_compelete(self): self._q.join() pool = ThreadPool() for _ in range(8): wt = random() pool.add_task(double, wt) time.sleep(wt) pool.wait_compelete()