- 原文地址:5 Not So Obvious Things About RxJava
- 原文做者:Jag Saund
- 譯文出自:掘金翻譯計劃
- 譯者: skyar2009
- 校對者:Danny1451, yunshuipiao
不管你是剛剛接觸 RxJava,仍是已經使用過一段時間,關於 RxJava 你總會有些新的知識要學。在使用 RxJava 框架過程當中,我發現了 5 點不那麼明顯的知識,使我能夠充分挖掘它的潛能。javascript
註釋 本文引用的 APIs 是基於 RxJava 1.2.6html
map 和 flatMap 是經常使用的兩個 ReactiveX 操做。它們每每是你最早接觸的兩個操做,而且很難肯定使用哪一個是正確的。前端
map 和 flatMap 都是對 Observable 發出的每個元素執行轉換方法。可是,map 只輸出一個元素,flatMap 輸出 0 或多個元素。java
在上面的例子中,map
操做對每個字符串執行了 split
方法並輸出了一個包含字符串數組的元素。當你想將一個元素轉換成另外一個時使用 map
。react
有些時候,咱們執行的方法返回多個元素,而且咱們但願將他們添加到同一個流中。這種狀況下,flatMap
是一個好的選擇。在上面的例子中 flatMap
操做將字符串數組處理後輸出到了同一個序列。android
有些時候你須要將同步或異步的 API 轉成響應式的 API。使用 Observable.create 看起來是個極具誘惑性的選擇,但它有以下要求:ios
很難正確的實現以上要求,幸運的是,你能夠不這麼作。有一些靜態工具方法能夠幫你解決:git
syncOnSubscribegithub
一個能夠建立安全 OnSubscribe<T>
的工具,它建立的 OnSubscribe<T>
可以正確地處理來自訂閱者的背壓請求。當你須要將一個同步獲取式的阻塞 API 轉成響應式 API 時可使用。數據庫
public Observable<byte[]> readFile(@NonNull FileInputStream stream) {
final SyncOnSubscribe<FileInputStream, byte[]> fileReader = SyncOnSubscribe.createStateful(
() -> stream,
(stream, output) -> {
try {
final byte[] buffer = new byte[BUFFER_SIZE];
int count = stream.read(buffer);
if (count < 0) {
output.onCompleted();
} else {
output.onNext(buffer);
}
} catch (IOException error) {
output.onError(error);
}
return stream;
},
s -> IOUtil.closeSilently(s));
return Observable.create(fileReader);
}複製代碼
fromCallable
一個靜態工具,能夠對簡單的同步 API 進行封裝並將之轉化成響應式 API。更讚的是,fromCallable
也能夠處理檢查到的異常。
public Observable<Boolean> enablePushNotifications(boolean enable) {
return Observable.fromCallable(() -> sharedPrefs
.edit()
.putBoolean(KEY_PUSH_NOTIFICATIONS_PREFS, enable)
.commit());
}複製代碼
fromEmitter
一個靜態工具,對異步 API 進行封裝並能夠管理 Observable 被取消訂閱時釋放的資源。不像 fromCallable
,你能夠輸出多個元素。
import android.bluetooth.le.BluetoothLeScanner;
import android.bluetooth.le.ScanCallback;
import android.bluetooth.le.ScanResult;
import android.support.annotation.NonNull;
import rx.Emitter;
import rx.Observable;
import java.util.List;
public class RxBluetoothScanner {
public static class ScanResultException extends RuntimeException {
public ScanResultException(int errorCode) {
super("Bluetooth scan failed. Error code: " + errorCode);
}
}
private RxBluetoothScanner() {
}
@NonNull
public static Observable<ScanResult> scan(@NonNull final BluetoothLeScanner scanner) {
return Observable.fromEmitter(scanResultEmitter -> {
final ScanCallback scanCallback = new ScanCallback() {
@Override
public void onScanResult(int callbackType, @NonNull ScanResult result) {
scanResultEmitter.onNext(result);
}
@Override
public void onBatchScanResults(@NonNull List<ScanResult> results) {
for (ScanResult r : results) {
scanResultEmitter.onNext(r);
}
}
@Override
public void onScanFailed(int errorCode) {
scanResultEmitter.onError(new ScanResultException(errorCode));
}
};
scanResultEmitter.setCancellation(() -> scanner.stopScan(scanCallback));
scanner.startScan(scanCallback);
}, Emitter.BackpressureMode.BUFFER);
}
}複製代碼
有時,Observable 產生事件過快以致於下游觀察者跟不上它的速度。當這種狀況發生時,你每每會遇到 MissingBackpressureException
異常。
RxJava 提供了一些方法管理背壓,可是具體使用哪種須要視狀況而定。
冷、熱 Observable
只有當有訂閱時,冷 Observable 纔會發送元素。觀察者訂閱冷 Observable 能夠控制發送事件的速度而不須要犧牲流的完整性。冷 Observable 例子有:讀文件、數據庫查詢、網絡請求以及靜態迭代器轉成的 Observable。
熱 Observable 是連續的事件流,它的發出不依賴訂閱者的數量。當一個觀察者訂閱了 Observable,那麼它將面臨下面的一種狀況:
熱 Observables 例子有:觸摸事件、通知以及進度更新。
因爲熱 Observable 發出事件的本性,咱們不能控制它的速度。例如,你不能下降觸摸事件發出的速度。所以,最好是使用 BackpressureMode
提供的流控制策略。
使用一個響應式獲取方法,冷 Observable 能夠根據觀察者的反饋下降發送速度。更多知識,請看 ReactiveX 文檔的背壓與響應式獲取方法.
BackpressureMode.NONE 和 BackpressureMode.ERROR
在這兩種模式中,發送的事件不是背壓。當被觀察者的 16 元素緩衝區溢出時會拋出 MissingBackpressureException
。
BackpressureMode.BUFFER
在這種模式下,有一個無限的緩衝區(初始化時是 128)。過快發出的元素都會放到緩衝區中。若是緩衝區中的元素沒法消耗,會持續的積累直到內存耗盡。結果是 OutOfMemoryException
異常。
BackpressureMode.DROP
這種模式是使用固定大小爲 1 的緩衝區。若是下游觀察者沒法處理,第一個元素會緩存下來後續的會被丟棄。當消費者能夠處理下一個元素時,它收到的將是 Observable 發出的第一個元素。
BackpressureMode.LATEST
這種模式與 BackpressureMode.DROP
相似,由於它也使用固定大小爲 1 的緩衝區。然而,不是緩存第一個元素丟棄後續元素,BackpressureMode.LATEST
而是使用最新的元素替換緩衝區緩存的元素。當消費者能夠處理下一個元素時,它收到的是 Observable 最近一次發送的元素。
RxJava 經過給 Observable 序列發送 onError
通知不可恢復的錯誤,而且會結束序列。
有時,你不但願結束序列。對於這種狀況,RxJava 提供了幾種不會結束序列的錯誤處理方法。
RxJava 提供了許多錯誤處理方法,可是有時你不但願結束序列。尤爲是涉及到主題時。
onErrorResumeNext
使用 onErrorResumeNext 能夠攔截 onError
並返回一個 Observable。或者對錯誤信息添加附加信息並返回一個新的錯誤,或者發送給 onNext
一個新的事件。
public Observable<SearchResult> search(@NotNull EditText searchView) {
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time
.map(CharSequence::toString)
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker)
.switchMap(query -> searchService.query(query)) // Take the latest observable from upstream and unsubscribe from any previous subscriptions
.onErrorResumeNext(Observable.empty()); // <-- This will terminate upstream (ie. we will stop receiving text view changes after an error!)
}複製代碼
使用 onErrorResumeNext 捕獲
使用該操做會修復下游序列,可是會結束上游序列由於已經發送了 onError
通知。因此,若是你鏈接的是一個發佈通知的主題,onError
通知會結束主題。
若是你但願上游繼續運行,能夠在 onErrorResumeNext
操做中嵌套 flatMap
或 switchMap
操做。
public Observable<SearchResult> search(@NotNull EditText searchView) {
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time
.map(CharSequence::toString)
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker)
.switchMap(query -> searchService.query(query) // Take the latest observable from upstream and unsubscribe from any previous subscriptions
.onErrorResumeNext(Observable.empty()); // <-- This fixes the problem since the error is not seen by the upstream observable
}複製代碼
有時你須要將 Observable 的輸出共享給多個觀察者。RxJava 提供了 share
和 publish
兩種方式實現 Observable 發送事件的多播。
Share
share
容許多個觀察者鏈接到源 Observable。下面的例子中,共享的是 Observable 發送的 MotionEvent
事件。而後,咱們建立了另外兩個 Observable 分別過濾 DOWN
和 UP
觸摸事件。DOWN
事件咱們畫紅圈,UP
事件咱們畫籃圈。
public void touchEventHandler(@NotNull View view) {
final Observable<MotionEvent> motionEventObservable = RxView.touches(view).share();
// Capture down events
final Observable<MotionEvent> downEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);
// Capture up events
final Observable<MotionEvent> upEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_UP);
// Show a red circle at the position where the down event ocurred
subscriptions.add(downEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.RED)));
// Show a blue circle at the position where the up event ocurred
subscriptions.add(upEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.BLUE)));
}複製代碼
然而,一旦有觀察者訂閱 Observable,Observable 就會開始發送事件。這樣就會形成後續的訂閱者會錯過一個或多個觸摸事件。
在這個例子中,「藍」 觀察者錯過了第一個事件。有些時候這沒問題,可是若是你不能接受錯過任何事件,那麼你須要使用 publish
操做。
Publish
對 Observable 執行 publish
操做會將值轉化爲 ConnectedObservable。就像打開閥門同樣。下面的例子和上面同樣,須要注意的是咱們如今使用的是 publish
操做。
public void touchEventHandler(@NotNull View view) {
final ConnectedObservable<MotionEvent> motionEventObservable = RxView.touches(view).publish();
// Capture down events
final Observable<MotionEvent> downEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);
// Capture up events
final Observable<MotionEvent> upEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_UP);
// Show a red circle at the position where the down event ocurred
subscriptions.add(downEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.RED)));
// Show a blue circle at the position where the up event ocurred
subscriptions.add(upEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.BLUE)));
// Connect the source observable to begin emitting events
subscriptions.add(motionEventObservable.connect());
}複製代碼
一旦必要的 Observables 訂閱了源,你須要執行對源 ConnectedObservable 執行 connect
來開始發送事件。
注意,一旦對源調用了 connect
方法,相同事件序列會分別發送給 「綠」 和 「藍」 觀察者。
掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 Android、iOS、React、前端、後端、產品、設計 等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃。