Python3 多線程

本文是 Comyn 老師 Python 多線程課程的筆記,感謝老師優質的課程!html

兩個概念:python

  • 併發:假同時,一段時間內同時處理多個任務,單核均可以;
  • 並行:真同時,同時處理多個任務,必須多核。

主流操做系統上完成併發的手段有進程和線程,主流的編程語言提供了用戶空間的調度:協程。Python 也不例外。linux

因爲如今的操做系統上的進程愈來愈輕量,致使進程和線程之間的區別愈來愈少。事實上,Linux 並無原生的線程,線程是經過進程實現的。編程

python 中每個進程會啓動一個解釋器,而線程會共享一個解釋器。bootstrap

Python 中的線程是經過標準庫 threading 實現的。而啓動一個線程就是讓這個線程執行一些邏輯,這些邏輯就對應一個函數。安全

>>> import threading
>>> def worker(): # 讓多個線程來執行它
...     print('work')
...
>>> thread = threading.Thread(target=worker) # 建立了一個線程對象,target 參數是一個函數,即線程要執行的邏輯
>>> thread.start() # start 啓動一個線程,執行完畢後,自動退出,Python 沒有提供主動退出線程的方法
work
複製代碼

因爲 python 沒有提供退出線程的方法,所以咱們必定不能在邏輯中定義死循環,否則線程沒法退出。固然直接 kill -9 和刻意爲之的另說。而像那種監聽某個端口提供服務的進程,爲了保證不退出,一般都會有一個 while True 的死循環。bash

上面只是啓動了一個線程,很顯然沒什麼屌用。啓動多個線程的方式很是簡單,就是在它的外面套一個 for 循環就能夠了:markdown

import time
import threading

def worker(num):
    time.sleep(1)
    print('work-{}'.format(num))

for i in range(5):
    t = threading.Thread(target=worker, args=(i, )) # 啓動了五個線程,要啓動幾個就循環幾回
    t.start()
複製代碼

經過 args 給函數傳遞參數,也可使用 kwargs 經過字典傳遞。結果是在等待一秒以後,全部線程同時輸出了,而且在一個線程的換行符尚未打印出來的時候,下一個線程就輸出了,這就涉及到線程安全的問題了。很顯然,print 並非線程安全的。數據結構

線程相比於進程更輕量,上下文切換的代價沒有進程那麼大,但即便如此,線程數量也不宜過多。多線程

標識一個線程

threading.current_thread() 能夠返回當前的線程對象。

>>> threading.Thread(target=lambda: print(threading.current_thread())).start()
<Thread(Thread-13, started 140007299499776)>
複製代碼

返回的線程對象咱們能夠經過一個變量進行接收:

thread = threading.current_thread()
複製代碼

它有不少屬性和方法:

  • name:返回線程的名字;
  • ident:返回該線程的惟一標識符;
  • is_alive:告知該線程是否存活;
  • enumerate:能夠經過循環它打印出全部的線程;

咱們建立線程對象的時候是能夠給它取名字的:

t = threading.Thread(target=worker, name='thread1')
複製代碼

這個 name 能夠經過 logging 的 threadName 得到。

logging

前面提到過,print 並非線程安全的,而 logging 模塊線程安全。

>>> import logging
>>> logging.warning('hehe')
WARNING:root:hehe
>>> logging.info('hehe') # 默認只輸出 warning 以上級別
複製代碼

咱們能夠對其進行一些基礎的配置,讓其記錄 DEBUG 以上的級別,以及記錄線程名:

>>> logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')
>>> logging.info('hehe')
2017-09-23 15:41:36,868 INFO MainThread hehe
複製代碼

知道了它的簡單用法以後,咱們就可使用多線程了:

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(lineno)d %(message)s')

def worker():
    logging.info(logging.info('work'))

for i in range(5):
    t = threading.Thread(target=worker)
    t.start()
複製代碼

使用 logging 就沒有問題了,所以咱們一般使用它來替代 print。

logging 還能夠將異常的棧追蹤信息記錄下來,這在排查錯誤的時候很是方便:

import logging

try:
    config['DE']['xxx']
except Exception as e:
    logging.exception(e)
print('xxx')
複製代碼

daemon 與 non-daemon

daemon 在 linux 上是守護進程的意思,它始終在後臺運行。而在 Python 中的 daemon 線程會在主線程退出以後退出。也就是說,若是不是 daemon 線程,主線程退出以後,非 daemon 線程還會繼續執行,直到結束退出。

線程默認不是 daemon,若是想要設置爲 daemon,那就在建立線程對象的時候,給它傳遞 daemon=True 便可。

>>> t = threading.Thread(target=worker, daemon=True)
>>> t.daemon
Out[20]: True
複製代碼

經過下面的例子證實以前的說法:

import time
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker():
    logging.info('start')
    time.sleep(2)
    logging.info('end')

if __name__ == '__main__':
    logging.info('start')
    t1 = threading.Thread(target=worker, name='non-daemon')
    t1.start()
    t2 = threading.Thread(target=worker, name='daemon', daemon=True)
    t2.start()
    logging.info('end')

# 執行結果
2017-09-24 04:08:49,027 INFO MainThread start
2017-09-24 04:08:49,028 INFO non-daemon start
2017-09-24 04:08:49,028 INFO daemon start
2017-09-24 04:08:49,028 INFO MainThread end
2017-09-24 04:08:51,031 INFO non-daemon end
複製代碼

執行上面的代碼你會發現有的時候主線程退出了,可是 daemon 線程還會執行完成。這是由於雖然從日誌中看到主線程退出,可是事實上主線程是沒有退出的,它會等待非 daemon 線程執行完畢後纔會退出,這樣就給了 daemon 線程的執行時間了。當咱們將 t1 給註釋掉以後,就不可能出現主線程退出後,daemon 線程仍然執行的狀況了。

若是咱們在 t2.start() 以後增長一行 t2.join(),那即便它是 daemon 線程,主線程依然會等待它執行完畢後再退出。由於 join 會阻塞直到線程執行完畢。join 支持一個參數,那就是阻塞的秒數。t2.join(1) 表示只阻塞一秒,這個時候即便 t2 沒有執行完成,主線程依然會退出。join 用的比較多,它並不佔用 CPU 時間。

建立線程的另外一種方法

上面建立線程的方法是經過實例化 Thread,咱們還能夠經過下面這種方式:

import logging
import threading

class Mythread(threading.Thread):
    def run(self):
        logging.warning('worker')

t = Mythread()
t.start()
複製代碼

經過繼承 + 重寫 run 方法來到達啓動多線程的效果,run 等同於以前 target 指定的函數。可是 Python 中這種方法使用的不多。

當咱們建立一個線程對象的時候,除了可使用 start 啓動它以外,還能夠經過 run 來啓動。若是不是以繼承的方式建立線程,一個線程對象的 run 和 start 只能執行其中一個。

thread local

定義一個 thread local 對象。

ctx = threading.local()
複製代碼

這時的 ctx 沒有任何屬性,咱們能夠給它增長屬性:

>>> ctx.data = 5
>>> ctx.data
Out[25]: 5
複製代碼

繼續:

>>> data = 'abc' # 定義一個變量
>>> def worker():
...     logging.warning(data)
...     logging.warning(ctx.data)
...
>>> worker() # 執行沒什麼問題
WARNING:root:abc
WARNING:root:5
>>> threading.Thread(target=worker).start() # 可是經過線程執行就不行了
WARNING:root:abc # data 能夠直接打出來
Exception in thread Thread-9:
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-32-2e99199c517b>", line 3, in worker
    logging.warning(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data' # 可是 ctx.data 提示沒有
複製代碼

這是由於 ctx.data 是一個 thread local 的變量,咱們能夠給它賦值任意屬性,可是隻對當前線程可見。線程獨享!

使用 run 方法,它會將 target 放在主線程中;start 則會將其放到子線程中,兩者只能執行一個。

定時器

也能夠稱爲延時執行。Python 中存在一種特殊的線程,可用於延遲執行。它繼承自 Thread 類,所以它也是 Thread 對象。

>>> def worker():
...     logging.warning('worker')
...
>>> t = threading.Timer(interval=5, function=worker)
>>> t.start()
複製代碼
  • interval:延時多少秒執行,默認爲 30;
  • function:等同於 target。

能夠看到執行 start 方法後,五秒後纔有輸出。在等待的過程當中,它能夠經過 cancel() 終止。

它也能夠設置線程名,只不過要這樣:

>>> t = threading.Timer(interval=5, function=worker)
>>> t.name = 'Timer'
>>> t.deamon = True # 設置是否爲 daemon
複製代碼

當 function 指定的函數開始執行的時候,沒法經過 cancel() 終止。

Timer 的定時執行功能很弱,若是真的有這方面的須要,可使用 APSchedule。

event

第一種線程同步的方式。同步意味着阻塞,若是線程之間沒有聯繫,徹底沒有必要使用同步。有這麼一種需求:worker 線程作一些事情,當它完成以後,通知 boss 線程,由 boss 完成處理後續工做。這可能並不難實現,可是 boss 線程要統計 worker 線程的執行時間呢?

這就要用到線程間通訊的機制了,最簡單的是 event:

>>> event = threading.Event()
>>> event.set()
>>> event.wait()
Out[8]: True
複製代碼

它是一個 threading.Event 的對象,有 set 和 wait 這兩個方法。wait 會阻塞線程直到 set 方法被調用。

有了這兩種方法以後,咱們就能夠完成上面的需求了:

import time
import random
import logging
import datetime
import threading

def worker(event: threading.Event):
    time.sleep(random.randint(1, 5))
    event.set()

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.warning('worker exit {}'.format(datetime.datetime.now() - start))

def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event,), name='boss')
    b.start()
    for x in range(5):
        threading.Thread(target=worker, args=(event,), name='worker').start()

start()
複製代碼

五個 worker 線程,誰先執行完成就誰執行 event.set(),一旦 event.set 被執行,boss 線程也就會繼續執行並輸出日誌了。可是會有一個問題,因爲是隨機 sleep 時間,也就是說最快 boss 線程能夠一秒就退出,可是還有四個 worker 線程還在執行,這四個線程拉長了整個腳本的執行時間。

再作修改:

import time
import random
import logging
import datetime
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(event: threading.Event):
    s = random.randint(1, 5)
    event.wait(s) # 先阻塞
    event.set() # 一下全放開了
    logging.info('sleep {}'.format(s))

def boss(event: threading.Event):
    start = datetime.datetime.now()
    event.wait()
    logging.info('worker exit {}'.format(datetime.datetime.now() - start))

def start():
    event = threading.Event()
    threading.Thread(target=boss, args=(event,), name='boss').start()
    for x in range(5):
        threading.Thread(target=worker, args=(event,), name='worker-{}'.format(x)).start()

start()

# 執行結果
2017-09-25 06:15:42,114 INFO worker-0 sleep 2
2017-09-25 06:15:42,115 INFO boss worker exit 0:00:02.004014
2017-09-25 06:15:42,116 INFO worker-1 sleep 5
2017-09-25 06:15:42,116 INFO worker-2 sleep 4
2017-09-25 06:15:42,116 INFO worker-3 sleep 3
2017-09-25 06:15:42,117 INFO worker-4 sleep 2
複製代碼

能夠看到都在同一秒退出了,這是由於 wait 能夠指定超時時間,時間一到它就再也不阻塞。這樣阻塞時間最短的那個線程就會執行 set,這樣一來全部阻塞的線程同時放開了,因而同一時間都執行完成了。所以,wait 會阻塞線程直到 set 方法被調用,或者超時時間到。

