併發編程的方法:python
多線程、加載子進程、設計生成器函數的技巧。編程
1、啓動和中止線程安全
threading庫用來在單獨的線程中執行任意的Python可調用對象。服務器
from threading import Thread t = Thread(target=func, args=(10,)) t.start()
當建立一個線程實例時,在調用它的start()方法以前,線程並不會馬上開始執行。網絡
線程實例會在他們本身所屬的系統級線程中執行,這些線程徹底由操做系統來管理。一旦啓動後,線程就開始獨立運行、直到目標函數返回爲止。多線程
能夠查詢線程實例來判斷它是否還在運行。閉包
if t.is_alive(): print(' Still alive') else: print('Completed')
也能夠請求鏈接(join)到某個線程上,這麼作會等待該線程結束。架構
解釋器會一直保持運行,知道全部的線程都終結爲止。併發
對於須要長時間運行的線程或者一直不斷運行的後臺任務,應該考慮將這些線程設置爲daemon(守護線程)app
>>> t = Thread(target=func, args=(10,), daemon=True)
daemon線程是沒法被鏈接的。可是,當主線程結束後他們會自動銷燬掉。
若是須要終止線程,必需要可以在某個指定的點上輪詢退出狀態,這就須要編程實現。
import time from threading import Thread class CountdownTask: def __init__(self): self._running = True def terminate(self): self._running = False def run(self, n): while self._running and n > 0: print('T-minus', n) n -= 1 time.sleep(2) print('Threading Done') c = CountdownTask() t = Thread(target=c.run, args=(10,), ) t.start() time.sleep(5) c.terminate() t.join()
若是線程執行阻塞性的I/O操做,須要爲線程加上超時循環。
class IOTask: def __init__(self): self._running = True def run(self): sock.settimeout(5) while self._running: try: data = sock.recv(1024) break except socket.timeout: continue return
因爲全局解釋器鎖GIL的存在,Python線程的執行模型被限制爲在任意時刻只容許在解釋器中運行一個線程。
2、判斷線程是否已經啓動
線程的核心特徵:可以以非肯定性的方式獨立執行。(即什麼時候開始執行、什麼時候被打斷、什麼時候恢復執行徹底由操做系統來調度管理)
若是線程須要判斷某個線程是否已經到達執行過程當中的某個點。根據這個判斷來執行後續的操做,產生了棘手的線程同步問題。
threading庫中的Event對象。
若是事件沒有被設置而線程正在等待該事件event.wait(),那麼線程就會被阻塞(即,進入休眠狀態),直到事件被設置爲止event.set()。
當線程設置了這個事件時,這會喚醒全部正在等待該事件的線程。
若是線程等待的事件已經設置了,那麼線程會繼續執行。
from threading import Thread,Event import time def countdown(n,event): print(' countdown starting ') event.set() while n > 0: print('T-minus', n) n -= 1 time.sleep(2) if __name__ == '__main__': started_evet = Event() t = Thread(target=countdown, args=(10,started_evet)) t.start() started_evet.wait() print(' countdown is running ')
當運行這段代碼時,字符串「countdown is running」,老是會在「countdown starting」以後顯示。
使用了事件來同步線程,使得主線程等待,直到countdown()函數首先打印出啓動信息以後纔開始執行。
Event對象最好只用於一次性的事件。
若是線程打算一遍又一遍地重複通知某個事件,最好使用Condition對象來處理。
import threading import time class PeriodicTimer: def __init__(self, interval): self._interval = interval self._flag = 0 self._cv = threading.Condition() def start(self): t = threading.Thread(target=self.run) t.daemon = True t.start() def run(self): ''' Run the timer and notify waiting threads after each interval ''' while True: time.sleep(self._interval) with self._cv: self._cv.notify(1) print('time.sleep done 5') def wait_for_tick(self): ''' Wait for the next tick of the timer ''' with self._cv: self._cv.wait() # Example use of the timer ptimer = PeriodicTimer(3) ptimer.start() # Two threads that synchronize on the timer def countdown(nticks): while nticks > 0: ptimer.wait_for_tick() print('T-minus', nticks) nticks -= 1 def countup(last): n = 0 while n < last: ptimer.wait_for_tick() print('Counting', n) n += 1 threading.Thread(target=countup, args=(5,)).start() threading.Thread(target=countdown, args=(10,)).start()
Event對象的關鍵特性就是它會喚醒全部等待的線程。
若是隻但願喚醒一個單獨的等待線程,那麼最好使用Semaphore或者Condition對象。
# Worker thread def worker(n, sema): # Wait to be signaled sema.acquire() # Do some work print('Working', n) # Create some threads sema = threading.Semaphore(0) nworkers = 10 for n in range(nworkers): t = threading.Thread(target=worker, args=(n, sema,)) t.start()
運行上邊的代碼將會啓動一個線程池,可是並無什麼事情發生。
這是由於全部的線程都在等待獲取信號量。每次信號量被釋放,只有一個線程會被喚醒並執行,示例以下:
>>> sema.release() Working 0 >>> sema.release() Working 1
3、線程間通訊
程序中有多個線程,在線程之間實現安全的通訊或者交換數據。
將數據從一個線程發往另外一個線程最安全的作法就是使用queue模塊中的Queue隊列了。
建立一個Queue隊列,使用put()或者get()操做來給隊列添加或移除元素。
from threading import Thread from queue import Queue def producer(out_q): while True: out_q.put(data) def consumer(in_q): while True: data = in_q.get() q = Queue() t1 = Thread(target=producer, args=(q,)) t2 = Thread(target=consumer, args=(q,)) t1.start() t2.start()
Queue實例已經擁有了全部所需的鎖,所以它能夠安全地在任意多的線程之間共享。
生產者和消費者之間的協調同步,使用一個特殊的終止值。
_sentinel = object() def producer(out_q): while True: out_q.put(data) out_q.put(_sentinel) def consumer(in_q): while True: data = in_q.get() if data is _sentinel: in_q.put(_sentinel) break
建立一個線程安全的優先隊列
import heapq import threading class PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0: self._cv.wait() return heapq.heappop(self._queue)[-1]
經過隊列實現線程間通訊是一種單方向且不肯定的過程。
沒法得知線程什麼時候會實際接收到消息並開始工做。
Queue對象提供了事件完成功能。task_done()和join()方法。
from threading import Thread from queue import Queue def producer(out_q): while True: out_q.put(data) def consumer(in_q): while True: data = in_q.get() in_q.task_done() q = Queue() t1 = Thread(target=producer, args=(q,)) t2 = Thread(target=consumer, args=(q,)) t1.start() t2.start() q.join()
當消費者線程已經處理了某項特定的數據,而生產者須要對此馬上感知的話,那麼就應該將發送的數據和一個Event對象配對在一塊兒。
def producer(out_q): while True: evt = Event() out_q.put((data, evt)) evt.wait() def consumer(in_q): while True: data,evt = in_q.get() evt.set()
在線程中使用隊列時, 將某個數據放入隊列並不會產生該數據的拷貝。
所以,通訊過程當中實際上涉及在不一樣的線程間傳遞對象的引用。
Queue的get()和put()方法都支持非阻塞和超時機制
import queue q = queue.Queue() try: q.put(item, block=False) except queue.Full: pass try: q.get(block=False) except queue.Empty: pass try: q.put(item, timeout=5.0) except queue.Full: pass try: q.get(timeout = 5.0) except queue.Empty: pass
能夠避免在特定的隊列操做上無限期阻塞下去的問題。
還有q.qsize()、q.full()、q.empty(),查詢隊列的當前大小和狀態。可是,這些方法在多線程環境中都是不可靠的。
4、對臨界區加鎖
讓可變對象安全地用在多線程中,能夠利用threading庫中的Lock對象來解決
import threading class SharedCounter: def __init__(self, initial_value = 0): self._value = initial_value self._value_lock = threading.Lock() def incr(self, delta=1): with self._value_lock: self._value += delta def decr(self, delta=1): with self._value_lock: self._value -= delta
當使用with語句時,Lock對象可確保產生互斥的行爲。
同一時間只容許一個線程執行with語句塊中的代碼。
在較老的代碼中,咱們常會看到顯式地獲取和釋放鎖的動做。
def incr(self, delta=1): self._value_lock.acquire() self._valeu += delta self._value_lock.release() def decr(self, delta=1): self._value_lock.acquire() self._value -= delta self._value_lock.release()
採用with語句不容易出錯,若是程序恰好在持有鎖的時候拋出了異常,而with語句會確保老是釋放鎖。
RLock被稱爲可重入鎖,他能夠被同一個線程屢次獲取,用來編寫基於鎖的代碼或者基於監視器的同步處理。
當某個類持有這種類型的鎖時,只有一個線程可使用類中的所有函數或者方法。
class SharedCounter: _lock = threading.RLock() def __init__(self, initial_value = 0): self._value = initial_value def incr(self, delta=1): with SharedCounter._lock: self._value += delta def decr(self, delta=1): with SharedCounter._lock: self.incr(-delta)
只有一個做用於整個類的鎖,它被全部的類實例所共享。
這個鎖能夠確保每次只有一個線程可使用類中的方法。已經持有了鎖的方法能夠調用一樣使用了這個鎖的其餘方法。
不管建立多少個counter實例,都只會有一個鎖存在。
Semaphore對象是一種基於共享計數器的同步原語。
若是計數器非零,那麼with語句會遞減計數器而且容許線程繼續執行,當with語句塊結束時計數器會獲得遞增。
若是計數器爲零,那麼執行過程會被阻塞,直到由另外一個線程來遞增計數器爲止。
若是想在代碼中限制併發的數量,可使用Semaphore來處理。
5、避免死鎖
線程一次品牌須要獲取不止一把鎖,同時還要避免出現死鎖。
出現死鎖常見緣由是有一個線程獲取到第一個鎖,可是在嘗試獲取第二個鎖時阻塞了,那麼這個線程就有可能會阻塞住其餘線程的執行,進而使得整個程序僵死。
解決方案:爲每一個鎖分配一個惟一的數字編號,而且在獲取多個鎖時只按照編號的升序方式來獲取。
import threading from contextlib import contextmanager _local = threading.local() @contextmanager def acquire(*locks): locks = sorted(locks, key=lambda x:id(x)) acquired = getattr(_local, 'acquired', []) if acquired and max(id(lock) for lock in acquired) >= id(locks[0]): raise RuntimeError('Lock Order Violation') acquired.extend(locks) _local.acquired = acquired try: for lock in locks: lock.acquire() yield finally: for lock in reversed(locks): lock.release() del acquired[-len(locks):]
acquire()函數根據對象的數字編號對所鎖進行排序。
經過對鎖進行排序,不管用戶按照什麼順序將鎖提供給acquire()函數,他們老是會按照統一的順序來獲取。
運行代碼:
x_lock = threading.Lock() y_lock = threading.Lock() def thread_1(): while True: with acquire(x_lock, y_lock): print('Thread 1.....') def thread_2(): while True: with acquire(y_lock, x_lock): print('Thread 2.....') t1 = threading.Thread(target=thread_1) t1.daemon = True t1.start() t2 = threading.Thread(target=thread_2) t2.daemon = True t2.start()
示例使用到了線程本地存儲來解決一個小問題。即,若是有多個acquire()操做嵌套在一塊兒時,能夠檢測可能存在的死鎖狀況。
def thread_1(): while True: with acquire(x_lock): with acquire(y_lock): print('Thread 1.....') def thread_2(): while True: with acquire(y_lock): with acquire(x_lock): print('Thread 2.....')
每一個線程會記住他們已經獲取到的鎖的順序。
acquire()函數會檢測以前獲取到的鎖的列表,並對鎖的順序作強制性的約束。
先獲取到的鎖的對象ID必須比後獲取的鎖的ID要小。
解決哲學家就餐問題:(5個科學家圍在一塊兒,拿筷子吃飯,每支筷子表明一把鎖)
def philosopher(left, right): while True: with acquire(left, right): print(threading.current_thread(), 'eating...') NSTICKS = 5 chopsticks = [threading.Lock() for n in range(NSTICKS)] for n in range(NSTICKS): t = threading.Thread(target=philosopher, args=(chopsticks[n], chopsticks[(n+1) % NSTICKS])) t.start()
6、保存線程專有狀態
在多線程程序中,須要保存專屬於當前運行線程的狀態。經過threading.local()來建立一個線程本地存儲對象。
在這個對象上保存和讀取的屬性只對當前運行的線程可見,其餘線程沒法感知。
import threading from socket import socket,AF_INET,SOCK_STREAM class LazyConnection: def __init__(self, address, family=AF_INET, type=SOCK_STREAM): self.address = address self.family = family self.type = type self.local = threading.local() def __enter__(self): if hasattr(self.local, 'sock'): raise RuntimeError('Already Connected') self.local.sock = socket(self.family, self.type) self.local.sock.connect(self.address) return self.local.sock def __exit__(self, exc_type, exc_val, exc_tb): self.local.sock.close() del self.local.sock
每一個線程都建立了本身專屬的socket鏈接。
當不一樣線程在socket上執行操做時,它們並不會互相產生影響,由於它們都是在不一樣的socket上完成操做的。
threading.local()實例爲每一個線程維護者一個單獨的實例字典。全部對實例的常見操做好比獲取、設定以及刪除都只是做用於每一個線程專有的字典上。
7、建立線程池
建立一個工做者線程池用來處理客戶端鏈接。
concurrent.futures庫中包含有一個ThreadPoolExecutor類可用來實現這個目的。
from concurrent.futures import ThreadPoolExecutor from socket import socket, AF_INET, SOCK_STREAM def echo_client(sock, client_addr): print('Got COnnection from ', client_addr) while True: msg = sock.recv(1024) if not msg: break sock.sendall(msg) print('Client closed connection') sock.close() def echo_server(addr): pool = ThreadPoolExecutor(128) sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() echo_client(echo_client, client_sock, client_addr) echo_server(('',18000))
使用Queue來手動建立一個線程池。
from socket import socket, AF_INET, SOCK_STREAM from threading import Thread from queue import Queue def echo_client(q): sock, client_addr = q.get() print('Got COnnection from ', client_addr) while True: msg = sock.recv(1024) if not msg: break sock.sendall(msg) print('Client closed connection') sock.close() def echo_server(addr, nworkers): q = Queue() for n in range(nworkers): t = Thread(target=echo_client, args=(q,)) t.daemon = True t.start() sock = socket(AF_INET, SOCK_STREAM) sock.bind(addr) sock.listen(5) while True: client_sock, client_addr = sock.accept() q.put((client_sock, client_addr)) echo_server(('',18000), 128)
使用ThreadPoolExecutor類實現線程池,使得更容易從調用函數中取得結果。
from concurrent.futures import ThreadPoolExecutor import urllib.request def fetch_url(url): u = urllib.request.urlopen(url) data = u.read() return data pool = ThreadPoolExecutor(10) a = pool.submit(fetch_url, 'http://www.python.org') b = pool.submit(fetch_url, 'http://www.pypy.org') a.result() b.result()
a.result()操做會阻塞,直到對應的函數已經由線程池執行完畢並返回告終果爲止。
當建立一個線程時,操做系統會佔用一段虛擬內存來保存線程的執行棧(一般8M),這段內存只有一小部分會實際映射到物理內存上。
>>> threading.stack_size(65535) 調整線程棧的大小。
所以,Python進程佔用的物理內存遠比虛擬內存要小。
8、實現簡單的並行編程
concurrent.futures庫中提供了ProcessPoolExecutor類,用來在單獨運行的Python解釋器實例中執行計算密集型的函數。
import gzip import io import glob def find_robots(filename): ''' Find all of the hosts that access robots.txt in a single log file ''' robots = set() with gzip.open(filename) as f: for line in io.TextIOWrapper(f,encoding='ascii'): fields = line.split() if fields[6] == '/robots.txt': robots.add(fields[0]) return robots def find_all_robots(logdir): ''' Find all hosts across and entire sequence of files ''' files = glob.glob(logdir+'/*.log.gz') all_robots = set() for robots in map(find_robots, files): all_robots.update(robots) return all_robots if __name__ == '__main__': robots = find_all_robots('logs') for ipaddr in robots: print(ipaddr)
上面的程序以經常使用的map-reduce風格來編寫。函數find_robots()被映射到一系列的文件名上
修改成利用多個CPU核心的程序。
import gzip import io import glob from concurrent import futures def find_robots(filename): ''' Find all of the hosts that access robots.txt in a single log file ''' robots = set() with gzip.open(filename) as f: for line in io.TextIOWrapper(f,encoding='ascii'): fields = line.split() if fields[6] == '/robots.txt': robots.add(fields[0]) return robots def find_all_robots(logdir): ''' Find all hosts across and entire sequence of files ''' files = glob.glob(logdir+'/*.log.gz') all_robots = set() with futures.ProcessPoolExecutor() as pool: for robots in pool.map(find_robots, files): all_robots.update(robots) return all_robots if __name__ == '__main__': robots = find_all_robots('logs') for ipaddr in robots: print(ipaddr)
ProcessPoolExecutor的典型用法:
from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor() as pool: ... do work in parallel using pool ...
在底層,ProcessPoolExecutor建立了N個獨立運行的Python解釋器,這裏的N就是在系統上檢測到的可用的CPU個數。
能夠修改建立的Python進程格式,只要給ProcessPoolExecutor(N)提供參數便可。
進程池會一直運行,直到with語句塊中的最後一條語句執行完畢爲止,此時進程池就會關閉。
可是,程序會一直等待全部已經提交的任務都處理完畢爲止。
提交到進程池中的任務必須定義成函數的形式。
(1)若是想並行處理一個列表推導式或者map()操做,可使用pool.map()
(2)經過pool.submit()方法來手動提交一個單獨的任務。
def work(x): ... return result with ProcessPoolExecutor() as pool: ... # Example of submitting work to the pool future_result = pool.submit(work, arg) # Obtaining the result (blocks until done) r = future_result.result() ...
手動提交任務,獲得一個Future實例。
要獲取實際的結果還須要調用它的 result() 方法,這麼作會阻塞進程,直到完成了計算並將結果返回給進程池爲止。
或者提供一個回調函數,讓它在任務完成時獲得觸發執行。
def when_done(r): print('Got:', r.result()) with ProcessPoolExecutor() as pool: future_result = pool.submit(work, arg) future_result.add_done_callback(when_done)
9、規避GIL帶來的限制
解釋器被一個稱之爲全局解釋器鎖(GIL)的東西保護着,在任意時刻只容許一個Python線程投入執行。
GIL帶來的最明顯影響是Python程序沒法充分利用多個CPU核心帶來的優點。
一個採用多線程的技術的計算密集型應用只能在一個CPU上運行。
對於I/O密集型的線程,每當阻塞等待I/O操做時解釋器都會釋放GIL。
對於歷來不執行任何I/O操做的CPU密集型線程,Python解釋器會在執行了必定數量的字節碼後釋放GIL,以便其餘線程獲得執行的機會。
可是C語言擴展模塊不一樣,調用C函數時GIL會被鎖定,直到它返回爲止。
因爲C代碼的執行是不受解釋器控制的,這一期間不會執行任何Python字節碼,所以解釋器就無法釋放GIL了。
若是編寫的C語言擴展調用了會阻塞的C函數,執行耗時很長的操做等,那麼必須等到C函數返回時纔會釋放GIL,這時其餘的線程就僵死了。
規避GIL的限制主要有兩種經常使用的策略。
(1)使用multiprocessing模塊來建立進程池,把它當作協處理器來使用。
線程版本:
# Performs a large calculation (CPU bound) def some_work(args): ... return result # A thread that calls the above function def some_thread(): while True: ... r = some_work(args) ...
使用進程池的方式:
# Processing pool (see below for initiazation) pool = None # Performs a large calculation (CPU bound) def some_work(args): ... return result # A thread that calls the above function def some_thread(): while True: ... r = pool.apply(some_work, (args)) ... # Initiaze the pool if __name__ == '__main__': import multiprocessing pool = multiprocessing.Pool()
每當有線程要執行CPU密集型的任務時,它就把任務提交到池中,而後進程池將任務轉交給運行在另外一個進程中的Python解釋器。
當線程等待結果的時候就會釋放GIL。
(2)將計算密集型的任務轉移到C語言中,使其獨立於Python,在C代碼中釋放GIL。
總結:
CPU密集型的處理才須要考慮GIL,I/O密集型的處理則沒必要。
當使用進程池來規避GIL,涉及同另外一個Python解釋器之間進行數據序列化和通訊的處理。
待執行的操做須要包含在以def語句定義的Python函數中(lambda、閉包、可調用實例都是不能夠的!),並且函數參數和返回值必須兼容於pickle編碼。
此外,要完成的工做規模必須足夠大,這樣能夠彌補額外產生的通訊開銷。
將線程和進程混在一塊兒使用。最好在先建立任何線程以前將進程池做爲單例(singleton)在程序啓動的時候建立。
以後,線程就可使用相同的進程池來處理全部那些計算密集型的工做了。
對於C語言的擴展,最重要的是保持與Python解釋器進程的隔離。確保C代碼能夠獨立於Python執行。
10、定義一個Actor任務
actor模式是用來解決併發和分佈式計算問題的方法之一。
actor就是一個併發執行的任務,他只是簡單地對發送給它的消息進行處理。
做爲對這些消息的響應,actor會決定是否要對其餘的actor發送進一步的消息。
actor任務之間的通訊是單向且異步的,消息的發送者並不知道消息什麼時候纔會實際傳遞,當消息已經處理完畢時也不會接受到響應或者確認。
把線程和隊列結合起來使用很容易定義出actor:
from queue import Queue from threading import Thread,Event class ActorExit(Exception): pass class Actor: def __init__(self): self._mailbox = Queue() def send(self, msg): self._mailbox.put(msg) def recv(self): msg = self._mailbox.get() if msg is ActorExit: raise ActorExit return msg def close(self): self.send(ActorExit) def start(self): self._terminated = Event() t = Thread(target=self._boostrap) t.daemon = True t.start() def _boostrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set() def join(self): self._terminated.wait() def run(self): while True: msg = self.recv() class PrintActor(Actor): def run(self): while True: msg = self.recv() print('Got', msg) p = PrintActor() p.start() p.send('HHHH') p.send('wwwwwwww') p.close() p.join()
使用actor實例的send()方法來發送消息。在底層,將消息放入到隊列上,內部運行的線程會從中取出收到的消息處理。
close()方法經過在隊列中放置一個特殊的終止值(ActorExit)開關閉Actor。
若是將併發和異步消息傳遞的需求去掉,那麼徹底能夠用生成器來定義個最簡化的actor對象。
def print_actor(): while True: try: msg = yield print('Got',msg) except GeneratorExit: print('Actor terminating') p = print_actor() next(p) p.send('HHHHH') p.send('wwwwwwwww') p.close()
actor核心操做send(),在基於actor模式的系統中,消息的概念能夠擴展到許多不一樣的方向。
能夠以元組的形式傳遞帶標籤的消息,讓actor執行不一樣的操做。
class TaggedActor(Actor): def run(self): while True: tag, *payload = self.recv() getattr(self,'do_'+tag)(*payload) def do_A(self, x): print('Running A', x) def do_B(self, x, y): print('Running B', x, y) a = TaggedActor() a.start() a.send(('A',3)) a.send(('B',3,888))
一個actor變形,容許在工做者線程中執行任意的函數,並經過特殊的Result對象將結果回傳。
from threading import Event class Result: def __init__(self): self._evt = Event() self._result = None def set_result(self, value): self._result = value self._evt.set() def result(self): self._evt.wait() return self._result class Worker(Actor): def submit(self, func, *args, **kwargs): r = Result() self.send((func, args, kwargs, r)) return r def run(self): while True: func, args, kwargs, r = self.recv() r.set_result(func(*args, **kwargs)) worker = Worker() worker.start() r = worker.submit(pow, 2, 3) print(r.result())
能夠給actor對象的send()方法實現爲在socket鏈接上傳輸數據,或者經過某種消息傳遞的基礎架構(AMQP、ZMQ)來完成傳遞。
11、實現發佈者/訂閱者消息模式
實現發佈者/訂閱者消息模式,通常來講須要引入一個單獨的「交換」或者「網關」這樣的對象,做爲全部消息的中介。
不是直接將消息從一個任務發往另外一個任務,而是將消息發往交換中介,由交換中介將消息轉發給一個或多個相關聯的任務。
from collections import defaultdict class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) def send(self, msg): for subscriber in self._subscribers: subscriber.send(msg) _exchanges = defaultdict(Exchange) def get_exchange(name): return _exchanges[name] class Task: def send(self, msg): pass task_a = Task() task_b = Task() exc = get_exchange('name') exc.attach(task_a) exc.attach(task_b) exc.send('msg1') exc.send('msg2') exc.detach(task_a) exc.detach(task_b)
交換中介其實就是一個對象,它保存了活躍的訂閱者集合,並提供關聯、取消關聯以及發送消息的方法。
每一個交換中介都由一個名稱來標識,get_exchange()函數簡單地返回同給定的名稱相關聯的那個Exchange對象。
消息會先傳遞到一箇中介,再由中介將消息傳遞給相關聯的訂閱者。
交換中介具備將消息廣播發送給多個訂閱者的能力,能夠實現帶有冗餘任務、廣播或者扇出的系統。
也能夠構建調式以及診斷工具,將它們做爲普通的訂閱者關聯到交換中介上。
class DisplayMessages: def __init__(self): self.count = 0 def send(self, msg): self.count += 1 print('msg[{}]: {!r}'.format(self.count, msg)) exc = get_exchange('name') d = DisplayMessages() exc.attach(d)
消息接收者能夠是actor、協程、網絡鏈接,甚至只要實現了合適的send()方法的對象均可以。
關於交換中介,要以適當的方式對訂閱者進行關聯和取消關聯的處理。
這和使用文件、鎖以及相似的資源對象很類似。使用上下文管理協議。
from contextlib import contextmanager from collections import defaultdict class Exchange: def __init__(self): self._subscribers = set() def attach(self, task): self._subscribers.add(task) def detach(self, task): self._subscribers.remove(task) @contextmanager def subscribe(self, *tasks): for task in tasks: self.attach(task) try: yield finally: for task in tasks: self.detach(task) def send(self, msg): for subscriber in self._subscribers: subscriber.send(msg) _exchanges = defaultdict(Exchange) def get_exchange(name): return _exchanges[name] # Example of using the subscribe() method exc = get_exchange('name') with exc.subscribe(task_a, task_b): exc.send('msg1') exc.send('msg2')
12、使用生成器做爲線程的替代方案
用生成器做爲系統線程的替代方案來實現併發。協程有時也稱爲用戶級線程或綠色線程。
yield的基本行爲,即,使得生成器暫停執行。
編寫一個調度器將生成器函數當作一種「任務」來對待,並經過使用某種形式的任務切換來交替執行這寫任務。
def countdown(n): while n > 0: print('T-minus', n) yield n -= 1 print('Blastoff') def countup(n): x = 0 while x < n: print('Counting up', x) yield x += 1 from collections import deque class TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): self._task_queue.append(task) def run(self): while self._task_queue: task = self._task_queue.popleft() try: next(task) self._task_queue.append(task) except StopIteration: pass sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countup(8)) sched.new_task(countdown(5)) sched.run()
TaskScheduler類以循環的方式運行了一系列的生成器函數,每一個都運行到yield語句就暫停。
生成器函數就是任務,而yield語句就是通知任務須要暫停掛起的信號。
調度器只是簡單地輪流執行全部的任務,直到沒有一個任務還能執行爲止。
使用生成器來實現actor,徹底沒有用到線程:
from collections import deque class ActorScheduler: def __init__(self): self._actors = { } # Mapping of names to actors self._msg_queue = deque() # Message queue def new_actor(self, name, actor): ''' Admit a newly started actor to the scheduler and give it a name ''' self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): ''' Send a message to a named actor ''' actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): ''' Run as long as there are pending messages. ''' while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass # Example use if __name__ == '__main__': def printer(): while True: msg = yield print('Got:', msg) def counter(sched): while True: # Receive the current count n = yield if n == 0: break # Send to the printer task sched.send('printer', n) # Send the next count to the counter task (recursive) sched.send('counter', n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor('printer', printer()) sched.new_actor('counter', counter(sched)) # Send an initial message to the counter to initiate sched.send('counter', 10000) sched.run()
只要有消息須要傳遞,調度器就會運行。這裏有一個值得注意的特性:
counter生成器發送消息給本身並進入一個遞歸循環,但卻並不會受到Python的遞歸限制。
十3、輪詢多個線程隊列
有一組線程隊列,想輪詢這些隊列來獲取數據。這個輪詢一組網絡鏈接來獲取數據相似。
對於輪詢問題,利用隱藏的環回(loopback)網絡鏈接。
針對每一個想要輪詢的隊列(或任何對象),建立一對互聯的socket。而後對其中一個socket執行寫操做,以此來表示數據存在。
另外一個socket就傳遞給select()或者相似的函數來輪詢數據。
import os import socket import queue class PollableQueue(queue.Queue): def __init__(self): super().__init__() if os.name == 'posix': self._putsocket, self._getsocket = socket.socketpair() else: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('127.0.0.1',0)) server.listen(1) self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._putsocket.connect(server.getsockname()) self._getsocket, _ = server.accept() server.close() def fileno(self): return self._getsocket.fileno() def put(self, item): super().put(item) self._putsocket.send(b'x') def get(self): self._getsocket.recv(1) return super().get()
定義了一種新的Queue實例,底層有一對互聯的socket。
在UNIX上用socketpair()函數來創建這樣的socket對是很是容易的。
在Windows上,咱們不得不使用示例中展現的方法來假裝socket對。
首先建立一個服務器socket,以後馬上建立客戶端socket並鏈接到服務器上。
以後對get()和put()方法作重構,在這些socket上執行了少許的I/O操做。
put()方法在將數據放入隊列以後,對其中一個socket寫入了一個字節的數據。
當要把數據從隊列中取出時,get()方法就從另外一個socket中把那個單獨的字節讀出。
定義一個消費者,用來在多個隊列上監視是否有數據到來。
import select import threading def consumer(queues): while True: can_read, _, _ = select.select(queues,[],[]) for r in can_read: item = r.get() print('Got', item) q1 = PollableQueue() q2 = PollableQueue() q3 = PollableQueue() t = threading.Thread(target=consumer,args=([q1,q2,q3],)) t.daemon = True t.start() q1.put(1) q2.put(10) q3.put('Helloooo')
無論把數據放到哪一個隊列中,消費者最後都能接收到全部的數據。
十4、在UNIX上加載守護進程
建立一個合適的守護進程須要以精確的順序調用一系列的系統調用,並當心注意其中的細節。