[譯] 大話(Summer vs Winter Observable)之我與 Rx Observable[Android RxJava2](這是什麼鬼)第六話

大話(Summer vs Winter Observable)之我與 Rx Observable[Android RxJava2](這是什麼鬼)第六話

哇哦,又是新的一天,是時候來學習一些新的「姿式」了 🙂。前端

嗨,朋友們,但願你們一切都好。這是咱們 RxJava2 Android 系列的第六篇文章【第一話,第二話,第三話,第四話,第五話,第六話第七話第八話 】。在這一篇文章中,咱們將繼續圍繞 Rx 展開對話。還有一件重要的事情是,基本上 Summer vs Winter 意味着 Hot 和 Cold Observale 🙂 。java

我爲啥要寫這個呢:react

緣由和我在 part1 與你分享過的同樣。android

引言:ios

**這篇文章並無引言,由於這實際上是咱們上一篇文章的延續,但在開始以前我想咱們應該進行一下前景回顧。上一篇文章中咱們遇到了一位 Rx Observable 先生。他給了咱們很多關於學習 Rx 的建議,而後他還分享給了咱們一些能夠用來創造 Observable 的方法,最後他打算告訴咱們一些關於 Could 和 Hot Observable 的東西,結果咱們就此打住。git

緊接上一話:github

Observable:其實還有不少。我在這裏介紹兩類 Observable 對象。一種叫作 Cold Observable,第二個是 Hot Observable。有些時候開發者習慣把 Hot 和 Cold Observabels 拿來作比較 :)。 這些真的是很簡單的概念。這裏,我會經過一些簡單的例子來闡述一下概念,而後我會告訴你如何在編碼中使用它們。再以後我想我會給你一些真實案例,你以爲如何?後端

Me:固然,我就在你眼前,這樣你能夠隨時檢查我是否有作錯的地方。bash

Observable: 哈哈哈哈,固然了。那麼有多少人瞭解商場的促銷人員,就是那些站在商店門口但願藉由大聲吆喝來招攬顧客的人呢?dom

Me: 估計沒幾個,不少人都不太瞭解這種盛行於亞洲國家好比巴基斯坦和印度的銷售文化……你能試着採用一些更加通俗的例子嗎,這樣的話每一個人都能更加輕易的理解這個概念。

Observable: 固然,沒問題。有多少人瞭解咖啡和咖啡店呢?

Me: 差很少每一個人吧。

Observable: 很好。如今這裏有兩家咖啡店,一家叫作霜語咖啡店,一家叫作火舞咖啡店。任何一個去霜語咖啡館的人均可以買一杯咖啡,而後坐在咖啡館的任何地方。咖啡廳裏的每一個座位上都提供了一副智能耳機。他們提供了一個有三首詩的播放列表。這些耳機最智能的地方在於,每當有人帶上它們,這些耳機老是從第一首詩開始播放,若是有人中途取下了耳機後再次從新戴上,那麼這些耳機仍然會從新從第一首詩開始播放。對了,若是你只是取下了耳機,那麼它也就會中止播放。

反過來,火舞咖啡館有一套完善的音樂播放系統。當你進入咖啡館的時候,你就會開始聽到他們播放的詩,由於他們有着很是好的音樂播放系統和一個大號的揚聲器。他們的詩歌列表裏有無數首詩,當他們天天開始營業的時候他們就會打開這個系統。因此說這個系統的運行與顧客無關,任何將會進入這家咖啡館的人都能聽到那個時刻正在播放的詩,而且他永遠也不知道他進入以前已經播放完了多少詩了。這跟咱們要講的 Observable 是一個概念。

就像霜語咖啡館的那些耳機,Cold Obervable 老是被動的。就像你用 Observable.fromArray() 或者其餘任何方法來創造 Observable 同樣,他們和那些耳機差很少。如同戴上耳機播放列表纔會播放同樣,當你開始訂閱那些 Observable 後你纔會開始接收到數據。而當訂閱者取消了對 Observable 的訂閱後,如同取下耳機後詩會中止播放同樣,你也將再也不能接收到數據。

