多個訂閱者的默認行爲並不老是可取的。在本文中,咱們將介紹如何更改此行爲並以適當的方式處理多個訂閱者。html
但首先,讓咱們來看看多個訂閱者的默認行爲。java
默認行爲react
假設咱們有如下Observable:數據庫
private static Observable getObservable() {
return Observable.create(subscriber -> {
subscriber.onNext(gettingValue(1));
subscriber.onNext(gettingValue(2));
subscriber.add(Subscriptions.create(() -> {
LOGGER.info("Clear resources");
}));
});
}
複製代碼
訂閱者訂閱後會當即發出兩個元素。bash
在咱們的示例中,咱們有兩個訂閱者:服務器
LOGGER.info("Subscribing");
Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));
s1.unsubscribe();
s2.unsubscribe();
複製代碼
想象一下,獲取每一個元素是一項代價高昂的操做 - 例如,它可能包括密集計算或打開URL鏈接。ide
爲了簡單起見,咱們只返回一個數字:spa
private static Integer gettingValue(int i) {
LOGGER.info("Getting " + i);
return i;
}
複製代碼
這是輸出:code
Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources
複製代碼
咱們能夠看到,在默認狀況下,獲取每一個元素和清除資源都要執行兩次-對於每一個訂閱服務器一次。這不是咱們想要的。ConnectableObservable類有助於解決這個問題。htm
ConnectableObservable類容許與多個訂閱者共享訂閱,而不容許屢次執行底層操做。
但首先,讓咱們建立一個ConnectableObservable。
publish()
publish()方法是從Observable建立一個ConnectableObservable:
ConnectableObservable obs = Observable.create(subscriber -> {
subscriber.onNext(gettingValue(1));
subscriber.onNext(gettingValue(2));
subscriber.add(Subscriptions.create(() -> {
LOGGER.info("Clear resources");
}));
}).publish();
複製代碼
但就目前而言,它什麼都不作。它的工做原理是connect()方法。
connect()
在調用ConnectableObservable的connect()方法以前,即便有一些訂閱者,也不會觸發Observable的onSubcribe()回調。
讓咱們來證實一下:
LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();
複製代碼
咱們訂閱,而後等待一秒鐘再鏈接輸出是:
Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources
複製代碼
咱們能夠看到:
這種延遲多是有益的 - 有時咱們須要爲全部訂閱者提供相同的元素序列,即便其中一個訂閱者比另外一個訂閱者更早。
可觀察的一致視圖 - 在subscribe()以後的connect()
這個用例沒法在咱們以前的Observable上進行演示,由於它運行很冷,並且兩個訂閱者均可以得到整個元素序列。
相反,想象一下,元素髮射不依賴於訂閱的時刻,例如,鼠標點擊發出的事件。如今還想象第二個訂閱者在第一個訂閱者以後訂閱第二個訂閱者。
第一個訂閱者將得到此示例中發出的全部元素,而第二個訂閱者將只接收一些元素。
另外一方面,在正確的位置使用connect()方法能夠爲兩個訂閱者提供Observable序列上的相同視圖。
讓咱們建立一個Observable。它將在JFrame上點擊鼠標時發出元素。
每一個元素都是點擊的x座標:
private static Observable getObservable() {
return Observable.create(subscriber -> {
frame.addMouseListener(new MouseAdapter() {
@Override
public void mouseClicked(MouseEvent e) {
subscriber.onNext(e.getX());
}
});
subscriber.add(Subscriptions.create(() {
LOGGER.info("Clear resources");
for (MouseListener listener : frame.getListeners(MouseListener.class)) {
frame.removeMouseListener(listener);
}
}));
});
}
複製代碼
如今,若是咱們以第二個間隔一個接一個地訂閱兩個訂閱者,運行程序並開始單擊,咱們將看到第一個訂閱者將得到更多元素:
public static void defaultBehaviour() throws InterruptedException {
Observable obs = getObservable();
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe((i) ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe((i) ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("unsubscribe#1");
subscription1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe#2");
subscription2.unsubscribe();
}
複製代碼
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources
複製代碼
connect() After subscribe()
爲了使兩個訂閱者得到相同的序列,咱們將Observable轉換爲ConnectableObservable並在訂閱者以後調用connect():
public static void subscribeBeforeConnect() throws InterruptedException {
ConnectableObservable obs = getObservable().publish();
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(
i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(
i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("connecting:");
Subscription s = obs.connect();
Thread.sleep(1000);
LOGGER.info("unsubscribe connected");
s.unsubscribe();
}
複製代碼
如今他們將獲得相同的序列:
subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources
複製代碼
因此重點是等待全部用戶準備就緒而後調用connect()。
在Spring應用程序中,咱們能夠在應用程序啓動期間訂閱全部組件,例如在onApplicationEvent()中調用connect()。
讓咱們回到咱們的例子;注意,connect()方法以前的全部單擊操做都失敗了。若是咱們不想遺漏元素,但相反,咱們能夠在代碼中更早地放置connect(),並強制可觀察到的元素在沒有任何訂閱服務器的狀況下生成事件。
在沒有任何訂閱者的狀況下強制訂閱 - connect()在subscribe()以前
爲了證實這一點,讓咱們更正咱們的例子:
public static void connectBeforeSubscribe() throws InterruptedException {
ConnectableObservable obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x)).publish();
LOGGER.info("connecting:");
Subscription s = obs.connect();
Thread.sleep(1000);
LOGGER.info("subscribing #1");
obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
s.unsubscribe();
}
複製代碼
步驟相對簡單:
請注意,咱們添加了doOnNext()運算符。這裏咱們能夠在數據庫中存儲元素,例如在咱們的代碼中,咱們只打印「save...」。
若是咱們啓動代碼並開始點擊,咱們將看到在connect()調用以後當即發出和處理元素:
connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources
複製代碼
若是沒有訂閱者,則仍會處理這些元素。
所以,不論是否有人訂閱,connect()方法都會開始發出和處理元素,就好像有一個使用了元素的空操做的人工訂閱器同樣。
若是有一些真正的訂閱者訂閱,這我的工中介只向他們傳播元素。
若要取消訂閱,咱們會執行如下步驟:
s.unsubscribe();
複製代碼
而後:
Subscription s = obs.connect();
複製代碼
autoConnect()
此方法意味着在訂閱以前或以後不會調用connect(),而是在第一個訂閱者訂閱時自動調用。
使用此方法,咱們不能本身調用connect(),由於返回的對象是一般的Observable,它沒有此方法但使用底層的ConnectableObservable:
public static void autoConnectAndSubscribe() throws InterruptedException {
Observable obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();
LOGGER.info("autoconnect()");
Thread.sleep(1000);
LOGGER.info("subscribing #1");
Subscription s1 = obs.subscribe((i) ->
LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription s2 = obs.subscribe((i) ->
LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("unsubscribe 1");
s1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe 2");
s2.unsubscribe();
}
複製代碼
請注意,咱們也不能取消訂閱人工訂閱者。咱們能夠取消訂閱全部真正的訂閱者,但人工訂閱者仍將處理事件。
爲了理解這一點,讓咱們看一下最後一個訂閱者取消訂閱後最後發生的事情:
subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268
複製代碼
正如咱們所看到的,在第二次取消訂閱後,不會出現清除資源的狀況,並繼續使用doOnNext()保存元素。這意味着人工訂閱服務器不會取消訂閱,而是繼續使用元素。
refCount()
refCount()相似於autoConnect(),由於只要第一個訂閱者訂閱,鏈接也會自動發生。
與autoconnect()不一樣,當最後一個訂閱者取消訂閱時,也會自動斷開鏈接:
public static void refCountAndSubscribe() throws InterruptedException {
Observable obs = getObservable()
.doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();
LOGGER.info("refcount()");
Thread.sleep(1000);
LOGGER.info("subscribing #1");
Subscription subscription1 = obs.subscribe(
i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("subscribing #2");
Subscription subscription2 = obs.subscribe(
i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
Thread.sleep(1000);
LOGGER.info("unsubscribe#1");
subscription1.unsubscribe();
Thread.sleep(1000);
LOGGER.info("unsubscribe#2");
subscription2.unsubscribe();
}
複製代碼
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources
複製代碼
結論
ConnectableObservable類能夠輕鬆地處理多個訂閱者。
它的方法看起來很類似,但因爲實現上的細微差異(甚至方法的順序也很重要),用戶的行爲發生了很大的變化。