(18)Hot vs Cold——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範
本文測試源碼java

2.8 Hot vs Cold

到目前爲止,咱們討論的發佈者,不管是Flux仍是Mono,都有一個特色:訂閱前什麼都不會發生。當咱們「建立」了一個Flux的時候,咱們只是「聲明」/「組裝」了它,可是若是不調用.subscribe來訂閱它,它就不會開始發出元素。react

可是咱們對「數據流」(尤爲是乍聽到這個詞的時候)會有種自然的感受,就是不管有沒有訂閱者,它始終在按照本身的步伐發出數據。就像假設一我的沒有一個粉絲,他也能夠發微博同樣。git

以上這兩種數據流分別稱爲「冷」序列和「熱」序列。因此咱們一直在介紹的Reactor3的發佈者就屬於「冷」的發佈者。不過有少數的例外,好比just生成的就是一個「熱」序列,它直接在組裝期就拿到數據,若是以後有誰訂閱它,就從新發送數據給訂閱者。Reactor 中多數其餘的「熱」發佈者是擴展自Processor 的(下節會介紹到)。github

下面咱們經過對比了解一下兩種不一樣的發佈者的效果,首先是咱們熟悉的「冷」發佈者:緩存

@Test
    public void testCodeSequence() {
        Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        System.out.println();
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
    }

咱們對發佈者source進行了兩次訂閱,每次訂閱都致使它把數據流重新發一遍:ide

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE

Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

而後再看一個「熱」發佈者的例子:測試

@Test
    public void testHotSequence() {
        UnicastProcessor<String> hotSource = UnicastProcessor.create();

        Flux<String> hotFlux = hotSource.publish()
                .autoConnect()
                .map(String::toUpperCase);

        hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

        hotSource.onNext("blue");
        hotSource.onNext("green");

        hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

        hotSource.onNext("orange");
        hotSource.onNext("purple");
        hotSource.onComplete();
    }

這個熱發佈者是一個UnicastProcessor,咱們可使用它的onNext等方法手動發出元素。上邊的例子中,hotSource發出兩個元素後第二個訂閱者纔開始訂閱,因此第二個訂閱者只能收到以後的元素:code

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

因而可知,UnicastProcessor是一個熱發佈者。blog

有時候,你不只想要在某一個訂閱者訂閱以後纔開始發出數據,可能還但願在多個訂閱者「到齊」以後 纔開始。ConnectableFlux的用意便在於此。Flux API 中有兩種經常使用的返回ConnectableFlux 的方式:publishreplay索引

  1. publish會嘗試知足各個不一樣訂閱者的需求(也就是回壓),並綜合這些請求反饋給源。假設有某個訂閱者的需求爲 0,發佈者會暫停向全部訂閱者發出元素。
  2. replay將對第一個訂閱後產生的數據進行緩存,最多緩存數量取決於配置(時間/緩存大小)。 它會對後續接入的訂閱者從新發送數據。

ConnectableFlux提供了多種對訂閱的管理方式。包括:

  • connect,當有足夠的訂閱接入後,能夠對 flux 手動執行一次。它會觸發對上游源的訂閱。
  • autoConnect(n)connect相似,不過是在有 n 個訂閱的時候自動觸發。
  • refCount(n)不只可以在訂閱者接入的時候自動觸發,還會檢測訂閱者的取消動做。若是訂閱者所有取消訂閱,則會將源「斷開鏈接」,再有新的訂閱者接入的時候纔會繼續「連上」發佈者。refCount(int, Duration)增長了一個倒計時:一旦訂閱者數量過低了,它會等待 Duration 參數指定的時間,若是沒有新的訂閱者接入纔會與源斷開鏈接。

1)connect的例子

@Test
    public void testConnectableFlux1() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("上游收到訂閱"));

        ConnectableFlux<Integer> co = source.publish();

        co.subscribe(System.out::println, e -> {}, () -> {});
        co.subscribe(System.out::println, e -> {}, () -> {});

        System.out.println("訂閱者完成訂閱操做");
        Thread.sleep(500);
        System.out.println("尚未鏈接上");

        co.connect();
    }

