python blinker庫學習

參考 Blinker Documentationpython

Blinker 是一個基於Python的強大的信號庫,它既支持簡單的對象到對象通訊,也支持針對多個對象進行組播。Flask的信號機制就是基於它創建的。安全

Blinker的內核雖然小巧,可是功能卻很是強大,它支持如下特性:函數

  • 支持註冊全局命名信號優化

  • 支持匿名信號線程

  • 支持自定義命名信號code

  • 支持與接收者之間的持久鏈接與短暫鏈接對象

  • 經過弱引用實現與接收者之間的自動斷開鏈接內存

  • 支持發送任意大小的數據get

  • 支持收集信號接收者的返回值it

  • 線程安全

建立信號

信號經過signal()方法進行建立:

>>> from blinker import signal
>>> initialized = signal("initialized")
>>> initialized is signal("initialized")
True

每次調用signal('name')都會返回同一個信號對象。所以這裏signal()方法使用了單例模式。

訂閱信號

使用Signal.connect()方法註冊一個函數,每當觸發信號的時候,就會調用該函數。該函數以觸發信號的對象做爲參數,這個函數其實就是信號訂閱者。

>>> def subscriber(sender):
...     print("Got a signal sent by %r" % sender)
...
>>> ready = signal('ready')
>>> ready.connect(subscriber)
<function subscriber at 0x...>

觸發信號

使用Signal.send()方法通知信號訂閱者。

下面定義類Processor,在它的go()方法中觸發前面聲明的ready信號,send()方法以self爲參數,也就是說Processor的實例是信號的發送者。

>>> class Processor:
...    def __init__(self, name):
...        self.name = name
...
...    def go(self):
...        ready = signal('ready')
...        ready.send(self)
...        print("Processing.")
...        complete = signal('complete')
...        complete.send(self)
...
...    def __repr__(self):
...        return '<Processor %s>' % self.name
...
>>> processor_a = Processor('a')
>>> processor_a.go()
Got a signal sent by <Processor a>
Processing.

注意到go()方法中的complete信號沒?並無訂閱者訂閱該信號,可是依然能夠觸發該信號。若是沒有任何訂閱者的信號,結果是什麼信號也不會發送,並且Blinker內部對這種狀況進行了優化,以儘量的減小內存開銷。

訂閱特定的發佈者

默認狀況下,任意發佈者觸發信號,都會通知訂閱者。能夠給Signal.connect()傳遞一個可選的參數,以便限制訂閱者只能訂閱特定發送者。

>>> def b_subscriber(sender):
...     print("Caught signal from processor_b.")
...     assert sender.name == 'b'
...
>>> processor_b = Processor('b')
>>> ready.connect(b_subscriber, sender=processor_b)
<function b_subscriber at 0x...>

如今訂閱者只訂閱了processor_b發佈的ready信號:

>>> processor_a.go()
Got a signal sent by <Processor a>
Processing.
>>> processor_b.go()
Got a signal sent by <Processor b>
Caught signal from processor_b.
Processing.

經過信號收發數據

能夠給send()方法傳遞額外的關鍵字參數,這些參數會傳遞給訂閱者。

>>> send_data = signal('send-data')
>>> @send_data.connect
... def receive_data(sender, **kw):
...     print("Caught signal from %r, data %r" % (sender, kw))
...     return 'received!'
...
>>> result = send_data.send('anonymous', abc=123)
Caught signal from 'anonymous', data {'abc': 123}

send()方法的返回值收集每一個訂閱者的返回值,拼接成一個元組組成的列表。每一個元組的組成爲(receiver function, return value)。

匿名信號

前面咱們建立的信號都是命名信號,每次調用Signal構造器都會建立一個惟一的信號,,也就是說每次建立的信號是不同的。下面對前面的Processor類進行改造,將signal做爲它的類屬性。

>>> from blinker import Signal
>>> class AltProcessor:
...    on_ready = Signal()
...    on_complete = Signal()
...
...    def __init__(self, name):
...        self.name = name
...
...    def go(self):
...        self.on_ready.send(self)
...        print("Alternate processing.")
...        self.on_complete.send(self)
...
...    def __repr__(self):
...        return '<AltProcessor %s>' % self.name
...

上面建立的就是匿名信號。on_ready與on_complete是兩個不一樣的信號。

使用修飾器訂閱信號

除了使用connect()方法訂閱信號以外,使用@connect修飾器能夠達到一樣的效果。

>>> apc = AltProcessor('c')
>>> @apc.on_complete.connect
... def completed(sender):
...     print "AltProcessor %s completed!" % sender.name
...
>>> apc.go()
Alternate processing.
AltProcessor c completed!

儘管這樣用起來很方便,可是這種形式不支持訂閱指定的發送者。這時,能夠使用connect_via()

>>> dice_roll = signal('dice_roll')
>>> @dice_roll.connect_via(1)
... @dice_roll.connect_via(3)
... @dice_roll.connect_via(5)
... def odd_subscriber(sender):
...     print("Observed dice roll %r." % sender)
...
>>> result = dice_roll.send(3)
Observed dice roll 3.

優化信號發送

信號一般會進行優化,以便快速的發送。無論有沒有訂閱者,均可以發送信號。若是發送信號時須要傳送的參數要計算很長時間,能夠在發送以前使用receivers屬性先檢查一下是否有訂閱者。

>>> bool(signal('ready').receivers)
True
>>> bool(signal('complete').receivers)
False
>>> bool(AltProcessor.on_complete.receivers)
True

還能夠檢查訂閱者是否訂閱了某個具體的信號發佈者。

>>> signal('ready').has_receivers_for(processor_a)
True
相關文章
相關標籤/搜索