RxJava中的Observable,多Subscribers

多個訂閱者的默認行爲並不老是可取的。在本文中,咱們將介紹如何更改此行爲並以適當的方式處理多個訂閱者。html

但首先,讓咱們來看看多個訂閱者的默認行爲。java

默認行爲react

假設咱們有如下Observable:數據庫

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        subscriber.onNext(gettingValue(1));
        subscriber.onNext(gettingValue(2));
 
        subscriber.add(Subscriptions.create(() -> {
            LOGGER.info("Clear resources");
        }));
    });
}
複製代碼

訂閱者訂閱後會當即發出兩個元素。bash

在咱們的示例中,咱們有兩個訂閱者:服務器

LOGGER.info("Subscribing");
 
Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));
 
s1.unsubscribe();
s2.unsubscribe();
複製代碼

想象一下,獲取每一個元素是一項代價高昂的操做 - 例如,它可能包括密集計算或打開URL鏈接。ide

爲了簡單起見,咱們只返回一個數字:spa

private static Integer gettingValue(int i) {
    LOGGER.info("Getting " + i);
    return i;
}
複製代碼

這是輸出:code

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources
複製代碼

咱們能夠看到,在默認狀況下,獲取每一個元素和清除資源都要執行兩次-對於每一個訂閱服務器一次。這不是咱們想要的。ConnectableObservable類有助於解決這個問題。htm

ConnectableObservable

ConnectableObservable類容許與多個訂閱者共享訂閱,而不容許屢次執行底層操做。

但首先,讓咱們建立一個ConnectableObservable。

publish()

publish()方法是從Observable建立一個ConnectableObservable:

ConnectableObservable obs = Observable.create(subscriber -> {
    subscriber.onNext(gettingValue(1));
    subscriber.onNext(gettingValue(2));
    subscriber.add(Subscriptions.create(() -> {
        LOGGER.info("Clear resources");
    }));
}).publish();
複製代碼

但就目前而言,它什麼都不作。它的工做原理是connect()方法。

connect()

在調用ConnectableObservable的connect()方法以前,即便有一些訂閱者,也不會觸發Observable的onSubcribe()回調。

讓咱們來證實一下:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();
複製代碼

咱們訂閱,而後等待一秒鐘再鏈接輸出是:

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources
複製代碼

咱們能夠看到:

  • 獲取元素只出現一次咱們想要的
  • 清算資源也只出現一次
  • 訂閱後獲取元素開始一秒鐘
  • 訂閱再也不觸發元素的發射。只有connect()才能這樣作

這種延遲多是有益的 - 有時咱們須要爲全部訂閱者提供相同的元素序列,即便其中一個訂閱者比另外一個訂閱者更早。

可觀察的一致視圖 - 在subscribe()以後的connect()

這個用例沒法在咱們以前的Observable上進行演示,由於它運行很冷,並且兩個訂閱者均可以得到整個元素序列。

相反,想象一下,元素髮射不依賴於訂閱的時刻,例如,鼠標點擊發出的事件。如今還想象第二個訂閱者在第一個訂閱者以後訂閱第二個訂閱者。

第一個訂閱者將得到此示例中發出的全部元素,而第二個訂閱者將只接收一些元素。

另外一方面,在正確的位置使用connect()方法能夠爲兩個訂閱者提供Observable序列上的相同視圖。

讓咱們建立一個Observable。它將在JFrame上點擊鼠標時發出元素。

每一個元素都是點擊的x座標:

private static Observable getObservable() {
    return Observable.create(subscriber -> {
        frame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                subscriber.onNext(e.getX());
            }
        });
        subscriber.add(Subscriptions.create(() {
            LOGGER.info("Clear resources");
            for (MouseListener listener : frame.getListeners(MouseListener.class)) {
                frame.removeMouseListener(listener);
            }
        }));
    });
}
複製代碼

如今,若是咱們以第二個間隔一個接一個地訂閱兩個訂閱者,運行程序並開始單擊,咱們將看到第一個訂閱者將得到更多元素:

public static void defaultBehaviour() throws InterruptedException {
    Observable obs = getObservable();
 
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
複製代碼
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources
複製代碼

connect() After subscribe()

爲了使兩個訂閱者得到相同的序列,咱們將Observable轉換爲ConnectableObservable並在訂閱者以後調用connect():

public static void subscribeBeforeConnect() throws InterruptedException {
 
    ConnectableObservable obs = getObservable().publish();
 
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i ->  LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe connected");
    s.unsubscribe();
}
複製代碼

如今他們將獲得相同的序列:

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources
複製代碼

因此重點是等待全部用戶準備就緒而後調用connect()。

在Spring應用程序中,咱們能夠在應用程序啓動期間訂閱全部組件,例如在onApplicationEvent()中調用connect()。

讓咱們回到咱們的例子;注意,connect()方法以前的全部單擊操做都失敗了。若是咱們不想遺漏元素,但相反,咱們能夠在代碼中更早地放置connect(),並強制可觀察到的元素在沒有任何訂閱服務器的狀況下生成事件。

在沒有任何訂閱者的狀況下強制訂閱 - connect()在subscribe()以前

爲了證實這一點,讓咱們更正咱們的例子:

public static void connectBeforeSubscribe() throws InterruptedException {
    ConnectableObservable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish();
    LOGGER.info("connecting:");
    Subscription s = obs.connect();
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
    Thread.sleep(1000);
    s.unsubscribe();
}
複製代碼

步驟相對簡單:

  • 首先,咱們鏈接
  • 而後咱們等待一秒鐘並訂閱第一個訂閱者
  • 最後,咱們等待另外一秒鐘並訂閱第二個訂閱者

請注意,咱們添加了doOnNext()運算符。這裏咱們能夠在數據庫中存儲元素,例如在咱們的代碼中,咱們只打印「save...」。

若是咱們啓動代碼並開始點擊,咱們將看到在connect()調用以後當即發出和處理元素:

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources
複製代碼

若是沒有訂閱者,則仍會處理這些元素。

所以,不論是否有人訂閱,connect()方法都會開始發出和處理元素,就好像有一個使用了元素的空操做的人工訂閱器同樣。

若是有一些真正的訂閱者訂閱,這我的工中介只向他們傳播元素。

若要取消訂閱,咱們會執行如下步驟:

s.unsubscribe();
複製代碼

而後:

Subscription s = obs.connect();
複製代碼

autoConnect()

此方法意味着在訂閱以前或以後不會調用connect(),而是在第一個訂閱者訂閱時自動調用。

使用此方法,咱們不能本身調用connect(),由於返回的對象是一般的Observable,它沒有此方法但使用底層的ConnectableObservable:

public static void autoConnectAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
    .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();
 
    LOGGER.info("autoconnect()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription s1 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription s2 = obs.subscribe((i) -> 
        LOGGER.info("subscriber#2 is printing x-coordinate " + i));
 
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 1");
    s1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe 2");
    s2.unsubscribe();
}
複製代碼

請注意,咱們也不能取消訂閱人工訂閱者。咱們能夠取消訂閱全部真正的訂閱者,但人工訂閱者仍將處理事件。

爲了理解這一點,讓咱們看一下最後一個訂閱者取消訂閱後最後發生的事情:

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268
複製代碼

正如咱們所看到的,在第二次取消訂閱後,不會出現清除資源的狀況,並繼續使用doOnNext()保存元素。這意味着人工訂閱服務器不會取消訂閱,而是繼續使用元素。

refCount()

refCount()相似於autoConnect(),由於只要第一個訂閱者訂閱,鏈接也會自動發生。

與autoconnect()不一樣,當最後一個訂閱者取消訂閱時,也會自動斷開鏈接:

public static void refCountAndSubscribe() throws InterruptedException {
    Observable obs = getObservable()
      .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();
 
    LOGGER.info("refcount()");
    Thread.sleep(1000);
    LOGGER.info("subscribing #1");
    Subscription subscription1 = obs.subscribe(
      i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
    Thread.sleep(1000);
    LOGGER.info("subscribing #2");
    Subscription subscription2 = obs.subscribe(
      i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
 
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#1");
    subscription1.unsubscribe();
    Thread.sleep(1000);
    LOGGER.info("unsubscribe#2");
    subscription2.unsubscribe();
}
複製代碼
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources
複製代碼

結論

ConnectableObservable類能夠輕鬆地處理多個訂閱者。

它的方法看起來很類似,但因爲實現上的細微差異(甚至方法的順序也很重要),用戶的行爲發生了很大的變化。

相關文章
相關標籤/搜索