輸出以下:

訂閱者完成訂閱操做
尚未鏈接上
上游收到訂閱
1
1
2
2
3
3

可見當connect的時候,上游才真正收到訂閱請求。

2)autoConnect的例子

@Test
    public void testConnectableFluxAutoConnect() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("上游收到訂閱"));

        // 須要兩個訂閱者才自動鏈接
        Flux<Integer> autoCo = source.publish().autoConnect(2);

        autoCo.subscribe(System.out::println, e -> {}, () -> {});
        System.out.println("第一個訂閱者完成訂閱操做");
        Thread.sleep(500);
        System.out.println("第二個訂閱者完成訂閱操做");
        autoCo.subscribe(System.out::println, e -> {}, () -> {});
    }

輸出以下:

第一個訂閱者完成訂閱操做
第二個訂閱者完成訂閱操做
上游收到訂閱
1
1
2
2
3
3

可見,只有兩個訂閱者都完成訂閱以後,上游才收到訂閱請求,並開始發出數據。

3)refCononect的例子

@Test
    public void testConnectableFluxRefConnect() throws InterruptedException {

        Flux<Long> source = Flux.interval(Duration.ofMillis(500))
                .doOnSubscribe(s -> System.out.println("上游收到訂閱"))
                .doOnCancel(() -> System.out.println("上游發佈者斷開鏈接"));

        Flux<Long> refCounted = source.publish().refCount(2, Duration.ofSeconds(2));

        System.out.println("第一個訂閱者訂閱");
        Disposable sub1 = refCounted.subscribe(l -> System.out.println("sub1: " + l));

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第二個訂閱者訂閱");
        Disposable sub2 = refCounted.subscribe(l -> System.out.println("sub2: " + l));

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第一個訂閱者取消訂閱");
        sub1.dispose();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第二個訂閱者取消訂閱");
        sub2.dispose();

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第三個訂閱者訂閱");
        Disposable sub3 = refCounted.subscribe(l -> System.out.println("sub3: " + l));

        TimeUnit.SECONDS.sleep(1);
        System.out.println("第三個訂閱者取消訂閱");
        sub3.dispose();

        TimeUnit.SECONDS.sleep(3);
        System.out.println("第四個訂閱者訂閱");
        Disposable sub4 = refCounted.subscribe(l -> System.out.println("sub4: " + l));
        TimeUnit.SECONDS.sleep(1);
        System.out.println("第五個訂閱者訂閱");
        Disposable sub5 = refCounted.subscribe(l -> System.out.println("sub5: " + l));
        TimeUnit.SECONDS.sleep(2);
    }

輸出以下:

第一個訂閱者訂閱
第二個訂閱者訂閱
上游收到訂閱
sub1: 0
sub2: 0
第一個訂閱者取消訂閱
sub1: 1
sub2: 1
sub2: 2
第二個訂閱者取消訂閱
sub2: 3
第三個訂閱者訂閱
sub3: 6
sub3: 7
第三個訂閱者取消訂閱
上游發佈者斷開鏈接
第四個訂閱者訂閱
第五個訂閱者訂閱
上游收到訂閱
sub4: 0
sub5: 0
sub4: 1
sub5: 1
sub4: 2
sub5: 2
sub4: 3
sub5: 3

本例中,refCount設置爲最少兩個訂閱者接入是纔開始發出數據,當全部訂閱者都取消時,若是不能在兩秒內接入新的訂閱者,則上游會斷開鏈接。

上邊的例子中,隨着前兩個訂閱者相繼取消訂閱,第三個訂閱者及時(在2秒內)開始訂閱,因此上游會繼續發出數據,並且根據輸出能夠看出是「熱序列」。

當第三個訂閱者取消後,第四個訂閱者沒能及時開始訂閱,因此上游發佈者斷開鏈接。當第五個訂閱者訂閱以後,第四和第五個訂閱者至關於開始了新一輪的訂閱。

相關文章
相關標籤/搜索