Using threads lets a program run multiple operations concurrently within the same process space.
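Contents
- Thread objects: threading.Thread
- Determining the current thread: getName() / name
- Daemon vs. non-daemon threads: daemon
- Enumerating all threads: enumerate()
- Subclassing Thread
- Timer threads: Timer()
- Signaling between threads: Event
- Controlling access to resources: Lock
- Re-entrant locks: RLock
- Locks as context managers: with lock
- Synchronizing threads: Condition, Barrier
- Limiting concurrent access to resources: Semaphore
- Thread-specific data: local()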
Using Thread
The simplest way to use a Thread is to instantiate it with a target function and call start() to let it begin working.
```python
import threading


def worker():
    """thread worker function"""
    print('Worker')


threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
```
Result:
```
Worker
Worker
Worker
Worker
Worker
```
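Spawn a thread and pass it arguments to tell it what work to do. Any type of object can be passed as an argument to the thread. This example passes a number, which the thread then prints.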
```python
import threading


def worker(num):
    """thread worker function"""
    print(f'Worker: {num}')


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
```
Result:
```
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
```
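A minimal sketch going slightly beyond the example above: besides args, Thread also accepts a kwargs dict, and calling join() on every thread guarantees that all work is finished before the results are read. The results dictionary and the scale parameter are illustrative assumptions, not part of the original example.

```python
import threading

results = {}                      # shared dict filled in by the workers
results_lock = threading.Lock()   # guards the dict update


def worker(num, scale=1):
    """Store num * scale under the key num."""
    with results_lock:
        results[num] = num * scale


threads = [
    threading.Thread(target=worker, args=(i,), kwargs={'scale': 10})
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()   # wait until every worker has finished

print(sorted(results.items()))   # [(0, 0), (1, 10), (2, 20), (3, 30), (4, 40)]
```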
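Using arguments to identify or name threads is cumbersome and unnecessary. Every Thread instance has a name with a default value, which can be changed when the thread is created.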
```python
import threading
import time


def worker():
    # the name attribute replaces getName(), deprecated since Python 3.10
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().name, 'Exiting')


def my_service():
    print(threading.current_thread().name, 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().name, 'Exiting')


t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()
```
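Result: "Thread-1" is the unnamed thread w2. (On Python 3.10 and later the default name also includes the target function's name, e.g. "Thread-1 (worker)".)
```
worker Starting
Thread-1 Starting
my_service Starting
Thread-1 Exiting
worker Exiting
my_service Exiting
```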
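A short sketch of reading and changing names through the name attribute, which replaces the getName()/setName() methods deprecated in Python 3.10. The seen list is only there to collect what each thread reports.

```python
import threading

seen = []  # names reported from inside each thread


def worker():
    seen.append(threading.current_thread().name)


t = threading.Thread(target=worker, name='worker')
t.start()
t.join()

t2 = threading.Thread(target=worker)
t2.name = 'renamed'   # the name can also be assigned after construction
t2.start()
t2.join()

main_name = threading.main_thread().name  # the initial thread is 'MainThread'
print(seen, main_name)   # ['worker', 'renamed'] MainThread
```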
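Most programs do not use print() for debugging. The logging module can embed the thread name in every log message with the format code %(threadName)s, and logging itself is thread-safe.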
```python
import logging
import threading
import time


def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()
```
Result:
```
[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting
```
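- Daemon thread: does not block the main program from exiting
- Non-daemon thread: blocks the main program from exiting

So far, the example programs have waited until all threads finished their work before exiting, because threads are non-daemon by default.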
```python
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)  # daemon: does not block the main program
t = threading.Thread(name='non-daemon', target=non_daemon)       # non-daemon by default: blocks the main program

d.start()
t.start()
```
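Result: because d is a daemon thread, the main program can exit without waiting for d to finish, so d's "Exiting" message never appears.
```
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
```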
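The daemon flag can also be read and set through the daemon attribute. A sketch of the rules, assuming the code runs in the main thread: when not given explicitly, the flag is inherited from the thread that creates the new one, and it must be set before start(); assigning it afterwards raises RuntimeError.

```python
import threading

t = threading.Thread(target=lambda: None)
default_daemon = t.daemon   # False: inherited from the non-daemon main thread
t.daemon = True             # allowed, because t has not started yet
t.start()
t.join()

try:
    t.daemon = False        # not allowed once the thread has been started
    late_change_rejected = False
except RuntimeError:
    late_change_rejected = True

inherited = []


def spawner():
    # A thread created inside a daemon thread is a daemon by default.
    inherited.append(threading.Thread(target=lambda: None).daemon)


parent = threading.Thread(target=spawner, daemon=True)
parent.start()
parent.join()
print(default_daemon, late_change_rejected, inherited)   # False True [True]
```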
To make the main program wait for a daemon thread to finish its work before exiting, use join():
```python
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()
```
Result:
```
(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting
```
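Also, d.join(0.1) waits at most 0.1 seconds for d to finish. join() returns None whether or not the thread finished, so call d.is_alive() afterwards to find out.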
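A minimal sketch of a timed join; the 0.5 s sleep is an arbitrary stand-in for real work that outlasts the 0.1 s timeout.

```python
import threading
import time


def slow():
    time.sleep(0.5)   # simulated work that outlasts the join timeout


d = threading.Thread(target=slow, daemon=True)
d.start()

ret = d.join(0.1)                 # gives up after 0.1 s and returns None
still_running = d.is_alive()      # True: the timeout expired first
d.join()                          # no timeout: wait until it really ends
finished = not d.is_alive()

print(ret, still_running, finished)
```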
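enumerate() returns a list of the active Thread instances. The list includes the main thread, which has to be skipped because a thread cannot join itself (join() would raise RuntimeError).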
```python
import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue  # a thread cannot join itself
    logging.debug('joining %s', t.name)  # name replaces the deprecated getName()
    t.join()
```
Result (the sleep times are random, so the output varies from run to run):
```
(Thread-1  ) sleeping 0.40
(Thread-2  ) sleeping 0.50
(Thread-3  ) sleeping 0.10
(MainThread) joining Thread-1
(Thread-3  ) ending
(Thread-1  ) ending
(MainThread) joining Thread-2
(Thread-2  ) ending
(MainThread) joining Thread-3
```
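threading.active_count() returns the length of the list that enumerate() would return. A sketch that parks three illustrative worker threads on an Event so they are guaranteed to still be alive while the list is inspected:

```python
import threading

stop = threading.Event()


def worker():
    stop.wait()   # stay alive until the main thread sets the event


threads = [threading.Thread(target=worker, name=f'w{i}') for i in range(3)]
for t in threads:
    t.start()

main_thread = threading.main_thread()
names = sorted(t.name for t in threading.enumerate() if t is not main_thread)
count = threading.active_count()   # includes the main thread

stop.set()
for t in threads:
    t.join()
print(names, count)
```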
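When started, a Thread does some basic initialization and then calls its run() method, which calls the target function passed to the constructor. To create a subclass of Thread, override run() to do whatever is necessary.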
```python
import threading
import logging


class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThread()
    t.start()
```
Result:
```
(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running
```
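To pass arguments when instantiating the subclass, redefine __init__() to save them as instance attributes that run() can use: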
```python
import threading
import logging


class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()
```
Result:
```
(Thread-1  ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2  ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3  ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4  ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5  ) running with (4,) and {'a': 'A', 'b': 'B'}
```
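Overriding run() is also a convenient way to keep the target's return value, which a plain Thread discards. ResultThread below is a hypothetical helper written for this sketch, not part of the threading module.

```python
import threading


class ResultThread(threading.Thread):
    """Hypothetical Thread subclass that remembers the target's return value."""

    def __init__(self, target, args=(), kwargs=None, name=None):
        super().__init__(name=name)
        self.func = target
        self.func_args = args
        self.func_kwargs = kwargs or {}
        self.result = None

    def run(self):
        # run() executes in the new thread; store whatever the target returns
        self.result = self.func(*self.func_args, **self.func_kwargs)


t = ResultThread(target=pow, args=(2, 10))
t.start()
t.join()            # result is only meaningful after the thread has finished
print(t.result)     # 1024
```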
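threading.Timer() creates a thread that runs a function after a delay. A timer can be cancelled with cancel() while it is still waiting; once cancelled, its function is never run.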
```python
import threading
import time
import logging


def delayed():
    logging.debug('worker running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

t1 = threading.Timer(0.3, delayed)  # t1 runs delayed() after 0.3 s
t1.name = 't1'                      # name replaces setName(), deprecated since 3.10
t2 = threading.Timer(0.3, delayed)  # t2 would also run delayed() after 0.3 s
t2.name = 't2'

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.name)
time.sleep(0.2)
logging.debug('canceling %s', t2.name)
t2.cancel()  # t2 is cancelled, so delayed() never runs in t2
logging.debug('done')
```
Result: t2 never runs.
```
(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running
```
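Timer also accepts args and kwargs for the delayed function. A sketch that records which timers actually fired; the fired list is illustrative.

```python
import threading

fired = []   # names of the timers whose function actually ran

t1 = threading.Timer(0.1, fired.append, args=('t1',))
t2 = threading.Timer(0.1, fired.append, args=('t2',))
t1.start()
t2.start()
t2.cancel()   # still inside the 0.1 s delay, so t2's function is skipped

t1.join()
t2.join()     # a cancelled timer thread ends promptly
print(fired)  # ['t1']
```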
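Although the point of using multiple threads is to run separate operations concurrently, there are times when it is important to be able to synchronize the operations in two or more threads. Event objects are a simple way to communicate safely between threads.

An Event lets one thread (often, but not necessarily, the main thread) control when other threads proceed. Its main methods are wait(), clear() and set(), plus is_set() to query the flag.

An Event manages an internal flag that callers control with set() and clear(). Other threads can use wait() to pause until the flag is set, effectively blocking their progress until they are allowed to continue. wait() returns the state of the flag, so when called with a timeout it returns False if the timeout expired before the flag was set.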
```python
import logging
import threading
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')
```
Result:
```
(block     ) wait_for_event starting
(nonblock  ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock  ) event set: True
(block     ) event set: True
(nonblock  ) processing event
```
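A minimal sketch of wait()'s return value, a flag set from another thread, and clear(); the 0.05 s and 1 s timeouts are arbitrary.

```python
import threading

e = threading.Event()

timed_out = e.wait(0.05)                 # False: nobody sets the flag within 0.05 s
setter = threading.Thread(target=e.set)  # another thread sets the flag
setter.start()
woke = e.wait(1)                         # True as soon as the flag is set
setter.join()

e.clear()                                # reset, so later wait() calls block again
after_clear = e.is_set()

print(timed_out, woke, after_clear)      # False True False
```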
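Besides synchronizing the operations of threads, it is also important to control access to shared resources to prevent corrupted or lost data. In CPython, individual operations on the built-in data structures (lists, dictionaries, etc.), such as list.append(), are atomic as a side effect of the global interpreter lock, but a sequence of operations is not; other data structures implemented in Python, and read-modify-write updates of simple values such as integers and floats, get no such protection. Use a Lock to prevent simultaneous access to an object.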
```python
import logging
import random
import threading
import time


class Counter:

    def __init__(self, start=0):
        self.lock = threading.Lock()  # create the lock
        self.value = start

    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()  # acquire the lock
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()  # release the lock


def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()  # one instance shared by both workers
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)
```
Result: in this example, the worker() function increments a Counter instance, which manages a Lock to prevent two threads from changing its internal state at the same time. Without the lock, an update to the value attribute could be lost.
```
(Thread-1  ) Sleeping 0.80
(Thread-2  ) Sleeping 0.70
(MainThread) Waiting for worker threads
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.01
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.63
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(MainThread) Counter: 4
```
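One way to check that the lock does its job is to hammer the counter from several threads and compare the final value with the expected total. This sketch uses a stripped-down Counter (no logging) with a with block instead of acquire()/release(); the thread and iteration counts are arbitrary.

```python
import threading


class Counter:
    def __init__(self):
        self.lock = threading.Lock()
        self.value = 0

    def increment(self):
        with self.lock:              # read-modify-write done under the lock
            self.value = self.value + 1


def worker(counter, n):
    for _ in range(n):
        counter.increment()


counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 10_000)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)   # 80000 on every run
```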
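Because lock.acquire() blocks the calling thread until the lock becomes available, use have_it = lock.acquire(0) (equivalent to lock.acquire(blocking=False)) to try to acquire the lock without blocking; it returns True if the lock was acquired and False otherwise. In the next example, lock_holder() cycles between holding and releasing the lock, while worker() tries to acquire it without blocking.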
```python
import logging
import threading
import time


def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it = lock.acquire(0)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired',
                              num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired',
                              num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()
```
Result:
```
(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations
```
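Besides the positional 0, acquire() accepts the keyword forms blocking=False and timeout=seconds, and locked() reports whether the lock is currently held. A single-threaded sketch:

```python
import threading

lock = threading.Lock()
lock.acquire()

nonblocking = lock.acquire(blocking=False)    # False: the lock is already held
with_timeout = lock.acquire(timeout=0.1)      # False after waiting about 0.1 s
lock.release()

after_release = lock.acquire(blocking=False)  # True: the lock was free
lock.release()

print(nonblocking, with_timeout, after_release, lock.locked())
# False False True False
```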
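A normal Lock cannot be acquired more than once, not even by the thread that already holds it. This causes unwanted side effects, such as deadlock, if the lock is accessed by more than one function in the same call chain.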
```python
import threading

lock = threading.Lock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
```
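Result: passing 0 to acquire() means blocking=False (it is not a timeout), so the second call returns False immediately instead of blocking forever on a lock this thread already holds.
```
First try : True
Second try: False
```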
In a situation where separate code from the same thread needs to "re-acquire" the lock, use an RLock instead.
```python
import threading

lock = threading.RLock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
```
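Result: the only change from the previous example is using RLock instead of Lock, and now the second acquire() succeeds.
```
First try : True
Second try: True
```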
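An RLock counts how many times its owning thread has acquired it, and other threads can take it only after it has been released the same number of times. In this sketch outer() and inner() are hypothetical functions in the same call chain, and probe() checks from a second thread whether the lock is free.

```python
import threading

rlock = threading.RLock()


def inner():
    with rlock:          # re-acquired by the same thread: fine with RLock
        return 'done'


def outer():
    with rlock:
        return inner()   # with a plain Lock this call would deadlock


def probe(results):
    """Try the lock from another thread without blocking."""
    ok = rlock.acquire(blocking=False)
    results.append(ok)
    if ok:
        rlock.release()


def run_probe(results):
    t = threading.Thread(target=probe, args=(results,))
    t.start()
    t.join()


result = outer()

results = []
rlock.acquire()
rlock.acquire()          # the owner now holds it twice
run_probe(results)       # False: still held
rlock.release()
run_probe(results)       # False: still held once
rlock.release()
run_probe(results)       # True: fully released
print(result, results)   # done [False, False, True]
```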
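Locks implement the context manager API and are compatible with the with statement. Using with removes the need to acquire and release the lock explicitly.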
```python
import threading
import logging


def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')


def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()
```
Result: the two functions worker_with() and worker_no_with() manage the lock in equivalent ways.
```
(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly
```
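Besides Event, another way to synchronize threads is a Condition object. Because a Condition uses a Lock, it can be tied to a shared resource, allowing multiple threads to wait for the resource to be updated. In this example, the consumer() threads wait until the Condition is notified before continuing. The producer() thread is responsible for making the resource available and notifying the other threads that they can proceed. Note that wait() only wakes up for a notification that arrives after it started waiting; the example relies on sleep() so that both consumers are already waiting when the producer calls notify_all().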
```python
import logging
import threading
import time


def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notify_all()  # notifyAll() is a deprecated alias


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
```
Result: this example uses with; calling acquire() and release() on the condition explicitly also works.
```
2018-05-01 15:39:49,561 (c1) Starting consumer thread
2018-05-01 15:39:49,761 (c2) Starting consumer thread
2018-05-01 15:39:49,962 (p ) Starting producer thread
2018-05-01 15:39:49,962 (p ) Making resource available
2018-05-01 15:39:49,963 (c1) Resource is available to consumer
2018-05-01 15:39:49,964 (c2) Resource is available to consumer
```
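Because a plain wait() misses a notify_all() that happens before the consumer starts waiting, the example above depends on sleep() to order the threads. A sketch of a more robust variant, assuming a shared list as the "resource": wait_for() checks a predicate first and only waits while it is false, so the start order of the threads no longer matters.

```python
import threading

cond = threading.Condition()
resource = []   # shared state guarded by cond
received = []   # what each consumer saw


def consumer():
    with cond:
        cond.wait_for(lambda: resource)   # returns at once if already available
        received.append(resource[0])


def producer():
    with cond:
        resource.append('data')
        cond.notify_all()                 # wake every waiting consumer


threads = [threading.Thread(target=consumer) for _ in range(2)]
threads.append(threading.Thread(target=producer))
for t in threads:
    t.start()
for t in threads:
    t.join()
print(received)   # ['data', 'data']
```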
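Barrier is another thread synchronization mechanism.

threading.Barrier(parties, action=None, timeout=None)

This creates a Barrier object. parties is the number of threads that must call wait() before any of them can pass; action, if given, is a callable that one of the threads calls when they are released; timeout is the default used when wait() is called without a timeout.
- n_waiting: the number of threads currently waiting at the barrier
- parties: the number of threads required to pass the barrier
- wait(timeout=None): waits to pass the barrier and returns an integer from 0 to parties - 1 (the barrier_id), different for each thread. If a timeout is set and expires, the barrier is put into the broken state and wait() raises threading.BrokenBarrierError.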
```python
import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    worker_id = barrier.wait()
    print(threading.current_thread().name, 'after barrier',
          worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()
```
Result: in this example, the Barrier is configured to block until three threads are waiting. When that condition is met, all of the threads are released past the control point at the same time.
```
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
```
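The action argument in the Barrier signature runs once per release, in one of the waiting threads, when all parties have arrived. A sketch that also collects the barrier_id each thread gets back from wait(); the actions and ids lists are illustrative.

```python
import threading

NUM_THREADS = 3
actions = []                 # filled by the barrier's action callable
ids = []                     # wait() return value from each thread
ids_lock = threading.Lock()

barrier = threading.Barrier(NUM_THREADS, action=lambda: actions.append('released'))


def worker():
    barrier_id = barrier.wait()   # unique integer in range(NUM_THREADS)
    with ids_lock:
        ids.append(barrier_id)


threads = [threading.Thread(target=worker) for _ in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(actions, sorted(ids))   # ['released'] [0, 1, 2]
```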
abort() puts the Barrier into the broken state: threads that are already waiting, and any thread that calls wait() afterwards, raise threading.BrokenBarrierError.
```python
import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        print(threading.current_thread().name, 'aborting')
    else:
        print(threading.current_thread().name, 'after barrier',
              worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS + 1)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.5)

barrier.abort()

for t in threads:
    t.join()
```
Result: the barrier expects one more party than there are threads, so all of the threads block; abort() then breaks the barrier and releases all of them with BrokenBarrierError.
```
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-1 aborting
worker-2 aborting
```
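A timeout has the same effect as abort(): if wait() times out, the barrier becomes broken and BrokenBarrierError is raised. reset() returns a barrier to its initial, unbroken state. A single-threaded sketch with a barrier for two parties, so the lone wait() can never succeed:

```python
import threading

barrier = threading.Barrier(2, timeout=0.1)   # default timeout for wait()

try:
    barrier.wait()           # only one party arrives, so this times out
    timed_out = False
except threading.BrokenBarrierError:
    timed_out = True

broken_before = barrier.broken   # True
barrier.reset()                  # back to the default, empty state
broken_after = barrier.broken    # False

print(timed_out, broken_before, broken_after)   # True True False
```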
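Sometimes it is useful to allow more than one worker to access a resource at a time while still limiting the total number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.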
```python
import logging
import threading
import time


class ActivePool:

    def __init__(self):
        super().__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:  # the semaphore is used as a context manager
        name = threading.current_thread().name
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()
```
Result: at most two threads run at the same time; a new thread can join the pool only after one of the running threads has finished, and until then it waits.
```
2018-05-01 18:18:46,359 (0 ) Waiting to join the pool
2018-05-01 18:18:46,360 (0 ) Running: ['0']
2018-05-01 18:18:46,360 (1 ) Waiting to join the pool
2018-05-01 18:18:46,361 (1 ) Running: ['0', '1']
2018-05-01 18:18:46,361 (2 ) Waiting to join the pool
2018-05-01 18:18:46,362 (3 ) Waiting to join the pool
2018-05-01 18:18:46,460 (0 ) Running: ['1']
2018-05-01 18:18:46,460 (2 ) Running: ['1', '2']
2018-05-01 18:18:46,461 (1 ) Running: ['2']
2018-05-01 18:18:46,461 (3 ) Running: ['2', '3']
2018-05-01 18:18:46,560 (2 ) Running: ['3']
2018-05-01 18:18:46,561 (3 ) Running: []
```
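A sketch that measures the peak number of threads inside the semaphore. It uses BoundedSemaphore, a variant that raises ValueError when released more times than it was acquired, a bug that a plain Semaphore silently allows. The active/peak counters and the 0.05 s sleep are illustrative.

```python
import threading
import time

sem = threading.BoundedSemaphore(2)   # at most two holders at once
state_lock = threading.Lock()
active = 0
peak = 0


def worker():
    global active, peak
    with sem:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)              # simulated use of the limited resource
        with state_lock:
            active -= 1


threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

try:
    sem.release()                     # one release too many
    over_release_detected = False
except ValueError:
    over_release_detected = True

print(peak, over_release_detected)
```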
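While some resources need to be locked so that multiple threads can use them, others need to be protected so that they are hidden from threads that do not own them. The local() class creates an object whose attribute values are visible only in the thread that set them; other threads cannot see them, even though they use the same attribute name.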
```python
import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()
```
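Result: the value 1000 set in the main thread is not visible in the worker threads, and each worker sees only the random value it set itself.
```
(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-1  ) value=64
(Thread-2  ) No value yet
(Thread-2  ) value=99
```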
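A common use of local() is giving each thread its own copy of an object that must not be shared, such as a database connection. In this sketch get_connection() is a hypothetical helper and object() stands in for a real connection.

```python
import threading

_local = threading.local()
created = []                    # every "connection" made, across all threads
created_lock = threading.Lock()


def get_connection():
    """Hypothetical helper: lazily create one connection per thread."""
    conn = getattr(_local, 'conn', None)
    if conn is None:
        conn = object()         # stand-in for a real, non-thread-safe connection
        _local.conn = conn
        with created_lock:
            created.append(conn)
    return conn


def worker(same, i):
    # Within one thread, repeated calls return the same object.
    same[i] = get_connection() is get_connection()


same = {}
threads = [threading.Thread(target=worker, args=(same, i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(all(same.values()), len(created))   # True 3
```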
To initialize the settings so that all threads start with the same value, use a subclass of local and set the attributes in __init__().
```python
import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


class MyLocal(threading.local):

    def __init__(self, value):
        super().__init__()
        logging.debug('Initializing %r', self)
        self.value = value


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()
```
Result: __init__() is called on the same object (note the identical address) once in each thread that uses local_data, so every thread starts with the initial value.
```
(MainThread) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(Thread-1  ) value=1000
(Thread-1  ) value=52
(Thread-2  ) Initializing <__main__.MyLocal object at 0x00000000026014C8>
(Thread-2  ) value=1000
(Thread-2  ) value=94
```