本文是 Comyn 老師 Python 多線程課程的筆記,感謝老師優質的課程!html
兩個概念:python
主流操做系統上完成併發的手段有進程和線程,主流的編程語言提供了用戶空間的調度:協程。Python 也不例外。linux
因爲如今的操做系統上的進程愈來愈輕量,致使進程和線程之間的區別愈來愈少。事實上,Linux 並無原生的線程,線程是經過進程實現的。編程
python 中每個進程會啓動一個解釋器,而線程會共享一個解釋器。bootstrap
Python 中的線程是經過標準庫 threading 實現的。而啓動一個線程就是讓這個線程執行一些邏輯,這些邏輯就對應一個函數。安全
>>> import threading >>> def worker(): # 讓多個線程來執行它 ... print('work') ... >>> thread = threading.Thread(target=worker) # 建立了一個線程對象,target 參數是一個函數,即線程要執行的邏輯 >>> thread.start() # start 啓動一個線程,執行完畢後,自動退出,Python 沒有提供主動退出線程的方法 work 複製代碼
因爲 python 沒有提供退出線程的方法,所以咱們必定不能在邏輯中定義死循環,否則線程沒法退出。固然直接 kill -9 和刻意爲之的另說。而像那種監聽某個端口提供服務的進程,爲了保證不退出,一般都會有一個 while True 的死循環。bash
上面只是啓動了一個線程,很顯然沒什麼屌用。啓動多個線程的方式很是簡單,就是在它的外面套一個 for 循環就能夠了:markdown
import time import threading def worker(num): time.sleep(1) print('work-{}'.format(num)) for i in range(5): t = threading.Thread(target=worker, args=(i, )) # 啓動了五個線程,要啓動幾個就循環幾回 t.start() 複製代碼
經過 args 給函數傳遞參數,也可使用 kwargs 經過字典傳遞。結果是在等待一秒以後,全部線程同時輸出了,而且在一個線程的換行符尚未打印出來的時候,下一個線程就輸出了,這就涉及到線程安全的問題了。很顯然,print 並非線程安全的。數據結構
線程相比於進程更輕量,上下文切換的代價沒有進程那麼大,但即便如此,線程數量也不宜過多。多線程
threading.current_thread()
能夠返回當前的線程對象。
>>> threading.Thread(target=lambda: print(threading.current_thread())).start() <Thread(Thread-13, started 140007299499776)> 複製代碼
返回的線程對象咱們能夠經過一個變量進行接收:
thread = threading.current_thread()
複製代碼
它有不少屬性和方法:
name
:返回線程的名字;ident
:返回該線程的惟一標識符;is_alive
:告知該線程是否存活;enumerate
:能夠經過循環它打印出全部的線程;咱們建立線程對象的時候是能夠給它取名字的:
t = threading.Thread(target=worker, name='thread1') 複製代碼
這個 name 能夠經過 logging 的 threadName 得到。
前面提到過,print 並非線程安全的,而 logging 模塊線程安全。
>>> import logging >>> logging.warning('hehe') WARNING:root:hehe >>> logging.info('hehe') # 默認只輸出 warning 以上級別 複製代碼
咱們能夠對其進行一些基礎的配置,讓其記錄 DEBUG 以上的級別,以及記錄線程名:
>>> logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') >>> logging.info('hehe') 2017-09-23 15:41:36,868 INFO MainThread hehe 複製代碼
知道了它的簡單用法以後,咱們就可使用多線程了:
import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(lineno)d %(message)s') def worker(): logging.info(logging.info('work')) for i in range(5): t = threading.Thread(target=worker) t.start() 複製代碼
使用 logging 就沒有問題了,所以咱們一般使用它來替代 print。
logging 還能夠將異常的棧追蹤信息記錄下來,這在排查錯誤的時候很是方便:
import logging try: config['DE']['xxx'] except Exception as e: logging.exception(e) print('xxx') 複製代碼
daemon 在 linux 上是守護進程的意思,它始終在後臺運行。而在 Python 中的 daemon 線程會在主線程退出以後退出。也就是說,若是不是 daemon 線程,主線程退出以後,非 daemon 線程還會繼續執行,直到結束退出。
線程默認不是 daemon,若是想要設置爲 daemon,那就在建立線程對象的時候,給它傳遞 daemon=True 便可。
>>> t = threading.Thread(target=worker, daemon=True) >>> t.daemon Out[20]: True 複製代碼
經過下面的例子證實以前的說法:
import time import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') def worker(): logging.info('start') time.sleep(2) logging.info('end') if __name__ == '__main__': logging.info('start') t1 = threading.Thread(target=worker, name='non-daemon') t1.start() t2 = threading.Thread(target=worker, name='daemon', daemon=True) t2.start() logging.info('end') # 執行結果 2017-09-24 04:08:49,027 INFO MainThread start 2017-09-24 04:08:49,028 INFO non-daemon start 2017-09-24 04:08:49,028 INFO daemon start 2017-09-24 04:08:49,028 INFO MainThread end 2017-09-24 04:08:51,031 INFO non-daemon end 複製代碼
執行上面的代碼你會發現有的時候主線程退出了,可是 daemon 線程還會執行完成。這是由於雖然從日誌中看到主線程退出,可是事實上主線程是沒有退出的,它會等待非 daemon 線程執行完畢後纔會退出,這樣就給了 daemon 線程的執行時間了。當咱們將 t1 給註釋掉以後,就不可能出現主線程退出後,daemon 線程仍然執行的狀況了。
若是咱們在 t2.start() 以後增長一行 t2.join(),那即便它是 daemon 線程,主線程依然會等待它執行完畢後再退出。由於 join 會阻塞直到線程執行完畢。join 支持一個參數,那就是阻塞的秒數。t2.join(1) 表示只阻塞一秒,這個時候即便 t2 沒有執行完成,主線程依然會退出。join 用的比較多,它並不佔用 CPU 時間。
上面建立線程的方法是經過實例化 Thread,咱們還能夠經過下面這種方式:
import logging import threading class Mythread(threading.Thread): def run(self): logging.warning('worker') t = Mythread() t.start() 複製代碼
經過繼承 + 重寫 run 方法來到達啓動多線程的效果,run 等同於以前 target 指定的函數。可是 Python 中這種方法使用的不多。
當咱們建立一個線程對象的時候,除了可使用 start 啓動它以外,還能夠經過 run 來啓動。若是不是以繼承的方式建立線程,一個線程對象的 run 和 start 只能執行其中一個。
定義一個 thread local 對象。
ctx = threading.local()
複製代碼
這時的 ctx 沒有任何屬性,咱們能夠給它增長屬性:
>>> ctx.data = 5 >>> ctx.data Out[25]: 5 複製代碼
繼續:
>>> data = 'abc' # 定義一個變量 >>> def worker(): ... logging.warning(data) ... logging.warning(ctx.data) ... >>> worker() # 執行沒什麼問題 WARNING:root:abc WARNING:root:5 >>> threading.Thread(target=worker).start() # 可是經過線程執行就不行了 WARNING:root:abc # data 能夠直接打出來 Exception in thread Thread-9: Traceback (most recent call last): File "/usr/local/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/usr/local/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "<ipython-input-32-2e99199c517b>", line 3, in worker logging.warning(ctx.data) AttributeError: '_thread._local' object has no attribute 'data' # 可是 ctx.data 提示沒有 複製代碼
這是由於 ctx.data 是一個 thread local 的變量,咱們能夠給它賦值任意屬性,可是隻對當前線程可見。線程獨享!
使用 run 方法,它會將 target 放在主線程中;start 則會將其放到子線程中,兩者只能執行一個。
也能夠稱爲延時執行。Python 中存在一種特殊的線程,可用於延遲執行。它繼承自 Thread 類,所以它也是 Thread 對象。
>>> def worker(): ... logging.warning('worker') ... >>> t = threading.Timer(interval=5, function=worker) >>> t.start() 複製代碼
能夠看到執行 start 方法後,五秒後纔有輸出。在等待的過程當中,它能夠經過 cancel()
終止。
它也能夠設置線程名,只不過要這樣:
>>> t = threading.Timer(interval=5, function=worker) >>> t.name = 'Timer' >>> t.deamon = True # 設置是否爲 daemon 複製代碼
當 function 指定的函數開始執行的時候,沒法經過 cancel() 終止。
Timer 的定時執行功能很弱,若是真的有這方面的須要,可使用 APSchedule。
第一種線程同步的方式。同步意味着阻塞,若是線程之間沒有聯繫,徹底沒有必要使用同步。有這麼一種需求:worker 線程作一些事情,當它完成以後,通知 boss 線程,由 boss 完成處理後續工做。這可能並不難實現,可是 boss 線程要統計 worker 線程的執行時間呢?
這就要用到線程間通訊的機制了,最簡單的是 event:
>>> event = threading.Event() >>> event.set() >>> event.wait() Out[8]: True 複製代碼
它是一個 threading.Event 的對象,有 set 和 wait 這兩個方法。wait 會阻塞線程直到 set 方法被調用。
有了這兩種方法以後,咱們就能夠完成上面的需求了:
import time import random import logging import datetime import threading def worker(event: threading.Event): time.sleep(random.randint(1, 5)) event.set() def boss(event: threading.Event): start = datetime.datetime.now() event.wait() logging.warning('worker exit {}'.format(datetime.datetime.now() - start)) def start(): event = threading.Event() b = threading.Thread(target=boss, args=(event,), name='boss') b.start() for x in range(5): threading.Thread(target=worker, args=(event,), name='worker').start() start() 複製代碼
五個 worker 線程,誰先執行完成就誰執行 event.set(),一旦 event.set 被執行,boss 線程也就會繼續執行並輸出日誌了。可是會有一個問題,因爲是隨機 sleep 時間,也就是說最快 boss 線程能夠一秒就退出,可是還有四個 worker 線程還在執行,這四個線程拉長了整個腳本的執行時間。
再作修改:
import time import random import logging import datetime import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') def worker(event: threading.Event): s = random.randint(1, 5) event.wait(s) # 先阻塞 event.set() # 一下全放開了 logging.info('sleep {}'.format(s)) def boss(event: threading.Event): start = datetime.datetime.now() event.wait() logging.info('worker exit {}'.format(datetime.datetime.now() - start)) def start(): event = threading.Event() threading.Thread(target=boss, args=(event,), name='boss').start() for x in range(5): threading.Thread(target=worker, args=(event,), name='worker-{}'.format(x)).start() start() # 執行結果 2017-09-25 06:15:42,114 INFO worker-0 sleep 2 2017-09-25 06:15:42,115 INFO boss worker exit 0:00:02.004014 2017-09-25 06:15:42,116 INFO worker-1 sleep 5 2017-09-25 06:15:42,116 INFO worker-2 sleep 4 2017-09-25 06:15:42,116 INFO worker-3 sleep 3 2017-09-25 06:15:42,117 INFO worker-4 sleep 2 複製代碼
能夠看到都在同一秒退出了,這是由於 wait 能夠指定超時時間,時間一到它就再也不阻塞。這樣阻塞時間最短的那個線程就會執行 set,這樣一來全部阻塞的線程同時放開了,因而同一時間都執行完成了。所以,wait 會阻塞線程直到 set 方法被調用,或者超時時間到。
event 能夠被多個線程所持有,多個線程能夠同時被阻塞,一旦其中一個線程執行了 set,那麼全部的線程都再也不阻塞。event 能夠在線程之間發送信號,一般用於某個線程須要其餘線程處理某些動做以後才能啓動。
event 還有一個特性,若是先 set 而後 wait,無論有沒有指定超時,它都瞬間返回 True(由於阻塞被放開,因此沒法再阻塞);而若是直接 wait,且給它一個超時時間,那麼超時完成以後,它會返回 False。咱們能夠根據這個特色來完成定時的操做。
import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') def worker(event: threading.Event): while not event.wait(3): logging.info('running') event = threading.Event() threading.Thread(target=worker, args=(event,)).start() 複製代碼
每三秒會輸出一第二天志,會無限輸出下去。可是若是執行 event.set() 就會終止死循環。
event 還有一些方法:
is_set
:用來判斷有沒有 set 過;clean
:清除 set 標誌,一般用來作線程退出的條件。def worker(event): while not event.is_set(): pass 複製代碼
wait 會主動讓出 CPU 時間片,time.sleep 卻不會。假如它們分到了 10ms 的 CPU 時間,都使用了 5ms,那麼剩餘的 5ms wait 會讓給別人,而 sleep 會本身用完。所以咱們會使用 wait 而不是 sleep。
延時執行。
import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') class Timer: def __init__(self, interval, function, *args, **kwargs): self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.event = threading.Event() self.thread = threading.Thread(target=self.__thread) def __thread(self): if not self.event.wait(self.interval): self.function(*self.args, **self.kwargs) def start(self): self.thread.start() def cancel(self): self.event.set() def worker(): logging.info('running') t = Timer(interval=2, function=worker) t.start() 複製代碼
第二種線程同步的方式。lock 用來保護共享資源,其他幾種線程同步的方式都是用了它。
import random import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') class Counter: def __init__(self): self.__val = 0 @property def value(self): return self.__val def inc(self): self.__val += 1 def dec(self): self.__val -= 1 counter = Counter() def fn(): if random.choice([-1, 1]) > 0: logging.info('inc') counter.inc() else: logging.info('dec') counter.dec() for x in range(10): threading.Thread(target=fn).start() print(counter.value) 複製代碼
上面的代碼即便你知道它加了多少次減了多少次,但你不能確定它的結果,這是由於資源的爭用。Lock 對象可用於解決這種問題:
>>> lock = threading.Lock() >>> lock.acquire() Out[4]: True 複製代碼
對於 lock 實例,只能調用一次 acquire 方法,再次調用會被阻塞,直到 release 方法被調用。根據它的這種特性,可用來改造以前的 Counter。
class Counter: def __init__(self): self.__val = 0 self._lock = threading.Lock() @property def value(self): return self.__val def inc(self): self._lock.acquire() self.__val += 1 self._lock.release() def dec(self): self._lock.acquire() self.__val -= 1 self._lock.release() 複製代碼
這樣一來,無論有多少個線程,同一時間只會有一個線程可以修改 __val。可是這樣會有一個問題,若是執行加減的時候發生了異常(雖然這裏不會),那麼 release 永遠就不會執行,那麼就會造成死鎖,所以咱們要使用 try finally。
def inc(self): try: self._lock.acquire() self.__val += 1 finally: self._lock.release() 複製代碼
從上面這種結構中咱們能夠聯想到 with,事實上它是支持 with 的,所以咱們能夠定義的更爲簡單:
def inc(self): with self._lock: self.__val += 1 複製代碼
凡是用鎖的地方,必定要在 finally 中使用 release,不然就會有鎖死的可能性。
而對於讀來講,若是不加鎖,就會存在髒讀的可能性,就看能不能忍受了。經過加鎖以後,Counter 類就變成線程安全了,咱們能夠放心的使用。
鎖是併發的難點,它會將併發變爲串行,掌握了鎖,併發就沒有絲毫問題了。那麼什麼時候須要加鎖?凡有共享資源的地方都要加鎖。
lock 對象能夠接收兩個參數:
blocking
:當再次加鎖時,若是它爲 False,那麼不會阻塞,而是返回 False;timeout
:若是 blocking 爲 True,timeout 大於等於 0 會阻塞到超時,並返回 False。預先啓動 10 個線程處理一些任務,當其中一個線程在處理其中一個任務時,其餘線程能夠處理其餘任務,這時候就能夠用到非阻塞鎖。第一個線程對該任務加非阻塞鎖,因爲以前沒有加過鎖,所以能夠加上。第二個線程再加的時候就加不上了,而且返回 False,這時就可讓它跳過這個任務去執行下一個任務了。
import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') def worker(tasks): for task in tasks: # 第一個執行加鎖的線程能夠鎖,它的值爲 True。因爲鎖住了,剩下的九個線程執行的時候它的值都爲 False # 所以 loggi.info 語句只會執行 10 次 if task.lock.acquire(False): logging.info(task.name) class Task: def __init__(self, name): self.name = name self.lock = threading.Lock() tasks = [Task(x) for x in range(10)] for i in range(5): threading.Thread(target=worker, args=(tasks,), name='work-{}'.format(i)).start() 複製代碼
若是任務有前後順序的話,就只能串行了。
可重入鎖在同一個線程內可屢次加鎖,可是隻能有一個線程成功,而且 acquire 幾回,就須要 release 幾回。
>>> rlock = threading.RLock() >>> rlock.acquire() Out[13]: True >>> rlock.acquire() Out[14]: True >>> rlock.release() >>> rlock.release() 複製代碼
第三種線程同步的方式。一般用於生產者消費者模式,生產者生產消息以後,使用 notify 和 notify_all 通知消費者進行消費。而消費者使用 wait 方法阻塞等待生產者的通知。
import random import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') class Dispatcher: def __init__(self): self.data = None self.event = threading.Event() self.cond = threading.Condition() def consumer(self): while not self.event.wait(1): with self.cond: self.cond.wait() # 會阻塞,直到 notifyAll 被執行 logging.info(self.data) def producer(self): for _ in range(10): data = random.randint(0, 100) logging.info(data) self.data = data with self.cond: self.cond.notify_all() self.event.wait(1) self.event.set() d = Dispatcher() p = threading.Thread(target=d.producer, name='producer') c = threading.Thread(target=d.consumer, name='consumer') p.start() c.start() 複製代碼
有生產者修改共享資源,而後通知消費者進行消費。
wait
:會阻塞,直到被 notify 喚醒;notifyAll
:老版的駝峯寫法,現已改成下面的,但爲了兼容仍然存在;notify_all
:用於通知全部 wait 的線程,能夠理解爲廣播;notify
:接收一個數字,表示喚醒多少個 wait 線程,默認爲 1。能夠理解爲單播。好比下面的示例中,雖然啓動了四個消費者進程,可是隻容許兩個同時消費,至因而哪兩個就不得而知了。
import random import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') class Dispatcher: def __init__(self): self.data = None self.event = threading.Event() self.cond = threading.Condition() def consumer(self): while not self.event.is_set(): with self.cond: self.cond.wait() logging.info(self.data) def producer(self): for _ in range(10): data = random.randint(0, 100) logging.info(data) self.data = data with self.cond: self.cond.notify(2) self.event.wait(1) self.event.set() d = Dispatcher() p = threading.Thread(target=d.producer, name='producer') for i in range(4): threading.Thread(target=d.consumer, name='consumer-{}'.format(i)).start() p.start() 複製代碼
按理來講,由於有鎖的存在,因此只有在消費者的 with 代碼塊執行完畢,鎖釋放以後,生產者才能進入本身的 with 代碼塊。這樣就可以保證,消費者只有在消費完畢以後生產者才能繼續生產。可是我在運行過程當中生產者根本不會等待消費者消費,它本身一個勁的跑。
不管 notify、notify_all 仍是 wait,都必須先 acquire,完成以後必須確保 release,所以一般使用 with 語法。
第四種線程同步的方式,柵欄的意思,只有湊齊一撥人以後才往下走。從下面這段代碼中就能理解它的做用:
import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') def worker(barrier: threading.Barrier): logging.info('waiting for {} threads'.format(barrier.n_waiting)) try: # 上面的代碼各個線程何時執行,怎麼執行都無所謂 # 可是全部線程都會在這裏同時等待,只有全部線程都執行到這了,才同時執行下面的代碼 worker_id = barrier.wait() except threading.BrokenBarrierError: logging.warning('aborting') else: logging.info('after barrier {}'.format(worker_id)) # 實例化的時候指定攔多少個線程,若是啓動了四個線程,只要三個到齊了就能夠同時往下走了 barrier = threading.Barrier(3) for i in range(3): threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start() logging.info('start') 複製代碼
barrier 對象的一些屬性和方法:
wait
:阻塞線程,它能夠指定超時時間,超時時間一到拋出 BrokenBarrierError 異常。若是執行過 abort 方法,那麼再執行 wait 也會拋出 BrokenBarrierError 異常;reset
:清除對象執行 abort 的痕跡。執行 abort 後執行 rest,接着執行 wait 就不會拋異常了;n_waiting
:當前有多少個線程在等待;abort
:通知已經在等待的線程沒必要再等了,不能由於它一個而讓其餘線程在那傻等。而一旦執行了這個方法, wait 就會拋出 BrokenBarrierError 異常,所以不處於 wait 狀態的線程是不會拋出這個異常的。適用場景:好比有十種工做,每一個線程負責一種,只有這十個線程都初始化完成後才能工做。
最後五種線程同步的方式。信號量和鎖很像,鎖是爲 1 的信號量。
# 建立一個爲 3 的信號量 >>> s = threading.Semaphore(3) >>> s.acquire() Out[84]: True >>> s.acquire(False) Out[85]: True >>> s.acquire(False) Out[86]: True >>> s.acquire(False) Out[87]: False 複製代碼
它能夠鎖屢次,上面鎖了三次都沒有問題,等到第四次的時候就不行了。因爲鎖只能鎖一次,因此它是爲 1 的信號量。RLock 也能鎖屢次,它是它只能用在同一個線程上,信號量卻能夠在多個線程中使用。
建立一個鏈接池的時候能夠用到它:
import time import random import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') class Pool: def __init__(self, num): self.num = num # 指定池子的鏈接數 self.conns = [self._make_connect(x) for x in range(num)] self.s = threading.Semaphore(num) # 這個函數是拿到鏈接以後作的操做 def _make_connect(self, name): return name # 從池子中取出一個鏈接 def get(self): self.s.acquire() return self.conns.pop() def return_resource(self, conn): # 執行完畢後,將鏈接放回池子中 self.conns.insert(0, conn) self.s.release() def worker(pool): logging.info('started') name = pool.get() logging.info('get connect {}'.format(name)) time.sleep(random.randint(1, 3)) pool.return_resource(name) logging.info('return resource {}'.format(name)) pool = Pool(3) for i in range(5): threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start() 複製代碼
若是不使用信號量的話,咱們還須要對池子是否爲空進行判斷。爲何將鏈接放回鏈接池中的 insert 操做不須要加鎖呢?這是由於 GIL 的影響。
信號量也是對資源的保護,可是和鎖不同的地方在於,鎖限制只有一個線程能夠訪問共享資源,而信號量限制指定個線程能夠訪問共享資源。事實上咱們只須要使用信號量就能夠了,由於鎖自己就是信號量的一種。
隊列,它是進程間通訊的一種方式,隊列有三種:
FIFO
:Queue.Queue(maxsize=0),先進先出,線程安全;LIFO
:Queue.LifoQueue(maxsize=0),後進先出;Priority
:Queue.PriorityQueue(maxsize=0),優先隊列。建立一個先進先出隊列:
>>> import queue >>> q = queue.Queue() # 隊列長度無限 複製代碼
對象的屬性和方法:
empty()
:判斷隊列是否爲空(不可靠)。由於等你獲取隊列的長度時,可能已經有人往裏面放入了數據;full()
:隊列是否滿了(不可靠);maxsize
:查看隊列的最大長度;qsize()
:看到隊列當前長度(不可靠);clear()
:清空隊列;join()
:等到隊列爲空的時候,才進行操做;put()
:往隊列裏面添加內容,能夠爲任意數據結構。put(self, item, block=True, timeout=None),block 表示是否爲隊列是否爲阻塞狀態。隊列滿了,再往裏面加內容,隊列會阻塞。若是不阻塞會返回一個異常,默認爲阻塞狀態;timeout 是阻塞的時間,若是隊列滿了,再往隊列裏面添加數據時,timeout 時間後會拋出異常。若是爲 None(默認),它會一直阻塞,直到有線程從隊列中取出數據;get()
:從隊列中取內容。若是是先進先出隊列,它會取出最早存進去的數據。get(self, block=True, timeout=None),若是隊列是空的,而且 timeout 爲 None,它會一直阻塞,直到有線程往隊列裏面存入數據;put_nowait(item)
:等效於 put(item, block=False);get_nowait()
:等效 get(item, block=False)。咱們能夠經過它來重寫生產者消費者模型:
#!/usr/local/python3/bin/python3 import queue import random import logging import threading logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s') def producer(queue: queue.Queue, event: threading.Event): while not event.wait(3): data = random.randint(0, 100) logging.info(data) queue.put(data) def consumer(queue: queue.Queue, event: threading.Event): while not event.is_set(): logging.info(queue.get()) q = queue.Queue() e = threading.Event() threading.Thread(target=consumer, args=(q, e), name='consumer').start() threading.Thread(target=producer, args=(q, e), name='producer').start() 複製代碼
經過 e.set() 就能中止它。相對 condition 實現的生產者消費者模型,它的優點在於能夠暫存數據,這在生產者和消費者速率不一致的時候很好用;而它的缺陷在於沒法廣播,沒法通知多個線程同時消費一條消息。由於咱們一般能夠將它們結合起來使用。
取出隊列中全部數據:
while not q.enpty(): q.get() 複製代碼
全局解釋器鎖,這是 Python 爭議很大的一個點。正是因爲它的存在,在操做內置容器時,解釋器會在解釋器級別增長一個鎖,所以 Python 全部內置容器(字典、列表等)都是線程安全的,多線程環境下使用沒有絲毫問題。而致使的後果就是 Python 的併發性能不好。
Python 中 collection, logging 等標準庫都是線程安全的。
官網地址,Python3.2 引入的異步模塊。
建立一個線程池:
from concurrent import futures pool = futures.ThreadPoolExecutor(max_workers=5) 複製代碼
pool 對象有三個方法。submit 用於執行一個函數:
>>> fut = pool.submit(lambda: 1+1) # 執行一段邏輯,也就是一個函數 >>> fut.result() # 獲取執行結果 Out[116]: 2 >>> fut.done() # 查看函數是否執行完成 Out[117]: True >>> fut.running() # 是否處於運行狀態 Out[118]: False >>> fut.cancel() # 一個已經開始運行的線程是沒法結束的,沒開始的(好比 pool 滿了在阻塞)能夠 Out[119]: False >>> fut.exception() # 若是函數中產生了異常,能夠經過它來獲取異常的實例 複製代碼
傳遞參數:
pool.submit(self.create_vm, vm_attributes, extra_attributes, conns)
複製代碼
經過這種方式使用線程,不須要將數據發送到隊列中。
進程池由 ProcessPoolExecutor 實現,它們簡化了進程和線程的操做,而且對返回值和異常進行了處理。
建議使用 futures,雖然它沒法設置線程名(3.6 以後能夠)、daemon 等屬性,可是問題不大。