- 原文出自《RxJava Essentials》
- 原文做者 : Ivan Morgillo
- 譯文出自 : 開發技術前線 www.devtf.cn
- 轉載聲明: 本譯文已受權開發人員頭條享有獨家轉載權。未經贊成,不得轉載!
- 譯者 : yuxingxin
- 項目地址 : RxJava-Essentials-CN
上一章中。咱們學到怎樣轉換可觀測序列。咱們也看到了map()
,scan()
,groupBY()
,以及不少其它實用的函數的實際樣例,它們幫助咱們操做Observable來建立咱們想要的Observable。java
本章中,咱們將研究組合函數並學習怎樣同一時候處理多個Observables來建立咱們想要的Observable。git
在異步的世界經常會建立這種場景。咱們有多個來源但是僅僅想有一個結果:多輸入,單輸出。RxJava的merge()
方法將幫助你把兩個甚至不少其它的Observables合併到他們發射的數據裏。下圖給出了把兩個序列合併在一個終於發射的Observable。github
正如你看到的那樣,發射的數據被交叉合併到一個Observable裏面。markdown
注意假設你同步的合併Observable,它們將鏈接在一塊兒並且不會交叉。架構
像一般同樣。咱們用咱們的App和已安裝的App列表來建立了一個「真實世界」的樣例。咱們還需要第二個Observable。咱們可以建立一個單獨的應用列表而後逆序。固然沒有實際的意義,僅僅是爲了這個樣例。第二個列表,咱們的loadList()
函數像如下這樣:併發
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
List reversedApps = Lists.reverse(apps);
Observable<AppInfo> observableApps =Observable.from(apps);
Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(new Observer<AppInfo>(){
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfoappInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
}
});
}
咱們建立了Observable和observableApps數據以及新的observableReversedApps逆序列表。使用Observable.merge()
,咱們可以建立新的ObservableMergedObservable
在單個可觀測序列中發射源Observables發出的所有數據。app
正如你能看到的,每個方法簽名都是同樣的,所以咱們的觀察者無需在乎不論什麼不一樣就可以複用代碼。結果例如如下:異步
注意錯誤時的toast消息。你可以以爲每個Observable拋出的錯誤將會打斷合併。假設你需要避免這種狀況。RxJava提供了mergeDelayError()
,它能從一個Observable中繼續發射數據即使是當中有一個拋出了錯誤。當所有的Observables都完畢時,mergeDelayError()
將會發射onError()
。例如如下圖所看到的:ide
咱們在處理多源時可能會帶來這樣一種場景:多從個Observables接收數據,處理它們。而後將它們合併成一個新的可觀測序列來使用。RxJava有一個特殊的方法可以完畢:zip()
合併兩個或者多個Observables發射出的數據項,依據指定的函數Func*
變換它們。併發射一個新值。下圖展現了zip()
方法怎樣處理髮射的「numbers」和「letters」而後將它們合併一個新的數據項:函數
對於「真實世界」的樣例來講,咱們將使用已安裝的應用列表和一個新的動態的Observable來讓樣例變得有點有趣味。
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
tictoc
Observable變量使用interval()
函數每秒生成一個Long類型的數據:簡單且高效。正如以前所說的,咱們需要一個Func
對象。
因爲它需要傳兩個參數,因此是Func2
:
private AppInfo updateTitle(AppInfoappInfo, Long time) {
appInfo.setName(time + " " + appInfo.getName());
return appInfo;
}
現在咱們的loadList()
函數變成這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps);
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
Observable.zip(observableApp, tictoc,
(AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
正如你看到的那樣。zip()
函數有三個參數:兩個Observables和一個Func2
。
細緻一看會發現observeOn()
函數。
它將在下一章中解說:現在咱們可以小試一下。
結果例如如下:
前面兩個方法。zip()
和merge()
方法做用在發射數據的範疇內,在決定怎樣操做值以前有些場景咱們需要考慮時間的。RxJava的join()
函數基於時間窗體將兩個Observables發射的數據結合在一塊兒。
爲了正確的理解上一張圖。咱們解釋下join()
需要的參數:
Func1
參數:在指定的由時間窗體定義時間間隔內,源Observable發射的數據和從第二個Observable發射的數據相互配合返回的Observable。Func1
參數:在指定的由時間窗體定義時間間隔內,第二個Observable發射的數據和從源Observable發射的數據相互配合返回的Observable。Func2
參數:定義已發射的數據怎樣與新發射的數據項相結合。loadList()
函數像如下這樣:private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence =
Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position -> {
return apps.get(position.intValue());
});
Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);
appsSequence.join(
tictoc,
appInfo -> Observable.timer(2,TimeUnit.SECONDS),
time -> Observable.timer(0, TimeUnit.SECONDS),
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.take(10)
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
咱們有一個新的對象appsSequence
,它是一個每秒從咱們已安裝的app列表發射app數據的可觀測序列。
tictoc
這個Observable數據每秒僅僅發射一個新的Long
型整數。爲了合併它們,咱們需要指定兩個Func1
變量:
appInfo -> Observable.timer(2, TimeUnit.SECONDS)
time -> Observable.timer(0, TimeUnit.SECONDS)
上面描寫敘述了兩個時間窗體。
如下一行描寫敘述咱們怎樣使用Func2
將兩個發射的數據結合在一塊兒。
this::updateTitle
結果例如如下:
它看起來有點亂,但是注意app的名字和咱們指定的時間窗體,咱們可以看到:一旦第二個數據發射了咱們就會將它與源數據結合,但咱們用同一個源數據有2秒鐘。這就是爲何標題反覆數字累加的緣由。
值得一提的是,爲了簡單起見,也有一個join()
操做符做用於字符串而後簡單的和發射的字符串鏈接成終於的字符串。
RxJava的combineLatest()
函數有點像zip()
函數的特殊形式。
正如咱們已經學習的,zip()
做用於近期未打包的兩個Observables。相反。combineLatest()
做用於近期發射的數據項:假設Observable1
發射了A並且Observable2
發射了B和C,combineLatest()
將會分組處理AB和AC,例如如下圖所看到的:
combineLatest()
函數接受二到九個Observable做爲參數,假設有需要的話或者單個Observables列表做爲參數。
從以前的樣例中把loadList()
函數借用過來,咱們可以改動一下來用於combineLatest()
實現「真實世界」這個樣例:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(position ->apps.get(position.intValue()));
Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
Observable.combineLatest(appsSequence, tictoc,
this::updateTitle)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo);
mRecyclerView.smoothScrollToPosition(position);
}
});
}
這咱們使用了兩個Observables:一個是每秒鐘從咱們已安裝的應用列表發射一個App數據,第二個是每隔1.5秒發射一個Long
型整數。
咱們將他們結合起來並運行updateTitle()
函數,結果例如如下:
正如你看到的。因爲不一樣的時間間隔,AppInfo
對象如咱們所預料的那樣有時候會反覆。
在未來另外一些zip()
知足不了的場景。
如複雜的架構,或者是僅僅爲了我的愛好。你可以使用And/Then/When解決方式。
它們在RxJava的joins包下,使用Pattern和Plan做爲中介,將發射的數據集合併到一塊兒。
咱們的loadList()
函數將會被改動從這樣:
private void loadList(List<AppInfo> apps) {
mRecyclerView.setVisibility(View.VISIBLE);
Observable<AppInfo> observableApp = Observable.from(apps);
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc);
Plan0<AppInfo> plan = pattern.then(this::updateTitle);
JoinObservable
.when(plan)
.toObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(AppInfoappInfo) {
if (mSwipeRefreshLayout.isRefreshing()) {
mSwipeRefreshLayout.setRefreshing(false);
}
mAddedApps.add(appInfo);
int position = mAddedApps.size() - 1;
mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
}
});
}
和一般同樣。咱們有兩個發射的序列。observableApp
,發射咱們安裝的應用列表數據。tictoc
每秒發射一個Long
型整數。現在咱們用and()
鏈接源Observable和第二個Observable。
JoinObservable.from(observableApp).and(tictoc);
這裏建立一個pattern
對象,使用這個對象咱們可以建立一個Plan
對象:」咱們有兩個發射數據的Observables,then()
是作什麼的?」
pattern.then(this::updateTitle);
現在咱們有了一個Plan
對象並且當plan發生時咱們可以決定接下來發生的事情。
.when(plan).toObservable()
這時候,咱們可以訂閱新的Observable,正如咱們老是作的那樣。
有這樣一個複雜的場景就是在一個subscribe-unsubscribe
的序列裏咱們可以從一個Observable本身主動取消訂閱來訂閱一個新的Observable。
RxJava的switch()
,正如定義的,將一個發射多個Observables的Observable轉換成另外一個單獨的Observable。後者發射那些Observables近期發射的數據項。
給出一個發射多個Observables序列的源Observable,switch()
訂閱到源Observable而後開始發射由第一個發射的Observable發射的同樣的數據。當源Observable發射一個新的Observable時,switch()
立刻取消訂閱前一個發射數據的Observable(所以打斷了從它那裏發射的數據流)而後訂閱一個新的Observable,並開始發射它的數據。
咱們已經學到怎樣鏈接多個Observables並追加指定的值到一個發射序列裏。
RxJava的startWith()
是concat()
的相應部分。正如concat()
向發射數據的Observable追加數據那樣,在Observable開始發射他們的數據以前。 startWith()
經過傳遞一個參數來先發射一個數據序列。
這章中。咱們學習了怎樣將兩個或者不少其它個Observable結合來建立一個新的可觀測序列。
咱們將可以merge
Observable。join
Observables 。zip
Observables 並在幾種狀況下把他們結合在一塊兒。
下一章。咱們將介紹調度器,它將很是easy的幫助咱們建立主線程以及提升咱們應用程序的性能。咱們也將學習怎樣正確的運行長任務或者I/O任務來得到更好的性能。