event 能夠被多個線程所持有,多個線程能夠同時被阻塞,一旦其中一個線程執行了 set,那麼全部的線程都再也不阻塞。event 能夠在線程之間發送信號,一般用於某個線程須要其餘線程處理某些動做以後才能啓動。

event 還有一個特性,若是先 set 而後 wait,無論有沒有指定超時,它都瞬間返回 True(由於阻塞被放開,因此沒法再阻塞);而若是直接 wait,且給它一個超時時間,那麼超時完成以後,它會返回 False。咱們能夠根據這個特色來完成定時的操做。

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(event: threading.Event):
    while not event.wait(3):
        logging.info('running')

event = threading.Event()
threading.Thread(target=worker, args=(event,)).start()
複製代碼

每三秒會輸出一第二天志,會無限輸出下去。可是若是執行 event.set() 就會終止死循環。

event 還有一些方法:

  • is_set:用來判斷有沒有 set 過;
  • clean:清除 set 標誌,一般用來作線程退出的條件。
def worker(event):
     while not event.is_set():
         pass
複製代碼

wait 會主動讓出 CPU 時間片,time.sleep 卻不會。假如它們分到了 10ms 的 CPU 時間,都使用了 5ms,那麼剩餘的 5ms wait 會讓給別人,而 sleep 會本身用完。所以咱們會使用 wait 而不是 sleep。

實現定時器

延時執行。

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__thread)

    def __thread(self):
        if not self.event.wait(self.interval):
            self.function(*self.args, **self.kwargs)

    def start(self):
        self.thread.start()

    def cancel(self):
        self.event.set()

def worker():
    logging.info('running')

t = Timer(interval=2, function=worker)
t.start()
複製代碼

Lock

第二種線程同步的方式。lock 用來保護共享資源,其他幾種線程同步的方式都是用了它。

import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Counter:
    def __init__(self):
        self.__val = 0

 @property
    def value(self):
        return self.__val

    def inc(self):
        self.__val += 1

    def dec(self):
        self.__val -= 1

counter = Counter()

def fn():
    if random.choice([-1, 1]) > 0:
        logging.info('inc')
        counter.inc()
    else:
        logging.info('dec')
        counter.dec()

for x in range(10):
    threading.Thread(target=fn).start()

print(counter.value)
複製代碼

上面的代碼即便你知道它加了多少次減了多少次,但你不能確定它的結果,這是由於資源的爭用。Lock 對象可用於解決這種問題:

>>> lock = threading.Lock()
>>> lock.acquire()
Out[4]: True
複製代碼

對於 lock 實例,只能調用一次 acquire 方法,再次調用會被阻塞,直到 release 方法被調用。根據它的這種特性,可用來改造以前的 Counter。

class Counter:
    def __init__(self):
        self.__val = 0
        self._lock = threading.Lock()

 @property
    def value(self):
        return self.__val

    def inc(self):
        self._lock.acquire()
        self.__val += 1
        self._lock.release()

    def dec(self):
        self._lock.acquire()
        self.__val -= 1
        self._lock.release()
複製代碼

這樣一來,無論有多少個線程,同一時間只會有一個線程可以修改 __val。可是這樣會有一個問題,若是執行加減的時候發生了異常(雖然這裏不會),那麼 release 永遠就不會執行,那麼就會造成死鎖,所以咱們要使用 try finally。

def inc(self):
        try:
            self._lock.acquire()
            self.__val += 1
        finally:
            self._lock.release()
複製代碼

從上面這種結構中咱們能夠聯想到 with,事實上它是支持 with 的,所以咱們能夠定義的更爲簡單:

def inc(self):
        with self._lock:
            self.__val += 1
複製代碼

凡是用鎖的地方,必定要在 finally 中使用 release,不然就會有鎖死的可能性。

而對於讀來講,若是不加鎖,就會存在髒讀的可能性,就看能不能忍受了。經過加鎖以後,Counter 類就變成線程安全了,咱們能夠放心的使用。

鎖是併發的難點,它會將併發變爲串行,掌握了鎖,併發就沒有絲毫問題了。那麼什麼時候須要加鎖?凡有共享資源的地方都要加鎖。

lock 對象能夠接收兩個參數:

  • blocking:當再次加鎖時,若是它爲 False,那麼不會阻塞,而是返回 False;
  • timeout:若是 blocking 爲 True,timeout 大於等於 0 會阻塞到超時,並返回 False。

預先啓動 10 個線程處理一些任務,當其中一個線程在處理其中一個任務時,其餘線程能夠處理其餘任務,這時候就能夠用到非阻塞鎖。第一個線程對該任務加非阻塞鎖,因爲以前沒有加過鎖,所以能夠加上。第二個線程再加的時候就加不上了,而且返回 False,這時就可讓它跳過這個任務去執行下一個任務了。

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(tasks):
    for task in tasks:
        # 第一個執行加鎖的線程能夠鎖,它的值爲 True。因爲鎖住了,剩下的九個線程執行的時候它的值都爲 False
        # 所以 loggi.info 語句只會執行 10 次
        if task.lock.acquire(False):
            logging.info(task.name)

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task(x) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, args=(tasks,), name='work-{}'.format(i)).start()
複製代碼

若是任務有前後順序的話,就只能串行了。

RLock

可重入鎖在同一個線程內可屢次加鎖,可是隻能有一個線程成功,而且 acquire 幾回,就須要 release 幾回。

>>> rlock = threading.RLock()
>>> rlock.acquire()
Out[13]: True
>>> rlock.acquire()
Out[14]: True
>>> rlock.release()
>>> rlock.release()
複製代碼

condition

第三種線程同步的方式。一般用於生產者消費者模式,生產者生產消息以後,使用 notify 和 notify_all 通知消費者進行消費。而消費者使用 wait 方法阻塞等待生產者的通知。

import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def consumer(self):
        while not self.event.wait(1):
            with self.cond:
                self.cond.wait() # 會阻塞,直到 notifyAll 被執行
                logging.info(self.data)

    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify_all()
            self.event.wait(1)
        self.event.set()

d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
c = threading.Thread(target=d.consumer, name='consumer')
p.start()
c.start()
複製代碼

有生產者修改共享資源,而後通知消費者進行消費。

  • wait:會阻塞,直到被 notify 喚醒;
  • notifyAll:老版的駝峯寫法,現已改成下面的,但爲了兼容仍然存在;
  • notify_all:用於通知全部 wait 的線程,能夠理解爲廣播;
  • notify:接收一個數字,表示喚醒多少個 wait 線程,默認爲 1。能夠理解爲單播。

好比下面的示例中,雖然啓動了四個消費者進程,可是隻容許兩個同時消費,至因而哪兩個就不得而知了。

import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Dispatcher:
    def __init__(self):
        self.data = None
        self.event = threading.Event()
        self.cond = threading.Condition()

    def consumer(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()
                logging.info(self.data)

    def producer(self):
        for _ in range(10):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            with self.cond:
                self.cond.notify(2)
            self.event.wait(1)
        self.event.set()

d = Dispatcher()
p = threading.Thread(target=d.producer, name='producer')
for i in range(4):
    threading.Thread(target=d.consumer, name='consumer-{}'.format(i)).start()
p.start()
複製代碼

按理來講,由於有鎖的存在,因此只有在消費者的 with 代碼塊執行完畢,鎖釋放以後,生產者才能進入本身的 with 代碼塊。這樣就可以保證,消費者只有在消費完畢以後生產者才能繼續生產。可是我在運行過程當中生產者根本不會等待消費者消費,它本身一個勁的跑。

不管 notify、notify_all 仍是 wait,都必須先 acquire,完成以後必須確保 release,所以一般使用 with 語法。

barrier

第四種線程同步的方式,柵欄的意思,只有湊齊一撥人以後才往下走。從下面這段代碼中就能理解它的做用:

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads'.format(barrier.n_waiting))
    try:
        # 上面的代碼各個線程何時執行,怎麼執行都無所謂
        # 可是全部線程都會在這裏同時等待,只有全部線程都執行到這了,才同時執行下面的代碼
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.warning('aborting')
    else:
        logging.info('after barrier {}'.format(worker_id))

# 實例化的時候指定攔多少個線程,若是啓動了四個線程,只要三個到齊了就能夠同時往下走了
barrier = threading.Barrier(3)
for i in range(3):
    threading.Thread(target=worker, args=(barrier,), name='worker-{}'.format(i)).start()
logging.info('start')
複製代碼

barrier 對象的一些屬性和方法:

  • wait:阻塞線程,它能夠指定超時時間,超時時間一到拋出 BrokenBarrierError 異常。若是執行過 abort 方法,那麼再執行 wait 也會拋出 BrokenBarrierError 異常;
  • reset:清除對象執行 abort 的痕跡。執行 abort 後執行 rest,接着執行 wait 就不會拋異常了;
  • n_waiting:當前有多少個線程在等待;
  • abort:通知已經在等待的線程沒必要再等了,不能由於它一個而讓其餘線程在那傻等。而一旦執行了這個方法, wait 就會拋出 BrokenBarrierError 異常,所以不處於 wait 狀態的線程是不會拋出這個異常的。

適用場景:好比有十種工做,每一個線程負責一種,只有這十個線程都初始化完成後才能工做。

semaphore

最後五種線程同步的方式。信號量和鎖很像,鎖是爲 1 的信號量。

# 建立一個爲 3 的信號量
>>> s = threading.Semaphore(3)
>>> s.acquire()
Out[84]: True
>>> s.acquire(False)
Out[85]: True
>>> s.acquire(False)
Out[86]: True
>>> s.acquire(False)
Out[87]: False
複製代碼

它能夠鎖屢次,上面鎖了三次都沒有問題,等到第四次的時候就不行了。因爲鎖只能鎖一次,因此它是爲 1 的信號量。RLock 也能鎖屢次,它是它只能用在同一個線程上,信號量卻能夠在多個線程中使用。

建立一個鏈接池的時候能夠用到它:

import time
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

class Pool:
    def __init__(self, num):
        self.num = num # 指定池子的鏈接數
        self.conns = [self._make_connect(x) for x in range(num)]
        self.s = threading.Semaphore(num)

    # 這個函數是拿到鏈接以後作的操做
    def _make_connect(self, name):
        return name

    # 從池子中取出一個鏈接
    def get(self):
        self.s.acquire()
        return self.conns.pop()

    def return_resource(self, conn):
        # 執行完畢後,將鏈接放回池子中
        self.conns.insert(0, conn)
        self.s.release()

def worker(pool):
    logging.info('started')
    name = pool.get()
    logging.info('get connect {}'.format(name))
    time.sleep(random.randint(1, 3))
    pool.return_resource(name)
    logging.info('return resource {}'.format(name))

pool = Pool(3)
for i in range(5):
    threading.Thread(target=worker, args=(pool,), name='worker-{}'.format(i)).start()
複製代碼

若是不使用信號量的話,咱們還須要對池子是否爲空進行判斷。爲何將鏈接放回鏈接池中的 insert 操做不須要加鎖呢?這是由於 GIL 的影響。

信號量也是對資源的保護,可是和鎖不同的地方在於,鎖限制只有一個線程能夠訪問共享資源,而信號量限制指定個線程能夠訪問共享資源。事實上咱們只須要使用信號量就能夠了,由於鎖自己就是信號量的一種。

queue

隊列,它是進程間通訊的一種方式,隊列有三種:

  • FIFO:Queue.Queue(maxsize=0),先進先出,線程安全;
  • LIFO:Queue.LifoQueue(maxsize=0),後進先出;
  • Priority:Queue.PriorityQueue(maxsize=0),優先隊列。

建立一個先進先出隊列:

>>> import queue
>>> q = queue.Queue() # 隊列長度無限
複製代碼

對象的屬性和方法:

  • empty():判斷隊列是否爲空(不可靠)。由於等你獲取隊列的長度時,可能已經有人往裏面放入了數據;
  • full():隊列是否滿了(不可靠);
  • maxsize:查看隊列的最大長度;
  • qsize():看到隊列當前長度(不可靠);
  • clear():清空隊列;
  • join():等到隊列爲空的時候,才進行操做;
  • put():往隊列裏面添加內容,能夠爲任意數據結構。put(self, item, block=True, timeout=None),block 表示是否爲隊列是否爲阻塞狀態。隊列滿了,再往裏面加內容,隊列會阻塞。若是不阻塞會返回一個異常,默認爲阻塞狀態;timeout 是阻塞的時間,若是隊列滿了,再往隊列裏面添加數據時,timeout 時間後會拋出異常。若是爲 None(默認),它會一直阻塞,直到有線程從隊列中取出數據;
  • get():從隊列中取內容。若是是先進先出隊列,它會取出最早存進去的數據。get(self, block=True, timeout=None),若是隊列是空的,而且 timeout 爲 None,它會一直阻塞,直到有線程往隊列裏面存入數據;
  • put_nowait(item):等效於 put(item, block=False);
  • get_nowait():等效 get(item, block=False)。

咱們能夠經過它來重寫生產者消費者模型:

#!/usr/local/python3/bin/python3

import queue
import random
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(threadName)s %(message)s')

def producer(queue: queue.Queue, event: threading.Event):
    while not event.wait(3):
        data = random.randint(0, 100)
        logging.info(data)
        queue.put(data)

def consumer(queue: queue.Queue, event: threading.Event):
    while not event.is_set():
        logging.info(queue.get())

q = queue.Queue()
e = threading.Event()
threading.Thread(target=consumer, args=(q, e), name='consumer').start()
threading.Thread(target=producer, args=(q, e), name='producer').start()
複製代碼

經過 e.set() 就能中止它。相對 condition 實現的生產者消費者模型,它的優點在於能夠暫存數據,這在生產者和消費者速率不一致的時候很好用;而它的缺陷在於沒法廣播,沒法通知多個線程同時消費一條消息。由於咱們一般能夠將它們結合起來使用。

取出隊列中全部數據:

while not q.enpty():
    q.get()
複製代碼

GIL

全局解釋器鎖,這是 Python 爭議很大的一個點。正是因爲它的存在,在操做內置容器時,解釋器會在解釋器級別增長一個鎖,所以 Python 全部內置容器(字典、列表等)都是線程安全的,多線程環境下使用沒有絲毫問題。而致使的後果就是 Python 的併發性能不好。

Python 中 collection, logging 等標準庫都是線程安全的。

concurrent.futures

官網地址,Python3.2 引入的異步模塊。

建立一個線程池:

from concurrent import futures
pool = futures.ThreadPoolExecutor(max_workers=5)
複製代碼

pool 對象有三個方法。submit 用於執行一個函數:

>>> fut = pool.submit(lambda: 1+1) # 執行一段邏輯,也就是一個函數
>>> fut.result() # 獲取執行結果
Out[116]: 2
>>> fut.done() # 查看函數是否執行完成
Out[117]: True
>>> fut.running() # 是否處於運行狀態
Out[118]: False
>>> fut.cancel() # 一個已經開始運行的線程是沒法結束的,沒開始的(好比 pool 滿了在阻塞)能夠
Out[119]: False
>>> fut.exception() # 若是函數中產生了異常,能夠經過它來獲取異常的實例
複製代碼

傳遞參數:

pool.submit(self.create_vm, vm_attributes, extra_attributes, conns)
複製代碼

經過這種方式使用線程,不須要將數據發送到隊列中。

進程池由 ProcessPoolExecutor 實現,它們簡化了進程和線程的操做,而且對返回值和異常進行了處理。

建議使用 futures,雖然它沒法設置線程名(3.6 以後能夠)、daemon 等屬性,可是問題不大。

相關文章
相關標籤/搜索