本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範html
操做符熔合是響應式編程領域比較前沿的研究話題,目的在於經過將多個操做符以某種方式熔合起來,以達到優化的效果,進而下降開銷(好比執行時間,內存)。java
首先介紹一下關於響應式編程庫的分代的概念。直至如今,響應式編程庫及相關概念仍然在不斷的更新和升級。做者根據本身在響應式編程領域的研究經驗,將響應式編程庫分爲四代。git
第零代github
起初的響應式編程工具主要包括相似java.util.Observable
的基於訂閱者模式的API,以及那些基於回調的API,如 Swing/AWT/Android 中的 addXXXListener。可是二者都有些共同的不足:不方便組合(就像咱們前邊的操做鏈那樣)。算法
第一代編程
後來,Erik Meijer 和他在微軟的團隊解決了難以組合的問題,從而誕生了第一代響應式編程庫:2010 年左右的 Rx.NET,2011 年的 Reactive4Java,以及 2013 年早期的 RxJava。緩存
其餘的語言也陸續基於 Rx.NET 的架構開發了相似的庫,但很快你們發現這種架構存在的問題。第一個問題是,最初的 IObservable/IObserver 在純單線程中實現後,若是使用相似 take() 的操做符,以後的序列沒法取消。Rx.NET 經過在諸如 range() 的數據源進行異步,繞開了這個問題。數據結構
第二個問題是,當生產者與消費者之間存在一個異步邊界(不在同一線程)時,若是消費者消費數據的速度不夠快,也會致使問題。這時消費者的代碼會很是繁瑣,這就是咱們前面屢次提到的 backpressure 問題。架構
第二代
RxJava團隊針對上邊的兩個問題設計了一套新的架構。
首先是引入了 Subscriber 類,它能經過 isUnsubscribed() 方法判斷是否取消訂閱,數據源或者操做符發在數據以前都會調用該方法進行檢查。
而後,backpressure 的問題則經過雙方協調的方式解決,利用 Producer 接口,Subscriber 告知上游本身能處理數據的量(request() 方法)。
第三個改進是 lift() 函數,使用它能夠直接在 Subscriber 之間進行函數式的變換。幾乎全部操做符的實現都被重寫,改爲了利用新的 Operator 接口和 lift() 函數。
第三代
後邊的故事本系列文章的讀者就熟悉了。響應式編程的興起使得你們意識到互相之間要兼容。因而來自多個公司的工程師們聚在了一塊兒,設計了一套響應式流(Reactive-Streams)規範,主要成果是 4 個接口,30 條關於這幾個接口的規則,以及這幾個接口裏的 7 個方法。
Reactive-Streams 規範使得響應式編程實現庫之間能夠相互兼容,從而可以隨意切換具體的實現庫。
所以基於響應式流的實現屬於第三代,它的實現包括 RxJava 2.x,Project Reactor 和 Akka-Streams等。
第四代
在響應式流之上實現一套可組合的庫須要徹底不一樣的內部架構,所以 RxJava 2.x 不得不徹底重寫。做者參與了重寫的過程,並發現有些操做符能夠經過某種方式進行合併優化,以節省各類開銷,例如隊列,併發原子操做,以及數據請求等。
通過積極的交流以後,做者及其同事包括其餘相應時庫的做者建立了一個reactive-streams-commons 庫,設計了一套實現上述優化的組件,以後稱之爲操做符熔合。
第四代的響應式編程庫和第三代從外部看起來沒多大區別,但其實內部操做符的實現發生了很大的變化。
Reactor3已是第四代響應式編程庫了。
聊到響應式流的生命週期,就須要再次搬出本章第一節的那個圖:
爲了方便下邊討論操做符熔合的問題,咱們將整個過程分爲三個階段:
.subscribe
方法調用之前,針對每個操做符會建立一個FluxXxx
對象,並經過相似裝飾器模式的方式關聯起來;.subscribe
方法調用後,由最後一下操做符向上遊依次調用subscribe
方法以及向下回調onSubscribe
方法的期間,這時候元素尚未發出;onError
或者onComplete
)終止的階段。不一樣的階段有不一樣的優化方案,下面具體介紹兩種主要的優化方案:宏熔合和微熔合。
宏熔合主要發生在裝配期。經過2.1節本身動手寫了一個簡單的操做符實現,咱們知道對於每一個操做符都會建立一個FluxXxx
即響應的Subscription
對象。所以若是把連續的多個操做符合併爲單個操做符,就能夠優化訂閱時的開銷。這就是宏熔合的主要目標,具體來講有以下幾種方式:
1)操做符替換
好比,有幾個數據源,在對它們進行concat
或merge
操做時,若是數據源只發出一個元素,那麼就不必執行操做了,直接發出這個元素就能夠了;
在好比,咱們使用range
生成的數據源並應用subscribeOn()
時,對於這種單線程生成的數據源,subscribeOn
和publishOn
幾乎沒太大區別,所以能夠替換爲publishOn
,以便引入更多優化。
2)替換爲自定義發佈者
有些組合出現的操做符能夠合併爲一個單獨的操做符。
好比,just().subscribeOn()
或just().flatMap()
這樣的組合,它們帶來的開銷(內部隊列的建立,調度器 worker 的建立和銷燬,多個原子變量的修改)相對於它們發出的數據太高了,尤爲just()
中只有一個元素的狀況下,徹底能夠合併爲一個發佈者發出數據。
3)合併相同的操做符
好比以下例子:
Observable.range(1, 10) .filter(v -> v % 3 == 0) .filter(v -> v % 2 == 0) .map(v -> v + 1) .map(v -> v * v) .subscribe(System.out::println);
filter
和map
分別調用了兩次,這樣代碼會比較清晰,可是若是range比較大,那麼優化其性能開銷帶來的收益就很明顯了。策略就是對於同一類的操做符進行合併:
對於兩個filter()
,會把兩個 lambda 表達式合併起來:
Predicate<Integer> p1 = v -> v % 3 == 0; Predicate<Integer> p2 = v -> v % 2 == 0; Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);
map()
也能夠進行相似的合併:
Function<Integer, Integer> f1 = v -> v + 1; Function<Integer, Integer> f2 = v -> v * v; Function<Integer, Integer> f3 = v -> f2.apply(f1.apply(v));
微熔合發生在訂閱期,經過多個操做符共用內部資源和數據結構以減小開銷。
微熔合有如下幾種形式:
1)Conditional Subscriber
閱讀FluxFilter
或FluxDistinct
的源碼發現,當咱們使用過濾操做符filter()
或者distinct()
時,若是被丟棄掉,那麼會調用 request(1)。request(1) 會觸發原子遞增操做,或者是 CAS 循環,大量這樣的操做很快就能積累出性能降低。
Conditional Subscriber 的思路是爲 Subscriber 增長一個boolean tryOnNext(T v)
方法(見Fusable.ConditionalSubscriber
),它能夠告知上游本身是否會真的消費這個數據。這樣在數據被丟棄時,可以跳過原子遞增,並繼續發射數據,直到實際發出的數據量達到了請求數。
2)同步熔合
在響應式編程庫內部,許多地方都須要用到隊列以便進行數據緩存。好比,有些擁有輸出隊列的操做符,和那些須要輸入隊列的操做符能夠共用同一個隊列實例,這樣就能夠節省內存分配。
同步熔合就是採用這一方式進行優化的。對於那些操做符的上游必然是同步的情形,它們能夠僞裝本身是一個隊列。
有些用於生成數據源操做符,好比range
,fromIterable
,fromArray
,fromStream
和fromCallable
,它們都是同步的,並且都有隊列的特性。所以它們內部的Subscription
就能夠實現Queue
。而對於會使用隊列的操做符好比observeOn()
,flatMap()
,publish()
和zip()
來講,若是發現上游的Subscription
實現了Queue
接口,那就無需建立本身的隊列了。
3)異步熔合
有些狀況下,數據源也有本身的隊列,會在下游發出請求時從中取出數據併發出。與上邊的狀況相似,這時數據源也能夠實現 Queue 接口,而後讓後續的操做符直接使用,而不用建立本身的隊列。但若是這個操做符也支持同步熔合的話,就須要採用新的協議。
在Reactor中,這種優化經過一個新的接口Fusable.QueuedSubscription
來定義。
操做符熔合是下降響應式數據流開銷的一個有效途徑,能夠在保持API不變的狀況下把開銷下降到接近於常規的 Java Stream 序列(Project Reactor 2.5 M1 下降了 50%+,RxJava 2.x 則下降了 200%+)的水平。
操做符熔合主要是針對相鄰的操做符進行優化,若是針對每一對可能相鄰使用的操做符都進行優化設計,那將是矩陣式的數量級,所以操做符的熔合更可能是一種基於使用狀況的考量,尤爲主要關注於用戶常用到的操做符和操做符組合。
也許能夠爲操做符以及數據序列以某種方式進行建模,在模型上經過圖算法自動發現那些能夠被熔合的操做符,而這又將是另外一個能夠研究的話題了。目前基於操做符熔合的優化仍然處於進行時,關於響應式編程庫仍然有性能潛力能夠挖掘。爲這些大牛們點贊!