學習Python多線程的資料不少,吐槽Python多線程的博客也很多。本文主要介紹Python多線程實際應用,且假設讀者已經瞭解多線程的基本概念。若是讀者對進程線程概念不甚瞭解,可參見知名博主 阮一峯 轉譯的一篇博客:《進程與線程的一個簡單解釋》。html
Python中多線程主要有兩個模塊,_thread和threading模塊。前者更底層,後者更經常使用,能知足絕大部分編程需求,今天主要圍繞threading模塊展開介紹。啓動一個線程須要用threading模塊中的Thread。python
線程的啓動須要先建立Thread對象,而後調用該對象的start()方法,參見下例:算法
import time import threading def func(n): while n > 0: print("線程name:", threading.current_thread().name, "參數n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(5,)) t.start() print("主線程:", threading.current_thread().name) # 運行結果: # 線程name: Thread-1 參數n: 5 # 主線程: MainThread # 線程name: Thread-1 參數n: 4 # 線程name: Thread-1 參數n: 3 # 線程name: Thread-1 參數n: 2 # 線程name: Thread-1 參數n: 1
上例中,threading.current_thread().name 是獲取當前線程的name屬性。數據庫
Thread中,形參target傳入函數名,args傳入函數對應的參數,參數必須是可迭代對象,若是是元組且只有一個參數必須寫成(參數,)的形式,逗號不能省略。編程
一旦啓動一個線程,該線程將由操做系統來全權管理,獨立執行直到目標函數返回。通常狀況下,線程的操做有如下幾種:安全
t.is_alive() # 查詢線程對象的狀態,返回布爾值 t.join() # 將線程加入到當前線程,並等待其終止 t = Thread(target=countdown, args=(10,), daemon=True) # 後臺線程 t.start()
查看線程狀態示例:服務器
import time import threading def func(n): while n > 0: print("線程name:", threading.current_thread().name, "參數n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(2,)) t.start() print("主線程:", threading.current_thread().name) if t.is_alive(): print("活着的") else: print("未存活") print("主線程結束")
讓主線程等待其餘線程,就是主線程會在join()處一直等待全部線程都結束以後,再繼續運行。參見下例:多線程
import time import threading def func(n): while n > 0: print("線程name:", threading.current_thread().name, "參數n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(2,)) t.start() t.join() print("主線程:", threading.current_thread().name) print("主線程結束") # 運行結果: # 線程name: Thread-1 參數n: 2 # 線程name: Thread-1 參數n: 1 # 主線程: MainThread # 主線程結束
後臺線程參見下例:app
import time import threading def func(n): while n > 0: print("參數n:", n) n -= 1 time.sleep(1) t = threading.Thread(target=func, args=(10, ), daemon=True) t.start() time.sleep(3) print("主線程結束") # 參數n: 10 # 參數n: 9 # 參數n: 8 # 參數n: 7 # 主線程結束
後臺線程沒法等待,但主線程終止時後臺線程自動銷燬。 若是要對線程進行高級操做,如發送信號,終止線程,都須要本身實現。下例經過輪詢控制線程退出:異步
import time from threading import Thread class StopThread: def __init__(self): self._flag = True def terminate(self): self._flag = False def run(self, n): while self._flag and n > 0: print('num>>:', n) n -= 1 time.sleep(1) obj = StopThread() t = Thread(target=obj.run, args=(11,)) t.start() time.sleep(5) # 表示do something obj.terminate() # 終止線程 t.join() print("主線程結束")
上例經過類中的_flag控制線程的終止,當主線程執行5秒以後,主動將_flag賦值爲False終止線程。經過輪詢終止線程存在一個問題,若是while self._flag and n > 0:這句後,某次循環一直阻塞在I/O操做上,根本不會進行下一次循環,天然就沒法終止。這該怎麼辦呢?留一個思考題。
多線程還能夠經過繼承Thread實現,以下:
import time from threading import Thread class A(Thread): def __init__(self,): super().__init__() def run(self): print("run1..", ) time.sleep(5) print("run2..") obj = A() obj.start() print("主線程結束")
當咱們用多個線程同時修改同一份數據時,怎麼保證最終結果是咱們期許的呢?舉個例子,當前有一個全局變量a=0,若是有10個線程同時對其加1,這就出現了線程間的競爭,到底應該聽誰的呢?這時候,應該用線程鎖來解決。也就是當某一個線程A對該數據操做時,對該數據加鎖,其餘線程只能等着,等待A操做完以後釋放了該鎖,其餘線程才能操做該數據,一旦某個線程得到操做數據的權限,當即又加上鎖。如此便能保證數據的安全準確。奇怪的是,在Python3中,即便不加鎖,好像也不會發生數據出錯的狀況。或許這個例子不是很好,也或許是Python3中自動加了鎖。但願有知道的讀者賜教一下。這個奇怪的現象就是下例了:
from threading import Thread import time def add_one(a): time.sleep(1) print("in thread a:", a) a[1] += 1 if __name__ == '__main__': array = [0, 1, 4] thread_obj_list = [] for i in range(50): t = Thread(target=add_one, args=(array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print("array result::", array) # array result:: [0, 51, 4]
咱們看到,最後array的第二個元素是51,並無出錯,這真是使人費解。好了,言歸正傳,來看看線程鎖的幾個方法吧:
lock = threading.Lock() # Lock對象 lock.acquire() # 鎖定 lock.release() # 解鎖
Lock有「鎖定」或「解鎖」兩種狀態之一。它是在解鎖狀態下建立的。它有兩個基本方法,acquire() 和 release()。
當狀態爲解鎖時,acquire()將狀態更改成鎖定並當即返回。當狀態被鎖定時,acquire()塊直到對另外一個協程中的release()的調用將其改變爲解鎖,而後acquire()調用將其重置爲鎖定並返回。
release()方法只應在鎖定狀態下調用;它將狀態更改成已解鎖並當即返回。若是嘗試釋放已解鎖的鎖,則會引起 RuntimeError。
下面是一個具體的使用例子:
from threading import Thread import time import threading lock = threading.Lock() def add_one(a): time.sleep(1) lock.acquire() a[1] += 1 lock.release() if __name__ == '__main__': array = [0, 1, 4] thread_obj_list = [] for i in range(50): t = Thread(target=add_one, args=(array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print("array result::", array) # array result:: [0, 51, 4]
acquire()和release()方法成對出現。可是這樣手動釋放有時候可能會遺忘,這時候能夠考慮用上下文管理協議。關於上下文管理協議,可參見做者的這篇文章【Python上下文管理器】。
Lock對象支持with語句:
def add_one(a): time.sleep(1) with lock: a[1] += 1
可重入鎖(又稱遞歸鎖,RLock),就是大鎖中包含子鎖的狀況下使用。在這種狀況下,再用Lock時,就會出現死鎖現象,此時應該用threading.RLock()對象了,用法同Lock,參見下例:
from threading import Thread import time import threading lock = threading.RLock() def add_one(a): lock.acquire() a[1] += 1 lock.release() def add_two(b): time.sleep(1) lock.acquire() b[1] += 2 add_one(b) lock.release() if __name__ == '__main__': array = [0, 1, 4] thread_obj_list = [] for i in range(50): t = Thread(target=add_two, args=(array,)) t.start() thread_obj_list.append(t) for j in thread_obj_list: j.join() print("array result::", array) # array result:: [0, 151, 4]
上例讀者能夠試試Lock(),看看什麼效果。RLock()還支持上下文管理協議,上例中的兩個函數能夠改爲這樣:
def add_one(a): with rlock: a[1] += 1 def add_two(b): time.sleep(1) with rlock: b[1] += 2 add_one(b)
全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程序設計語言解釋器用於同步線程的一種機制,它使得任什麼時候刻僅有一個線程在執行。因此不少人說Python的線程是假線程,並能利用多核,並不能真正並行。之因此感受到線程並行,是由於線程上下文不斷切換的緣故。Python 3.2開始使用新的GIL。新的GIL實現中用一個固定的超時時間來指示當前的線程放棄全局鎖。在當前線程保持這個鎖,且其餘線程請求這個鎖時,當前線程就會在5毫秒後被強制釋放該鎖。關於全局鎖,強調三點:
(1)GIL的存在,同一時刻只能有一個線程在運行。
(2)GIL是CPython的特性,Jython,pypy等並沒有GIL。
(3)Cpython的多線程適用於I/O密集型問題,計算密集型問題可以使用多進程編程。
在多線程編程中,有時候某個線程依賴另外一個線程的狀態,須要使用threading庫中的Event對象。 Event對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。可將線程設置等待Event對象, 直到有其餘線程將Event對象設置爲真,這些等待Event對象的線程將開始執行。Event()對象的經常使用方法:
event = threading.Event() # 建立threading.Event()對象 event.is_set() # 獲取event的設置值,默認爲False event.set() # 設置event的值爲True event.clear() # 設置event的值爲False event.wait() # 等到event的值被設爲True就執行
下面經過「交通訊號燈」問題示範event的使用:
import threading import time def traffic_light(event): count = 0 event.set() while True: # 若是計數器[0, 5)之間, 紅燈,event=False if 0 <= count < 5: event.clear() print("light is Red") # 若是計數器[5, 10)之間, 綠燈,event=True elif 5 <= count < 10: event.set() print("light is Green") # 若是計數器大於10,紅燈,將event設置爲False,計數器置爲0 else: event.clear() count = 0 time.sleep(1) count += 1 def car(name, event): while True: if not event.is_set(): # event爲False, 表示紅燈, 車只能等待 print("RED, the %s is waiting..." % name) # 此處會阻塞住,直到event被設置爲True在執行 event.wait() print("Green, The %s going...." % name) e = threading.Event() light = threading.Thread(target=traffic_light, args=(e,)) light.start() car1 = threading.Thread(target=car, args=("Tesla", e, )) car1.start()
交通訊號燈有紅燈和綠燈兩種狀態,每5秒切換一次狀態,而car()函數中,只要燈變綠就放car通行。運行試試看。
event對象的一個重要特色是當它被設置爲真時會喚醒全部等待它的線程。若是你只想喚醒單個或者必定數目的線程,最好是使用信號量或者 Condition
對象來替代。
Condition對象
condition對象老是與鎖關聯,能夠手動傳入鎖對象,也能夠不傳入使用默認值。當有多個線程須要等待某個變量改變時,纔開始執行。這種狀況能夠用condition對象實現。condition對象的主要方法有:
condition = threading.Condition(lock=None) # 建立Condition對象 參數能夠不傳 condition.acquire() # 加鎖 condition.release() # 解鎖 condition.wait(timeout=None) # 阻塞,直到有調用notify(),或者notify_all()時再觸發 condition.wait_for(predicate, timeout=None) # 阻塞,等待predicate條件爲真時執行 condition.notify(n=1) # 通知n個wait()的線程執行, n默認爲1 condition.notify_all() # 通知全部wait着的線程執行 with condition: # 支持with語法,沒必要每次手動調用acquire()/release()
看一個例子不是很優雅的例子:
import threading import time condition = threading.Condition() # 建立condition對象 def func(): condition.acquire() # 若是沒有with語句,必寫這句,否者報錯 condition.wait() # 阻塞,等待其餘線程調用notify() print("in func..") condition.release() # 與acquire()成對出現 # 啓10個線程 for i in range(10): t = threading.Thread(target=func, args=()) t.start() time.sleep(5) condition.acquire() condition.notify(2) # 通知兩個線程執行 condition.release() # in func.. # in func.. # 其餘8個線程會繼續等待...
上例中,咱們看到啓動的10個線程會等待5秒鐘而且調用了notify(2)以後,纔會通知兩個線程繼續運行。且這兩個線程執行完畢以後,其餘8個線程仍然會阻塞在condition.wait() 處。
頻繁寫acquire() / release()很繁瑣,下面是優雅的寫法:
import threading import time condition = threading.Condition() # 建立condition對象 def func(n): with condition: # with更優雅 condition.wait() # 阻塞,等待其餘線程調用notify() print("in func..", n) # 啓10個線程 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() time.sleep(5) with condition: condition.notify_all() # 通知全部線程執行
運行下,是否是等待5秒以後,全部線程都繼續執行了?
信號量一般用於防範容量有限的資源,例如數據庫服務器。通常而言信號量能夠控制釋放固定量的線程。好比啓動100個線程,信號量的控制值設爲5,那麼前5個線程拿到信號量以後,其他線程只能阻塞,等到這5個線程釋放信號量鎖以後才能去拿鎖。參見下例:
import threading import time def func(n): # semaphore.acquire() with semaphore: time.sleep(2) print("Thread::", n) # semaphore.release() semaphore = threading.BoundedSemaphore(5) # 信號量, 每次釋放5個線程 thread_list = [] for i in range(23): t = threading.Thread(target=func, args=(i,)) thread_list.append(t) t.start() for j in thread_list: j.join() print("all threads done")
上例中,能夠看到線程是每5個一組進行釋放的。
Barriers字面意思是「屏障」,是Python線程(或進程)同步原語。每一個線程中都調用wait()方法,當其中一個線程執行到wait方法處會立阻塞;一直等到全部線程都執行到wait方法處,全部線程再繼續執行。參見下例:
import time import threading bar = threading.Barrier(3) # 建立barrier對象,指定知足3個線程 def worker1(): print("worker1") time.sleep(1) bar.wait() print("worker1 end") def worker2(): print("worker2") time.sleep(2) bar.wait() print("worker2 end") def worker3(): print("worker3") time.sleep(5) bar.wait() print("worker3 end") thread_list = [] t1 = threading.Thread(target=worker1) t2 = threading.Thread(target=worker2) t3 = threading.Thread(target=worker3) thread_list.append(t1) thread_list.append(t2) thread_list.append(t3) for t in thread_list: t.start() # 每一個線程中都調用了wait()方法,在全部(此處設置爲3)線程調用wait方法以前是阻塞的。 # 也就是說,只有等到3個線程都執行到了wait方法這句時,全部線程才繼續執行。
上例中,能夠看到,全部線程會先各自運行wait()方法以前的代碼,到wait()處阻塞。等待最後一個線程執行到wait()處,也就是5秒以後,全部線程恢復執行。
兩個或多個線程之間相互發送數據最安全的方式可能就是使用 queue 庫中的隊列了。建立一個線程共享的 Queue 對象,線程經過使用 put()和 get()操做來向隊列中添加或者刪除元素。Queue對象已經內置了鎖機制,編程時沒必要手動操做鎖。下例producer()函數表明包子鋪,生產包子放入隊列中;consumer()函數表明吃包子的人,不斷從隊列中取出包子吃掉;以此演示線程間經過隊列通訊。
from queue import Queue import threading import time q = Queue(10) def producer(): n = 0 while True: q.put("包子%s" % n) print("包子鋪生產 包子%s" % n) n += 1 time.sleep(2) def consumer(): while True: r = q.get() print("bucker 吃掉 %s" % r) time.sleep(1) t1 = threading.Thread(target=producer) t1.start() t2 = threading.Thread(target=consumer) t2.start()
形如上例的編程模型,又叫生產者-消費者模型。它下降了程序以前的耦合,使得隊列的上游只關注生產數據,隊列的下游只關注消費數據。在票務系統,或者資源有限的狀況中可用此模型。補充兩點:
(1)get() 和 put() 方法都支持非阻塞方式和設定超時。
(2)q.qsize() , q.full() , q.empty() 等能夠獲取一個隊列的當前大小和狀態。但它們不是線程安全的,儘可能別用。
Python3.2開始,增長了標準庫concurrent.futures,該庫中的ThreadPoolExecutor是自帶的線程池。簡單使用:
from concurrent.futures import ThreadPoolExecutor import time def tell(i): print("this is tread {}.".format(i)) time.sleep(1) if __name__ == '__main__': future = ThreadPoolExecutor(10) a = "ddd" for i in range(100): future.submit(tell, (i,)) # 添加一個線程到線程池 future.shutdown(wait=True) # 此函數用於釋放異步執行操做後的系統資源。
其中,submit()方法第一個參數爲函數名,第二個爲函數的參數。shutdown(wait=True)用於釋放異步執行操做後的系統資源。ThreadPoolExecutor還有一個優勢就是:任務提交者更方便的從被調用函數中獲取返回值。參見下例:
import concurrent.futures import requests URLS = ['http://www.cnblogs.com/zingp/p/5878330.html', 'http://www.cnblogs.com/zingp/', 'https://docs.python.org/'] # 爬取網頁內容 def load_url(url, timeout): with requests.get(url, timeout=timeout) as conn: return conn.text with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 建立future對象和對應的url的字典 future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as err: print('url:%s -- err: %s' % (url, err)) else: print(url, len(data)) # http://www.cnblogs.com/zingp/ 12391 # http://www.cnblogs.com/zingp/p/5878330.html 90029 # https://docs.python.org/ 9980
上例建立一個大小爲3的線程池,用了很多with語句,並用future.result() 獲取函數返回值。最終,咱們看到爬取了三個網頁,並得到網頁內容。future.result()操做會阻塞,直到對應的函數執行完成並返回一個結果。
此外,ThreadPoolExecutor還提供了異步回調的功能,大大簡化了多線程編程中處理線程返回結果的難度,參見下例:
from concurrent.futures import ThreadPoolExecutor import time import threading def tell(i): print("this is tread {}.".format(i)) time.sleep(1) return [i, threading.get_ident()] # 必須有返回,經過.result()拿到返回值 def callback(obj): # obj 至關於傳過來的future獨享,且回調函數必須有這個參數 result = obj.result() # 線程函數的返回值 print(result) if __name__ == '__main__': future = ThreadPoolExecutor(10) a = "ddd" for i in range(100): # 線程運行結束後將future對象傳給回調函數callback(obj) future.submit(tell, i,).add_done_callback(callback) future.shutdown(wait=True) # 此函數用於釋放異步執行操做後的系統資源。
Python3.2之前並無自帶線程池,那時每每採用自定義線程池。下面一個就是自定義線程池的例子,看看是否可以看得懂:
import queue import threading import contextlib StopEvent = object() class ThreadPool(object): """定義一個線程池類。""" def __init__(self, max_num, max_task_num=None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務。 :param func: 任務函數; :param args: 任務函數所需參數; :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態; 二、任務函數返回值(默認爲None,即:不執行回調函數); :return: 若是線程池已經終止,則返回True不然None。 """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 建立一個線程。 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數。 """ current_thread = threading.currentThread() self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完全部的任務後,全部線程中止。 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程。 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數。 """ state_list.append(worker_thread) try: # 遇到yield就返回回去執行with中的語句,執行完了回來。 yield finally: state_list.remove(worker_thread)
建立大的線程池的一個可能須要關注的問題是內存的使用。 例如,若是你在OS X系統上面建立2000個線程,系統顯示Python進程使用了超過9GB的虛擬內存。 不過,這個計算一般是有偏差的。當建立一個線程時,操做系統會預留一個虛擬內存區域來 放置線程的執行棧(一般是8MB大小)。可是這個內存只有一小片斷被實際映射到真實內存中。 所以,Python進程使用到的真實內存其實很小 (好比,對於2000個線程來說,只使用到了70MB的真實內存,而不是9GB)。若是擔憂虛擬內存大小,可使用 threading.stack_size() 函數來下降它。
import threading threading.stack_size(65536)
若是加上這條語句並再次運行前面的建立2000個線程試驗, 會發現Python進程只使用到了大概210MB的虛擬內存,而真實內存使用量沒有變。 注意線程棧大小必須至少爲32768字節,一般是系統內存頁大小(409六、8192等)的整數倍。
同步的定義是:在發出一個功能調用時,在沒有獲得結果以前,該調用就不返回,同時其它線程也不能調用這個方法。按照這個定義,其實絕大多數函數都是同步調用。
可是一般說進程、線程同步,每每特指多進程、線程編程時,多個進程、線程之間協同步調,按預約的前後次序進行運行。好比線程A和線程B一塊兒配合,A執行到必定程度依賴B的某個結果,因而停下來示意B運行,B開始執行,執行完將結果返回給A,A接着執行。這裏的「同」應該理解爲協同、協助、互相配合。
在多線程編程裏面,一些敏感數據不容許被多個線程同時訪問,此時就使用同步訪問技術,保證數據在任什麼時候刻,最多有一個線程訪問,以保證數據的完整性。
原語:前文說起原語,不少同窗可能不瞭解這個名詞的意思。內核或微核提供核外調用的過程或函數稱爲原語(primitive)。操做系統用語範疇。是由若干多機器指令構成的完成某種特定功能的一段程序,具備不可分割性。即原語的執行必須是連續的,在執行過程當中不容許被中斷。不一樣層次之間對話的語言稱爲原語,即不一樣層之間經過原語來實現信息交換。
(1)Python多線程編程經常使用threading模塊。啓動一個多線程須要建立一個Thread對象,調用star()方法啓動線程。注意is_alive() /join()方法和daemon參數的使用。
(2)python多線程鎖有Lock / Rlock, 全局鎖GIL。GIL是CPython特性,同一時刻只能運行一個線程,不能利用多核資源。
(3)線程同步原語有Event / Condition / Semaphore / Barrier。Event用於經常使用語通知所有線程,condition和Semapher經常使用於通知必定數量的線程, Barrier用於多個線程必須完成某些步驟再一塊兒執行。
(4)Lock / Rlock / Event / Condition / Semaphore 支持上下文管理協議(with語句,好用)。
(5)線程間通訊能夠用queue模塊中的Queue隊列,get()和put()已加鎖,是線程安全的。qsize()/full()/empty() 等能夠獲取一個隊列的當前大小和狀態, 不是線程安全的,儘可能別用。
(6)concurrent.futures中的ThreadPoolExecutor是Python3.2以後自帶的線程池模塊,十分好用,支持with語句,經過future.result()獲取線程返回值。
(7)Python多線程適用於I/O密集型問題,CPU密集型問題能夠用C代碼優化底層算法提高性能,需注意一個寫的很差的C語言擴展會致使這個問題更加嚴重;也能夠用pypy或者多進程。
以上是本篇所有內容,歡迎讀者批評指正。
參考資料: