本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範java
Processor
既是一種特別的發佈者(Publisher
)又是一種訂閱者(Subscriber
)。 因此你可以訂閱一個Processor
,也能夠調用它們提供的方法來手動插入數據到序列,或終止序列。緩存
前面一直在聊響應式流的四個接口中的三個:Publisher
、Subscriber
、Subscription
,惟獨Processor
遲遲沒有說起。緣由在於想用好它們不太容易,多數狀況下,咱們應該進行避免使用Processor
,一般來講僅用於一些特殊場景。安全
比起直接使用Processor,更好的方式是經過調用sink()
來獲得它的Sink。這個Sink是線程安全的,能夠用於在應用程序中多線程併發地生成數據。例如,經過UnicastProcessor
獲得一個線程安全的 sink:數據結構
UnicastProcessor<Integer> processor = UnicastProcessor.create(); FluxSink<Integer> sink = processor.sink(overflowStrategy);
多個線程能夠併發地經過下邊的方法生成數據到sink。多線程
sink.next(n);
看到這裏是否是感受跟generate
生成數據流的方式很像?因此Reactor官方建議,當你想要使用Processor的時候,首先看看可否用generate
實現一樣的功能,或者看看是否有相應的操做符能夠達到你想要的效果。併發
Reactor Core 內置多種 Processor。這些 processor 具備不一樣的語法,大概分爲三類。 異步
異步的 processor 在實例化的時候最複雜,由於有許多不一樣的選項。所以它們暴露出一個 Builder 接口。 而簡單的 processors 有靜態的工廠方法。async
1)DirectProcessoride
DirectProcessor
能夠將信號分發給零到多個訂閱者(Subscriber)。它是最容易實例化的,使用靜態方法 create() 便可。另外一方面,它的不足是沒法處理背壓。因此,當DirectProcessor
推送的是 N 個元素,而至少有一個訂閱者的請求個數少於 N 的時候,就會發出一個IllegalStateException
。ui
一旦 Processor 結束(一般經過調用它的 Sink 的 error(Throwable) 或 complete() 方法), 雖然它容許更多的訂閱者訂閱它,可是會當即向它們從新發送終止信號。
2)UnicastProcessor
UnicastProcessor
可使用一個內置的緩存來處理背壓。代價就是它最多隻能有一個訂閱者(上一節的例子經過publish
轉換成了ConnectableFlux
,因此能夠接入兩個訂閱者)。
UnicastProcessor
有多種選項,所以提供多種不一樣的create
靜態方法。例如,它默認是 無限的(unbounded) :若是你在在訂閱者尚未請求數據的狀況下讓它推送數據,它會緩存全部數據。
能夠經過提供一個自定義的 Queue 的具體實現傳遞給 create 工廠方法來改變默認行爲。若是給出的隊列是有限的(bounded), 而且緩存已滿,並且未收到下游的請求,processor 會拒絕推送數據。
在上邊「有限的」例子中,還能夠在構造 processor 的時候提供一個回調方法,這個回調方法能夠在每個 被拒絕推送的元素上調用,從而讓開發者有機會清理這些元素。
3)EmitterProcessor
EmitterProcessor
可以向多個訂閱者發送數據,而且能夠對每個訂閱者進行背壓處理。它自己也能夠訂閱一個發佈者並同步得到數據。
最初若是沒有訂閱者,它仍然容許推送一些數據到緩存,緩存大小由bufferSize
定義。 以後若是仍然沒有訂閱者訂閱它並消費數據,對onNext
的調用會阻塞,直到有訂閱者接入 (這時只能併發地訂閱了)。
所以第一個訂閱者會收到最多bufferSize
個元素。然而以後,後續接入的訂閱者只能獲取到它們開始訂閱以後推送的數據。這個內部的緩存會繼續用於背壓的目的。
默認狀況下,若是全部的訂閱者都取消了訂閱,它會清空內部緩存,而且再也不接受更多的訂閱者。這一點能夠經過 create 靜態工廠方法的 autoCancel 參數來配置。
4)ReplayProcessor
ReplayProcessor
會緩存直接經過自身的 Sink 推送的元素,以及來自上游發佈者的元素, 而且後來的訂閱者也會收到重發(replay)的這些元素。
能夠經過多種配置方式建立它:
5)TopicProcessor
TopicProcessor
是一個異步的 processor,它可以重發來自多個上游發佈者的元素, 這須要在建立它的時候配置shared
(build() 的 share(boolean) 配置)。
若是你企圖在併發環境下經過併發的上游發佈者調用
TopicProcessor
的onNext
、onComplete
,或onError
方法,就必須配置shared
。不然,併發調用就是非法的,從而 processor 是徹底兼容響應式流規範的。
TopicProcessor
可以對多個訂閱者發送數據。它經過對每個訂閱者關聯一個線程來實現這一點, 這個線程會一直執行直到 processor 發出onError
或onComplete
信號,或關聯的訂閱者被取消。 最多能夠接受的訂閱者個數由構造者方法executor
指定,經過提供一個有限線程數的 ExecutorService
來限制這一個數。
這個 processor 基於一個RingBuffer
數據結構來存儲已發送的數據。每個訂閱者線程 自行管理其相關的數據在RingBuffer
中的索引。
這個 processor 也有一個autoCancel
構造器方法:若是設置爲true
(默認的),那麼當 全部的訂閱者取消以後,上游發佈者也就被取消了。
6)WorkQueueProcessor
WorkQueueProcessor
也是一個異步的 processor,也可以重發來自多個上游發佈者的元素, 一樣在建立時須要配置shared
(它多數構造器配置與TopicProcessor
相同)。
它放鬆了對響應式流規範的兼容,可是好處就在於相對於TopicProcessor
來講須要更少的資源。 它仍然基於RingBuffer
,可是再也不要求每個訂閱者都關聯一個線程,所以相對於TopicProcessor
來講更具擴展性。
代價在於分發模式有些區別:來自訂閱者的請求會彙總在一塊兒,而且這個 processor 每次只對一個 訂閱者發送數據,所以須要循環(round-robin)對訂閱者發送數據,而不是一次所有發出的模式(沒法保證徹底公平的循環分發)。
WorkQueueProcessor
多數構造器方法與TopicProcessor
相同,好比autoCancel
、share
, 以及waitStrategy
。下游訂閱者的最大數目一樣由構造器executor
配置的ExecutorService
決定。
注意:最好不要有太多訂閱者訂閱
WorkQueueProcessor
,由於這會鎖住 processor。若是你須要限制訂閱者數量,最好使用一個ThreadPoolExecutor
或ForkJoinPool
。這個 processor 可以檢測到(線程池)容量並在訂閱者過多時拋出異常。
本文的介紹並未給出示例,在下一章咱們編寫「響應式Netty」的時候會介紹到Processor的使用。