本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範
本文測試源碼java
到目前爲止,咱們討論的發佈者,不管是Flux仍是Mono,都有一個特色:訂閱前什麼都不會發生。當咱們「建立」了一個Flux的時候,咱們只是「聲明」/「組裝」了它,可是若是不調用.subscribe
來訂閱它,它就不會開始發出元素。react
可是咱們對「數據流」(尤爲是乍聽到這個詞的時候)會有種自然的感受,就是不管有沒有訂閱者,它始終在按照本身的步伐發出數據。就像假設一我的沒有一個粉絲,他也能夠發微博同樣。git
以上這兩種數據流分別稱爲「冷」序列和「熱」序列。因此咱們一直在介紹的Reactor3的發佈者就屬於「冷」的發佈者。不過有少數的例外,好比just
生成的就是一個「熱」序列,它直接在組裝期就拿到數據,若是以後有誰訂閱它,就從新發送數據給訂閱者。Reactor 中多數其餘的「熱」發佈者是擴展自Processor
的(下節會介紹到)。github
下面咱們經過對比了解一下兩種不一樣的發佈者的效果,首先是咱們熟悉的「冷」發佈者:緩存
@Test public void testCodeSequence() { Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .map(String::toUpperCase); source.subscribe(d -> System.out.println("Subscriber 1: "+d)); System.out.println(); source.subscribe(d -> System.out.println("Subscriber 2: "+d)); }
咱們對發佈者source
進行了兩次訂閱,每次訂閱都致使它把數據流重新發一遍:ide
Subscriber 1: BLUE Subscriber 1: GREEN Subscriber 1: ORANGE Subscriber 1: PURPLE Subscriber 2: BLUE Subscriber 2: GREEN Subscriber 2: ORANGE Subscriber 2: PURPLE
而後再看一個「熱」發佈者的例子:測試
@Test public void testHotSequence() { UnicastProcessor<String> hotSource = UnicastProcessor.create(); Flux<String> hotFlux = hotSource.publish() .autoConnect() .map(String::toUpperCase); hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d)); hotSource.onNext("blue"); hotSource.onNext("green"); hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d)); hotSource.onNext("orange"); hotSource.onNext("purple"); hotSource.onComplete(); }
這個熱發佈者是一個UnicastProcessor
,咱們可使用它的onNext
等方法手動發出元素。上邊的例子中,hotSource
發出兩個元素後第二個訂閱者纔開始訂閱,因此第二個訂閱者只能收到以後的元素:code
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 2 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: PURPLE
因而可知,UnicastProcessor
是一個熱發佈者。blog
有時候,你不只想要在某一個訂閱者訂閱以後纔開始發出數據,可能還但願在多個訂閱者「到齊」以後 纔開始。ConnectableFlux
的用意便在於此。Flux API 中有兩種經常使用的返回ConnectableFlux
的方式:publish
和replay
。索引
publish
會嘗試知足各個不一樣訂閱者的需求(也就是回壓),並綜合這些請求反饋給源。假設有某個訂閱者的需求爲 0,發佈者會暫停向全部訂閱者發出元素。replay
將對第一個訂閱後產生的數據進行緩存,最多緩存數量取決於配置(時間/緩存大小)。 它會對後續接入的訂閱者從新發送數據。ConnectableFlux
提供了多種對訂閱的管理方式。包括:
connect
,當有足夠的訂閱接入後,能夠對 flux 手動執行一次。它會觸發對上游源的訂閱。autoConnect(n)
與connect
相似,不過是在有 n 個訂閱的時候自動觸發。refCount(n)
不只可以在訂閱者接入的時候自動觸發,還會檢測訂閱者的取消動做。若是訂閱者所有取消訂閱,則會將源「斷開鏈接」,再有新的訂閱者接入的時候纔會繼續「連上」發佈者。refCount(int, Duration)
增長了一個倒計時:一旦訂閱者數量過低了,它會等待 Duration 參數指定的時間,若是沒有新的訂閱者接入纔會與源斷開鏈接。1)connect的例子
@Test public void testConnectableFlux1() throws InterruptedException { Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("上游收到訂閱")); ConnectableFlux<Integer> co = source.publish(); co.subscribe(System.out::println, e -> {}, () -> {}); co.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("訂閱者完成訂閱操做"); Thread.sleep(500); System.out.println("尚未鏈接上"); co.connect(); }
輸出以下:
訂閱者完成訂閱操做 尚未鏈接上 上游收到訂閱 1 1 2 2 3 3
可見當connect
的時候,上游才真正收到訂閱請求。
2)autoConnect的例子
@Test public void testConnectableFluxAutoConnect() throws InterruptedException { Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("上游收到訂閱")); // 須要兩個訂閱者才自動鏈接 Flux<Integer> autoCo = source.publish().autoConnect(2); autoCo.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("第一個訂閱者完成訂閱操做"); Thread.sleep(500); System.out.println("第二個訂閱者完成訂閱操做"); autoCo.subscribe(System.out::println, e -> {}, () -> {}); }
輸出以下:
第一個訂閱者完成訂閱操做 第二個訂閱者完成訂閱操做 上游收到訂閱 1 1 2 2 3 3
可見,只有兩個訂閱者都完成訂閱以後,上游才收到訂閱請求,並開始發出數據。
3)refCononect的例子
@Test public void testConnectableFluxRefConnect() throws InterruptedException { Flux<Long> source = Flux.interval(Duration.ofMillis(500)) .doOnSubscribe(s -> System.out.println("上游收到訂閱")) .doOnCancel(() -> System.out.println("上游發佈者斷開鏈接")); Flux<Long> refCounted = source.publish().refCount(2, Duration.ofSeconds(2)); System.out.println("第一個訂閱者訂閱"); Disposable sub1 = refCounted.subscribe(l -> System.out.println("sub1: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第二個訂閱者訂閱"); Disposable sub2 = refCounted.subscribe(l -> System.out.println("sub2: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第一個訂閱者取消訂閱"); sub1.dispose(); TimeUnit.SECONDS.sleep(1); System.out.println("第二個訂閱者取消訂閱"); sub2.dispose(); TimeUnit.SECONDS.sleep(1); System.out.println("第三個訂閱者訂閱"); Disposable sub3 = refCounted.subscribe(l -> System.out.println("sub3: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第三個訂閱者取消訂閱"); sub3.dispose(); TimeUnit.SECONDS.sleep(3); System.out.println("第四個訂閱者訂閱"); Disposable sub4 = refCounted.subscribe(l -> System.out.println("sub4: " + l)); TimeUnit.SECONDS.sleep(1); System.out.println("第五個訂閱者訂閱"); Disposable sub5 = refCounted.subscribe(l -> System.out.println("sub5: " + l)); TimeUnit.SECONDS.sleep(2); }
輸出以下:
第一個訂閱者訂閱 第二個訂閱者訂閱 上游收到訂閱 sub1: 0 sub2: 0 第一個訂閱者取消訂閱 sub1: 1 sub2: 1 sub2: 2 第二個訂閱者取消訂閱 sub2: 3 第三個訂閱者訂閱 sub3: 6 sub3: 7 第三個訂閱者取消訂閱 上游發佈者斷開鏈接 第四個訂閱者訂閱 第五個訂閱者訂閱 上游收到訂閱 sub4: 0 sub5: 0 sub4: 1 sub5: 1 sub4: 2 sub5: 2 sub4: 3 sub5: 3
本例中,refCount設置爲最少兩個訂閱者接入是纔開始發出數據,當全部訂閱者都取消時,若是不能在兩秒內接入新的訂閱者,則上游會斷開鏈接。
上邊的例子中,隨着前兩個訂閱者相繼取消訂閱,第三個訂閱者及時(在2秒內)開始訂閱,因此上游會繼續發出數據,並且根據輸出能夠看出是「熱序列」。
當第三個訂閱者取消後,第四個訂閱者沒能及時開始訂閱,因此上游發佈者斷開鏈接。當第五個訂閱者訂閱以後,第四和第五個訂閱者至關於開始了新一輪的訂閱。