最後的重點是霜語咖啡館提供了不少副耳機,可是每副耳機只會在有人戴上它們以後纔會開始播放。即便某我的已經播放到了第二首詩,但另外的某我的才戴上耳機,那麼第二我的會從第一首詩開始播放。這意味着每一個人都有獨立的播放列表。就如同咱們有三個訂閱了 Cold Observable 的訂閱者同樣,它們會獲得各自獨立的數據流,也就是說 Observable 會對每一個訂閱者單獨地去調用三次 onNext 方法。換句話說就是,Cold Observable 如同那些耳機同樣依賴於訂閱者的訂閱(顧客戴上耳機)。

Hot observable 就像火舞咖啡館的音樂系統同樣。一旦咖啡館開始營業,其音樂系統就會開始播放詩歌,無論有沒有人在聽。每位進來的顧客都會從那個時刻正好在播放的詩開始聆聽。這跟 Hot Observable 所作的事情同樣,一旦它們被建立出來就會開始發射數據,任何的訂閱者都會從它們開始訂閱的那個時間點開始接收到數據,而且毫不會接收到以前就發射出去的數據。任何訂閱者都會在訂閱以後才接收到數據。我想我會使用一樣的例子來進行編碼,而且以後我會給一些真實案例。

Cold Observable:

public class HotVsCold {

    public static void main(String[] args) throws InterruptedException {

        List<String > poemsPlayList = Arrays.asList("Poem 1", "Poem 2", "Poem 3");
        Observable coldMusicCoffeCafe = Observable.fromArray(poemsPlayList);

        Consumer client1 = poem-> System.out.println(poem);
        Consumer client2 = poem-> System.out.println(poem);
        Consumer client3 = poem-> System.out.println(poem);
        Consumer client4 = poem-> System.out.println(poem);

        coldMusicCoffeCafe.subscribe(client1);
        coldMusicCoffeCafe.subscribe(client2);
        System.out.println(System.currentTimeMillis());
        Thread.sleep(2000);
        System.out.println(System.currentTimeMillis());
        coldMusicCoffeCafe.subscribe(client3);
        coldMusicCoffeCafe.subscribe(client4);

    }
}
複製代碼

好吧,這是一些很簡單的示例代碼。我有 4 個顧客和 1 個我在霜語咖啡館例子裏提到的播放列表。當前兩個顧客戴上了耳機後,我暫停了 2 秒的程序,而後 3 號和 4 號顧客也戴上了耳機。在最後咱們查看輸出數據時,咱們能輕易地看出每一個顧客都把 3 首詩從頭聽了一遍。

Output:
[Poem 1, Poem 2, Poem 3]
[Poem 1, Poem 2, Poem 3]
1494142518697
1494142520701
[Poem 1, Poem 2, Poem 3]
[Poem 1, Poem 2, Poem 3]
複製代碼

Hot Observable:

public static void main(String[] args) throws InterruptedException {

    Observable<Long> hotMusicCoffeeCafe = Observable.interval(1000, TimeUnit.MILLISECONDS);
    ConnectableObservable<Long> connectableObservable = hotMusicCoffeeCafe.publish();
    connectableObservable.connect(); //  咖啡館開始營業,音樂播放系統開啓

    Consumer client1 = poem-> System.out.println("Client 1 poem"+poem);
    Consumer client2 = poem-> System.out.println("Client 2 poem"+poem);
    Consumer client3 = poem-> System.out.println("Client 3 poem"+poem);
    Consumer client4 = poem-> System.out.println("Client 4 poem"+poem);

    Thread.sleep(2000); // 在2首詩已經播放完畢後第一位顧客才進來,因此他會才第二首詩開始聽
    connectableObservable.subscribe(client1);
    Thread.sleep(1000); // 第二位顧客會從第三首詩開始聽
    connectableObservable.subscribe(client2);

    Thread.sleep(4000); // 第三和第四爲顧客爲從第七首詩開始聽(譯者注:原本是寫的 poem 9)
    connectableObservable.subscribe(client3);
    connectableObservable.subscribe(client4);

    while (true);
}
複製代碼

