參考 Blinker Documentationpython
Blinker 是一個基於Python的強大的信號庫,它既支持簡單的對象到對象通訊,也支持針對多個對象進行組播。Flask的信號機制就是基於它創建的。安全
Blinker的內核雖然小巧,可是功能卻很是強大,它支持如下特性:函數
支持註冊全局命名信號優化
支持匿名信號線程
支持自定義命名信號code
支持與接收者之間的持久鏈接與短暫鏈接對象
經過弱引用實現與接收者之間的自動斷開鏈接內存
支持發送任意大小的數據get
支持收集信號接收者的返回值it
線程安全
信號經過signal()
方法進行建立:
>>> from blinker import signal >>> initialized = signal("initialized") >>> initialized is signal("initialized") True
每次調用signal('name')
都會返回同一個信號對象。所以這裏signal()
方法使用了單例模式。
使用Signal.connect()
方法註冊一個函數,每當觸發信號的時候,就會調用該函數。該函數以觸發信號的對象做爲參數,這個函數其實就是信號訂閱者。
>>> def subscriber(sender): ... print("Got a signal sent by %r" % sender) ... >>> ready = signal('ready') >>> ready.connect(subscriber) <function subscriber at 0x...>
使用Signal.send()
方法通知信號訂閱者。
下面定義類Processor
,在它的go()
方法中觸發前面聲明的ready
信號,send()
方法以self
爲參數,也就是說Processor
的實例是信號的發送者。
>>> class Processor: ... def __init__(self, name): ... self.name = name ... ... def go(self): ... ready = signal('ready') ... ready.send(self) ... print("Processing.") ... complete = signal('complete') ... complete.send(self) ... ... def __repr__(self): ... return '<Processor %s>' % self.name ... >>> processor_a = Processor('a') >>> processor_a.go() Got a signal sent by <Processor a> Processing.
注意到go()
方法中的complete
信號沒?並無訂閱者訂閱該信號,可是依然能夠觸發該信號。若是沒有任何訂閱者的信號,結果是什麼信號也不會發送,並且Blinker
內部對這種狀況進行了優化,以儘量的減小內存開銷。
默認狀況下,任意發佈者觸發信號,都會通知訂閱者。能夠給Signal.connect()
傳遞一個可選的參數,以便限制訂閱者只能訂閱特定發送者。
>>> def b_subscriber(sender): ... print("Caught signal from processor_b.") ... assert sender.name == 'b' ... >>> processor_b = Processor('b') >>> ready.connect(b_subscriber, sender=processor_b) <function b_subscriber at 0x...>
如今訂閱者只訂閱了processor_b
發佈的ready
信號:
>>> processor_a.go() Got a signal sent by <Processor a> Processing. >>> processor_b.go() Got a signal sent by <Processor b> Caught signal from processor_b. Processing.
能夠給send()
方法傳遞額外的關鍵字參數,這些參數會傳遞給訂閱者。
>>> send_data = signal('send-data') >>> @send_data.connect ... def receive_data(sender, **kw): ... print("Caught signal from %r, data %r" % (sender, kw)) ... return 'received!' ... >>> result = send_data.send('anonymous', abc=123) Caught signal from 'anonymous', data {'abc': 123}
send()
方法的返回值收集每一個訂閱者的返回值,拼接成一個元組組成的列表。每一個元組的組成爲(receiver function, return value)。
前面咱們建立的信號都是命名信號,每次調用Signal
構造器都會建立一個惟一的信號,,也就是說每次建立的信號是不同的。下面對前面的Processor
類進行改造,將signal
做爲它的類屬性。
>>> from blinker import Signal >>> class AltProcessor: ... on_ready = Signal() ... on_complete = Signal() ... ... def __init__(self, name): ... self.name = name ... ... def go(self): ... self.on_ready.send(self) ... print("Alternate processing.") ... self.on_complete.send(self) ... ... def __repr__(self): ... return '<AltProcessor %s>' % self.name ...
上面建立的就是匿名信號。on_ready與on_complete是兩個不一樣的信號。
除了使用connect()
方法訂閱信號以外,使用@connect
修飾器能夠達到一樣的效果。
>>> apc = AltProcessor('c') >>> @apc.on_complete.connect ... def completed(sender): ... print "AltProcessor %s completed!" % sender.name ... >>> apc.go() Alternate processing. AltProcessor c completed!
儘管這樣用起來很方便,可是這種形式不支持訂閱指定的發送者。這時,能夠使用connect_via()
:
>>> dice_roll = signal('dice_roll') >>> @dice_roll.connect_via(1) ... @dice_roll.connect_via(3) ... @dice_roll.connect_via(5) ... def odd_subscriber(sender): ... print("Observed dice roll %r." % sender) ... >>> result = dice_roll.send(3) Observed dice roll 3.
信號一般會進行優化,以便快速的發送。無論有沒有訂閱者,均可以發送信號。若是發送信號時須要傳送的參數要計算很長時間,能夠在發送以前使用receivers
屬性先檢查一下是否有訂閱者。
>>> bool(signal('ready').receivers) True >>> bool(signal('complete').receivers) False >>> bool(AltProcessor.on_complete.receivers) True
還能夠檢查訂閱者是否訂閱了某個具體的信號發佈者。
>>> signal('ready').has_receivers_for(processor_a) True