(20)操做符熔合——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範html

2.10 操做符熔合

操做符熔合是響應式編程領域比較前沿的研究話題,目的在於經過將多個操做符以某種方式熔合起來,以達到優化的效果,進而下降開銷(好比執行時間,內存)。java

如下部份內容參考了 Dávid Karnok 的 Operator-fusion(part1part2)。react

2.10.1 分代的概念

首先介紹一下關於響應式編程庫的分代的概念。直至如今,響應式編程庫及相關概念仍然在不斷的更新和升級。做者根據本身在響應式編程領域的研究經驗,將響應式編程庫分爲四代。git

第零代github

起初的響應式編程工具主要包括相似java.util.Observable的基於訂閱者模式的API,以及那些基於回調的API,如 Swing/AWT/Android 中的 addXXXListener。可是二者都有些共同的不足:不方便組合(就像咱們前邊的操做鏈那樣)。算法

第一代編程

後來,Erik Meijer 和他在微軟的團隊解決了難以組合的問題,從而誕生了第一代響應式編程庫:2010 年左右的 Rx.NET,2011 年的 Reactive4Java,以及 2013 年早期的 RxJava。緩存

其餘的語言也陸續基於 Rx.NET 的架構開發了相似的庫,但很快你們發現這種架構存在的問題。第一個問題是,最初的 IObservable/IObserver 在純單線程中實現後,若是使用相似 take() 的操做符,以後的序列沒法取消。Rx.NET 經過在諸如 range() 的數據源進行異步,繞開了這個問題。數據結構

第二個問題是,當生產者與消費者之間存在一個異步邊界(不在同一線程)時,若是消費者消費數據的速度不夠快,也會致使問題。這時消費者的代碼會很是繁瑣,這就是咱們前面屢次提到的 backpressure 問題。架構

第二代

RxJava團隊針對上邊的兩個問題設計了一套新的架構。

首先是引入了 Subscriber 類,它能經過 isUnsubscribed() 方法判斷是否取消訂閱,數據源或者操做符發在數據以前都會調用該方法進行檢查。

而後,backpressure 的問題則經過雙方協調的方式解決,利用 Producer 接口,Subscriber 告知上游本身能處理數據的量(request() 方法)。

第三個改進是 lift() 函數,使用它能夠直接在 Subscriber 之間進行函數式的變換。幾乎全部操做符的實現都被重寫,改爲了利用新的 Operator 接口和 lift() 函數。

第三代

後邊的故事本系列文章的讀者就熟悉了。響應式編程的興起使得你們意識到互相之間要兼容。因而來自多個公司的工程師們聚在了一塊兒,設計了一套響應式流(Reactive-Streams)規範,主要成果是 4 個接口,30 條關於這幾個接口的規則,以及這幾個接口裏的 7 個方法。

Reactive-Streams 規範使得響應式編程實現庫之間能夠相互兼容,從而可以隨意切換具體的實現庫。

所以基於響應式流的實現屬於第三代,它的實現包括 RxJava 2.x,Project Reactor 和 Akka-Streams等。

第四代

在響應式流之上實現一套可組合的庫須要徹底不一樣的內部架構,所以 RxJava 2.x 不得不徹底重寫。做者參與了重寫的過程,並發現有些操做符能夠經過某種方式進行合併優化,以節省各類開銷,例如隊列,併發原子操做,以及數據請求等。

通過積極的交流以後,做者及其同事包括其餘相應時庫的做者建立了一個reactive-streams-commons 庫,設計了一套實現上述優化的組件,以後稱之爲操做符熔合。

第四代的響應式編程庫和第三代從外部看起來沒多大區別,但其實內部操做符的實現發生了很大的變化。

Reactor3已是第四代響應式編程庫了。

2.10.2 響應式流的生命週期

聊到響應式流的生命週期,就須要再次搬出本章第一節的那個圖:

title

爲了方便下邊討論操做符熔合的問題,咱們將整個過程分爲三個階段:

  1. 裝配期,也就是.subscribe方法調用之前,針對每個操做符會建立一個FluxXxx對象,並經過相似裝飾器模式的方式關聯起來;
  2. 訂閱期,這是在.subscribe方法調用後,由最後一下操做符向上遊依次調用subscribe方法以及向下回調onSubscribe方法的期間,這時候元素尚未發出;
  3. 運行期,這是數據生成併發往下游,且以最多一個終止事件(onError或者onComplete)終止的階段。

不一樣的階段有不一樣的優化方案,下面具體介紹兩種主要的優化方案:宏熔合和微熔合。

2.10.3 宏熔合(macro-fusion)

宏熔合主要發生在裝配期。經過2.1節本身動手寫了一個簡單的操做符實現,咱們知道對於每一個操做符都會建立一個FluxXxx即響應的Subscription對象。所以若是把連續的多個操做符合併爲單個操做符,就能夠優化訂閱時的開銷。這就是宏熔合的主要目標,具體來講有以下幾種方式:

1)操做符替換

好比,有幾個數據源,在對它們進行concatmerge操做時,若是數據源只發出一個元素,那麼就不必執行操做了,直接發出這個元素就能夠了;

在好比,咱們使用range生成的數據源並應用subscribeOn()時,對於這種單線程生成的數據源,subscribeOnpublishOn幾乎沒太大區別,所以能夠替換爲publishOn,以便引入更多優化。

2)替換爲自定義發佈者

有些組合出現的操做符能夠合併爲一個單獨的操做符。

好比,just().subscribeOn()just().flatMap()這樣的組合,它們帶來的開銷(內部隊列的建立,調度器 worker 的建立和銷燬,多個原子變量的修改)相對於它們發出的數據太高了,尤爲just()中只有一個元素的狀況下,徹底能夠合併爲一個發佈者發出數據。

3)合併相同的操做符

好比以下例子:

Observable.range(1, 10)
       .filter(v -> v % 3 == 0)
       .filter(v -> v % 2 == 0)
       .map(v -> v + 1)
       .map(v -> v * v)
       .subscribe(System.out::println);

filtermap分別調用了兩次,這樣代碼會比較清晰,可是若是range比較大,那麼優化其性能開銷帶來的收益就很明顯了。策略就是對於同一類的操做符進行合併:

對於兩個filter(),會把兩個 lambda 表達式合併起來:

Predicate<Integer> p1 = v -> v % 3 == 0;
    Predicate<Integer> p2 = v -> v % 2 == 0;

    Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);

map()也能夠進行相似的合併:

Function<Integer, Integer> f1 = v -> v + 1;
    Function<Integer, Integer> f2 = v -> v * v;

    Function<Integer, Integer> f3 = v -> f2.apply(f1.apply(v));

2.10.4 微熔合(micro-fusion)

微熔合發生在訂閱期,經過多個操做符共用內部資源和數據結構以減小開銷。

微熔合有如下幾種形式:

1)Conditional Subscriber

閱讀FluxFilterFluxDistinct的源碼發現,當咱們使用過濾操做符filter()或者distinct()時,若是被丟棄掉,那麼會調用 request(1)。request(1) 會觸發原子遞增操做,或者是 CAS 循環,大量這樣的操做很快就能積累出性能降低。

Conditional Subscriber 的思路是爲 Subscriber 增長一個boolean tryOnNext(T v)方法(見Fusable.ConditionalSubscriber),它能夠告知上游本身是否會真的消費這個數據。這樣在數據被丟棄時,可以跳過原子遞增,並繼續發射數據,直到實際發出的數據量達到了請求數。

2)同步熔合

在響應式編程庫內部,許多地方都須要用到隊列以便進行數據緩存。好比,有些擁有輸出隊列的操做符,和那些須要輸入隊列的操做符能夠共用同一個隊列實例,這樣就能夠節省內存分配。

同步熔合就是採用這一方式進行優化的。對於那些操做符的上游必然是同步的情形,它們能夠僞裝本身是一個隊列。

有些用於生成數據源操做符,好比rangefromIterablefromArrayfromStreamfromCallable,它們都是同步的,並且都有隊列的特性。所以它們內部的Subscription就能夠實現Queue。而對於會使用隊列的操做符好比observeOn()flatMap()publish()zip()來講,若是發現上游的Subscription實現了Queue接口,那就無需建立本身的隊列了。

3)異步熔合

有些狀況下,數據源也有本身的隊列,會在下游發出請求時從中取出數據併發出。與上邊的狀況相似,這時數據源也能夠實現 Queue 接口,而後讓後續的操做符直接使用,而不用建立本身的隊列。但若是這個操做符也支持同步熔合的話,就須要採用新的協議。

在Reactor中,這種優化經過一個新的接口Fusable.QueuedSubscription來定義。

2.10.5 最後

操做符熔合是下降響應式數據流開銷的一個有效途徑,能夠在保持API不變的狀況下把開銷下降到接近於常規的 Java Stream 序列(Project Reactor 2.5 M1 下降了 50%+,RxJava 2.x 則下降了 200%+)的水平。

操做符熔合主要是針對相鄰的操做符進行優化,若是針對每一對可能相鄰使用的操做符都進行優化設計,那將是矩陣式的數量級,所以操做符的熔合更可能是一種基於使用狀況的考量,尤爲主要關注於用戶常用到的操做符和操做符組合。

也許能夠爲操做符以及數據序列以某種方式進行建模,在模型上經過圖算法自動發現那些能夠被熔合的操做符,而這又將是另外一個能夠研究的話題了。目前基於操做符熔合的優化仍然處於進行時,關於響應式編程庫仍然有性能潛力能夠挖掘。爲這些大牛們點贊!

相關文章
相關標籤/搜索