火舞咖啡館開始營業的時候就會開啓其音樂播放系統。詩歌會在以上代碼裏咱們調用 connect 方法的時候開始播放。暫時先不須要關注 connect 方法,而只是試着理解這個概念。當通過 2 秒暫停,第一個顧客走進了咖啡館後,他會從第二首詩開始聽。下一位顧客會在 1 秒以後進來,而且從第三首詩開始聽。以後,第三和第四位顧客會在 4 秒後進入,而且從第七首詩開始聽。你能夠看到這個音樂播放系統是獨立於顧客的。一旦這個音樂系統開始運行,它並不在意有沒人顧客在聽。也就是說,全部的顧客會在他們進入時聽到當前正在播放的詩,並且他們毫不會聽到以前已經播放過的詩。如今我以爲你已經抓住了 Hot vs Cold Observable 的概念。是時候來瞧一瞧如何建立這些不一樣 Observables 的要點了。

Cold Observable:

  1. 全部的 Observable 默認都是 Cold Obserable。這就是說咱們使用諸如 Observable.create() 或者 Observable.fromArray() 這類的方法所建立出來的 Observable 都是 Cold Observable。
  2. 任何訂閱 Cold Observable 的訂閱者都會接收到獨立的數據流。
  3. 若是沒有訂閱者訂閱,它就什麼事情也不會作。是被動的。

Hot Observable:

  1. 一旦 Hot Observable 被建立了,無論有沒有訂閱者,它們都會開始發送數據。
  2. 相同時間開始訂閱的訂閱者會獲得一樣的數據。

Me: 聽上去不錯。你能告訴我如何將咱們的 Cold Observable 轉換成 Hot Observable 嗎?

Observable: 固然,Cold 和 Hot Observable 之間的轉換很簡單。

List<Integer> integers = new ArrayList<>();
Observable.range(0, 10000)
        .subscribe(count -> integers.add(count));

Observable<List<Integer>> listObservable = Observable.fromArray(integers);
複製代碼

在上面的代碼裏面,listObservable 是一個 Cold Observable。如今來看看咱們怎麼把這個 Cold Observable 轉換成 Hot Observable 的。

Observable<List<Integer>> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();
複製代碼

咱們用 publish() 方法將咱們的 Cold Observable 轉換成了 Hot Observable。因而咱們能夠說任何的 Cold Observable 均可以經過調用 publish() 方法來轉換成 Hot Observable,這個方法會返回給你一個 ConnectableObservable,只是此時尚未開始發射數據。有點神奇啊。當我對任意 Observable 調用 publish() 方法時,這意味着從如今開始任何開始訂閱的訂閱者都會分享一樣的數據流。有趣的一點是,若是如今有任意的訂閱者訂閱了 connectableObservable,它們什麼也得不到。也許大家感到有些疑惑了。這裏有兩件事須要說明。當我調用 publish() 方法時,只是說明如今這個 Observable 作好了能成爲單一數據源來發射數據的準備,爲了真正地發射數據,我須要調用 connect() 方法,以下方代碼所示。

Observable<List<Integer>> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();
connectableObservable.connect();
複製代碼

很簡單對吧。記住調用 publish() 只是會把 Cold Observable 轉換成 Hot Observable,而不會開始發射數據。爲了可以發射數據咱們須要調用 cocnnect()。當我對一個 ConnectableObserbale 調用 connect() 時,數據纔會開始被髮射,無論有沒有訂閱者。這裏還有一些在正式項目裏會很是有用的方法,好比 refCount()、share()、replay()。在開始談及它們以前,我會就此打住並再給你展現一個例子,以確保大家真正抓住了要領。

Me: 好嘞,但願不要太複雜。

Observable: 哈哈哈,不會的。我只是須要再來詳細解釋一下,確保每一個人都把握了這個概念,由於這個概念其實並不算是特別簡單的和容易理解的。

Me: 我也以爲。

Observable:如今我會給你一個例子來讓你更好地來準確把握這個概念。好比咱們有以下的一個 Observable。

Observable<String> just = Observable.just("Hello guys");
複製代碼

還有兩個不一樣的訂閱者訂閱了它。

public class HotVsCold {
    public static void main(String[] args) {
        Observable<String> just = Observable.just("Hello guys");
        just.subscribe(s-> System.out.println(s));
        just.subscribe(s-> System.out.println(s));
    }
}
複製代碼
Output:
Hello guys
Hello guys
複製代碼

個人問題是,這個 Observable 是 Cold 仍是 Hot 的呢。我知道你確定已經知道這個是 cold,由於這裏沒有 publish() 的調用。先暫時把這個想象成我從某個第三方庫得到而來的,因而我也不知道這是哪一種類型的 Observable。如今我打算寫一個例子,這樣不少事情就不言而喻了。

public static void main(String[] args) {
    Random random = new Random();
    Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
    just.subscribe(s-> System.out.println(s));
    just.subscribe(s-> System.out.println(s));
}
複製代碼

我有一段生產隨機數的程序,讓咱們來看下輸出再來討論這是 Cold 仍是 Hot。

Output: 1531768121 607951518

兩個不一樣的值。這就是說這是一個 Cold observable,由於根據 Cold Observable 的定義每次都會獲得一個全新的值。每次它都會建立一個全新的值,或者簡單來講 onNext() 方法會被不一樣的訂閱者分別調用一次。

如今讓咱們來把這個 Cold Observable 轉換成 Hot Observable。

public static void main(String[] args) {
    Random random = new Random();
    Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
    ConnectableObservable<Integer> publish = just.publish();
    publish.subscribe(s-> System.out.println(s));
    publish.subscribe(s-> System.out.println(s));
    publish.connect();
}
複製代碼

在解釋上面的代碼以前,先讓咱們來看一下輸出。

Output:
1926621976
1926621976
複製代碼

咱們的兩個不一樣訂閱者獲得了同一份數據。根據 Hot Observable 老是每份數據只發射一次的定義說明了這是一個 Hot Obsevable,或者簡單來講 onNext() 只被調用了一次。我接下來會解釋 publish() 和 connect() 的調用。

當我調用 publish() 方法時,這意味着個人這個 Observable 已經獨立於訂閱者,而且全部訂閱者只會接收到同一個數據源發射的同一份數據。簡單來講,Hot Observable 將會對全部訂閱者發射調用一次 onNext() 所產生的數據。這裏或許有些讓你感到困惑,我在兩個訂閱者訂閱以後才調用了 connect() 方法。由於我想告訴大家 Hot Observable 是獨立的而且數據的發射應該經過一次對 onNext() 的調用,而且咱們知道 Hot Observable 只會在咱們調用 connect() 以後纔會開始發射數據。因此首先咱們讓兩個訂閱者去訂閱,而後在咱們才調用 connect() 方法,因而咱們就能夠獲得一樣一份數據。如今讓咱們來對這個例子作些小小的改動。

Random random = new Random();
Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
ConnectableObservable<Integer> publish = just.publish();
publish.connect();
publish.subscribe(s-> System.out.println(s));
publish.subscribe(s-> System.out.println(s));
複製代碼

咱們看到這裏只有一處小小的變化。我在調用 connect() 以後才讓訂閱者訂閱。你們來猜猜會輸出什麼?

Output:
Process finished with exit code 0
複製代碼

沒錯,沒有輸出。是否是以爲有點不對勁?聽我慢慢解釋。如你所見,我建立了一個發射隨機數的 Observable,而且它只會調用一次了。經過調用 publish() 我將這個 Cold Observable 轉換成了 Hot Observable,接着我當即調用了 connect() 方法。咱們知道如今它是一個獨立於訂閱者的 Hot Observable,而且它生成了一個隨機數將其發射了出去。在調用 connect() 以後咱們才讓兩個訂閱者訂閱了這個 Observable,兩個訂閱者沒有接收到任何數據的緣由是在它們訂閱以前 Hot Observable 就已經將數據發射了出去。我想你們都能明白的吧。如今讓咱們在 Observable 內部加上日誌打印輸出,這樣咱們就能夠確認這個流程是如同我所解釋的同樣了。

public static void main(String[] args) {
    Random random = new Random();
    Observable<Integer> just = Observable.create(source -> {
                int value = random.nextInt();
                System.out.println("Emitted data: " + value);
                source.onNext(value);
            }
    );
    ConnectableObservable<Integer> publish = just.publish();
    publish.connect();
    publish.subscribe(s -> System.out.println(s));
    publish.subscribe(s -> System.out.println(s));
}
複製代碼
Output:

Emitted data: -690044789

Process finished with exit code 0
複製代碼

如上所示,個人 Hot Observable 在調用 connect() 以後開始發射數據,而後纔是訂閱者發起了訂閱。這就是爲何個人訂閱者沒有獲得數據。讓咱們在繼續深刻以前來複習一下。

  1. 全部的 Observable 默認都是 Cold Obserable。
  2. 經過調用 Publish() 方法咱們能夠將一個 Cold Observable 轉換成 Hot Observable,該方法返回了一個 ConnectableObservable,它如今並不會當即開始發射數據。
  3. 在對 ConnectableObservable 調用 connect() 方法後它纔開始發射數據。

Observable: 小小的暫停一下,在咱們繼續研究 Observable 以前,你若是能將以上的代碼改形成能無限制間隔發射數據的話就太棒了。

Me: 小菜一碟。

public static void main(String[] args) throws InterruptedException {
    Random random = new Random();
    Observable<Integer> just = Observable.create(
            source -> {
                Observable.interval(1000, TimeUnit.MILLISECONDS)
                        .subscribe(aLong -> {
                            int value = random.nextInt();
                            System.out.println("Emitted data: " + value);
                            source.onNext(value);
                        });
            }
    ); // 簡單的把數據源變成了每間隔一秒就發射一次數據。
    ConnectableObservable<Integer> publish = just.publish();
    publish.connect();

    Thread.sleep(2000); // 咱們的訂閱者在 2 秒後纔開始訂閱。
    publish.subscribe(s -> System.out.println(s));
    publish.subscribe(s -> System.out.println(s));

    while (true);

}
複製代碼
Output:

Emitted data: -918083931
Emitted data: 697720136
Emitted data: 416474929
416474929
416474929
Emitted data: -930074666
-930074666
-930074666
Emitted data: 1694552310
1694552310
1694552310
Emitted data: -61106201
-61106201
-61106201
複製代碼

輸出結果如上所示。咱們的 Hot Observable 徹底在按照咱們以前得出的定義在工做。當它開始發射數據的 2 秒時間後,咱們獲得了 2 個不一樣的輸出值,接着咱們讓兩個訂閱者去訂閱它,因而它們獲得了同一份第三個被髮射出來的值。 是時候來更加深刻的來理解這個概念了。在咱們已經對 Cold 和 Hot 有必定概念的基礎上,我將針對一些場景對 Hot Observable 作更詳細的介紹。

場景 1: 我但願任意訂閱者在訂閱以後也能首先接收到其訂閱這個時間點以前的數據,而後纔是同步接收到新發射出來的數據。要解決這個問題,咱們只須要簡單的調用 replay() 方法就行。

public static void main(String[] args) throws InterruptedException {

    Random random = new Random();
    Observable<Integer> just = Observable.create(
            source -> {
                Observable.interval(500, TimeUnit.MILLISECONDS)
                        .subscribe(aLong -> {
                            int value = random.nextInt();
                            System.out.println("Emitted data: " + value);
                            source.onNext(value);
                        });
            }
    );
    ConnectableObservable<Integer> publish = just.replay();
    publish.connect();

    Thread.sleep(2000);
    publish.subscribe(s -> System.out.println("Subscriber 1: "+s));
    publish.subscribe(s -> System.out.println("Subscriber 2: "+s));

    while (true);

}
複製代碼
Output:
**Emitted data: -1320694608**
**Emitted data: -1198449126**
**Emitted data: -1728414877**
**Emitted data: -498499026**
Subscriber 1: -1320694608
Subscriber 1: -1198449126
Subscriber 1: -1728414877
Subscriber 1: -498499026
Subscriber 2: -1320694608
Subscriber 2: -1198449126
Subscriber 2: -1728414877
Subscriber 2: -498499026
**Emitted data: -1096683631**
**Subscriber 1: -1096683631**
**Subscriber 2: -1096683631**
**Emitted data: -268791291**
**Subscriber 1: -268791291**
**Subscriber 2: -268791291**
複製代碼

以上所示,你能輕鬆的理解 Hot Observabel 裏的 replay() 這個方法。我首先建立了一個每隔 0.5 秒發射數據的 Hot Observable,在 2 秒事後咱們才讓兩個訂閱者去訂閱它。此時因爲咱們的 Observable 已經發射出來了 4 個數據,因而你能看到輸出結果裏,咱們的訂閱者首先獲得了在其訂閱這個時間點以前已經被髮射出去的 4 個數據,而後纔開始同步接收到新發射出來的數據。

場景 2: 我但願有一種 Hot Observable 可以在最少有一個訂閱者的狀況下才發射數據,而且若是全部它的訂閱者都取消了訂閱,它就會中止發射數據。 這一樣可以很輕鬆的辦到。

public static void main(String[] args) throws InterruptedException {

    Observable<Long> observable = Observable.interval(500, TimeUnit.MILLISECONDS).publish().refCount();

    Consumer<Long > firstSubscriber = s -> System.out.println("Subscriber 1: "+s);
    Consumer<Long > secondSubscriber = s -> System.out.println("Subscriber 2: "+s);

    Disposable subscribe1 = observable.subscribe(firstSubscriber);
    Disposable subscribe2 = observable.subscribe(secondSubscriber);

    Thread.sleep(2000);
    subscribe1.dispose();
    Thread.sleep(2000);
    subscribe2.dispose();

    Consumer<Long > thirdSubscriber = s -> System.out.println("Subscriber 3: "+s);
    Disposable subscribe3 = observable.subscribe(thirdSubscriber);

    Thread.sleep(2000);
    subscribe3.dispose();

    while (true);
}
複製代碼

Output: Subscriber 1: 0 Subscriber 2: 0 Subscriber 1: 1 Subscriber 2: 1 Subscriber 1: 2 Subscriber 2: 2 Subscriber 1: 3 Subscriber 2: 3 Subscriber 2: 4 Subscriber 2: 5 Subscriber 2: 6 Subscriber 2: 7 Subscriber 3: 0 Subscriber 3: 1 Subscriber 3: 2 Subscriber 3: 3 (譯者注:原文少寫了一行輸出)

相當重要的一點是,這是一個 Hot Observable,而且它在第一個訂閱者訂閱以後纔開始發射數據,而後當它沒有訂閱者時它會中止發射數據。 如上面的輸出所示,當頭兩個訂閱者開始訂閱它以後,它纔開始發射數據,而後其中一個訂閱者取消了訂閱,可是它並無中止發射數據,由於此時它還擁有另一個訂閱者。又過了一會,另一個訂閱者也取消了訂閱後,它便中止了發射數據。當 2 秒事後第三個訂閱者開始訂閱它以後,它開始從頭開始發射數據,而不是從第二個訂閱者取消訂閱時停留在的位置。

Observable: 哇哦,你真棒!你把這個概念解釋地超好。

Me: 多謝誇獎。

Observable: 那麼你還有其餘的問題嗎?

Me: 是的,我有。你能告訴我什麼是 Subject 以及不一樣類別的 Subject 的區別嗎,好比 Publish,Behaviour 之類的。

Observable: Emmmmmm。我覺我應該在教你那些個概念以前告訴你關於 Observer API 的相關知識,還有就是關於如何使用 Lambda 表達式或者叫函數式接口來代替使用完整的 Observer 接口的方法。你以爲呢?

Me: 好啊,都聽你的。

Observable: 就目前咱們瞭解到的 Observable,這裏還有一個關於咱們一直在使用的 Observable 的概念...

小結: 大家好啊,朋友們。此次的對話真是有點長啊,我必須在此打住了。不然的話這篇文章就會變成一本四庫全書,什麼亂七八糟的東西都會出現。我但願咱們可以系統地有條理地來學習這一切。因此餘下的內容,咱們下回再揭曉。再者,試試看盡你可能把咱們此次學到的東西用在你真正的項目中。最後感謝 Rx Observable 的到場。 週末快樂,再見。🙂


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOS前端後端區塊鏈產品設計人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索