RxJava開發精要6 - 組合Observables

上一章中。咱們學到怎樣轉換可觀測序列。咱們也看到了map(),scan(),groupBY(),以及不少其它實用的函數的實際樣例,它們幫助咱們操做Observable來建立咱們想要的Observable。java

本章中,咱們將研究組合函數並學習怎樣同一時候處理多個Observables來建立咱們想要的Observable。git

Merge

在異步的世界經常會建立這種場景。咱們有多個來源但是僅僅想有一個結果:多輸入,單輸出。RxJava的merge()方法將幫助你把兩個甚至不少其它的Observables合併到他們發射的數據裏。下圖給出了把兩個序列合併在一個終於發射的Observable。github

正如你看到的那樣,發射的數據被交叉合併到一個Observable裏面。markdown

注意假設你同步的合併Observable,它們將鏈接在一塊兒並且不會交叉。架構

像一般同樣。咱們用咱們的App和已安裝的App列表來建立了一個「真實世界」的樣例。咱們還需要第二個Observable。咱們可以建立一個單獨的應用列表而後逆序。固然沒有實際的意義,僅僅是爲了這個樣例。第二個列表,咱們的loadList()函數像如下這樣:併發

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    List reversedApps = Lists.reverse(apps);
    Observable<AppInfo> observableApps =Observable.from(apps);
    Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
    Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);

    mergedObserbable.subscribe(new Observer<AppInfo>(){
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }

        @Override
        public void onNext(AppInfoappInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        } 
    });
}

咱們建立了Observable和observableApps數據以及新的observableReversedApps逆序列表。使用Observable.merge(),咱們可以建立新的ObservableMergedObservable在單個可觀測序列中發射源Observables發出的所有數據。app

正如你能看到的,每個方法簽名都是同樣的,所以咱們的觀察者無需在乎不論什麼不一樣就可以複用代碼。結果例如如下:異步

注意錯誤時的toast消息。你可以以爲每個Observable拋出的錯誤將會打斷合併。假設你需要避免這種狀況。RxJava提供了mergeDelayError(),它能從一個Observable中繼續發射數據即使是當中有一個拋出了錯誤。當所有的Observables都完畢時,mergeDelayError()將會發射onError()。例如如下圖所看到的:ide

ZIP

咱們在處理多源時可能會帶來這樣一種場景:多從個Observables接收數據,處理它們。而後將它們合併成一個新的可觀測序列來使用。RxJava有一個特殊的方法可以完畢:zip()合併兩個或者多個Observables發射出的數據項,依據指定的函數Func*變換它們。併發射一個新值。下圖展現了zip()方法怎樣處理髮射的「numbers」和「letters」而後將它們合併一個新的數據項:函數

對於「真實世界」的樣例來講,咱們將使用已安裝的應用列表和一個新的動態的Observable來讓樣例變得有點有趣味。

Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictocObservable變量使用interval()函數每秒生成一個Long類型的數據:簡單且高效。正如以前所說的,咱們需要一個Func對象。

因爲它需要傳兩個參數,因此是Func2:

private AppInfo updateTitle(AppInfoappInfo, Long time) {
    appInfo.setName(time + " " + appInfo.getName());
    return appInfo;
}

現在咱們的loadList()函數變成這樣:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Observable.zip(observableApp, tictoc,
    (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        }
    });
}

正如你看到的那樣。zip()函數有三個參數:兩個Observables和一個Func2

細緻一看會發現observeOn()函數。

它將在下一章中解說:現在咱們可以小試一下。

結果例如如下:

Join

前面兩個方法。zip()merge()方法做用在發射數據的範疇內,在決定怎樣操做值以前有些場景咱們需要考慮時間的。RxJava的join()函數基於時間窗體將兩個Observables發射的數據結合在一塊兒。

爲了正確的理解上一張圖。咱們解釋下join()需要的參數:

  • 第二個Observable和源Observable結合。

  • Func1參數:在指定的由時間窗體定義時間間隔內,源Observable發射的數據和從第二個Observable發射的數據相互配合返回的Observable。
  • Func1參數:在指定的由時間窗體定義時間間隔內,第二個Observable發射的數據和從源Observable發射的數據相互配合返回的Observable。
  • Func2參數:定義已發射的數據怎樣與新發射的數據項相結合。

  • 例如如下練習的樣例。咱們可以改動loadList()函數像如下這樣:
private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> appsSequence =
    Observable.interval(1000, TimeUnit.MILLISECONDS)
                .map(position -> {
                    return apps.get(position.intValue());
                });

    Observable<Long> tictoc = Observable.interval(1000,TimeUnit.MILLISECONDS);

    appsSequence.join(
        tictoc, 
        appInfo -> Observable.timer(2,TimeUnit.SECONDS),
        time -> Observable.timer(0, TimeUnit.SECONDS),
        this::updateTitle)
        .observeOn(AndroidSchedulers.mainThread())
        .take(10)
        .subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) {
                    mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo);
                mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

咱們有一個新的對象appsSequence,它是一個每秒從咱們已安裝的app列表發射app數據的可觀測序列。

tictoc這個Observable數據每秒僅僅發射一個新的Long型整數。爲了合併它們,咱們需要指定兩個Func1變量:

appInfo -> Observable.timer(2, TimeUnit.SECONDS)

time -> Observable.timer(0, TimeUnit.SECONDS)

上面描寫敘述了兩個時間窗體。

如下一行描寫敘述咱們怎樣使用Func2將兩個發射的數據結合在一塊兒。

this::updateTitle

結果例如如下:

它看起來有點亂,但是注意app的名字和咱們指定的時間窗體,咱們可以看到:一旦第二個數據發射了咱們就會將它與源數據結合,但咱們用同一個源數據有2秒鐘。這就是爲何標題反覆數字累加的緣由。

值得一提的是,爲了簡單起見,也有一個join()操做符做用於字符串而後簡單的和發射的字符串鏈接成終於的字符串。

combineLatest

RxJava的combineLatest()函數有點像zip()函數的特殊形式。

正如咱們已經學習的,zip()做用於近期未打包的兩個Observables。相反。combineLatest()做用於近期發射的數據項:假設Observable1發射了A並且Observable2發射了B和C,combineLatest()將會分組處理AB和AC,例如如下圖所看到的:

combineLatest()函數接受二到九個Observable做爲參數,假設有需要的話或者單個Observables列表做爲參數。

從以前的樣例中把loadList()函數借用過來,咱們可以改動一下來用於combineLatest()實現「真實世界」這個樣例:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable<AppInfo> appsSequence = Observable.interval(1000, TimeUnit.MILLISECONDS)
              .map(position ->apps.get(position.intValue()));
    Observable<Long> tictoc = Observable.interval(1500, TimeUnit.MILLISECONDS);
    Observable.combineLatest(appsSequence, tictoc,
               this::updateTitle)
       .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {

        @Override
        public void onCompleted() {
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }

        @Override
        public void onError(Throwable e) {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(AppInfoappInfo) {
            if (mSwipeRefreshLayout.isRefreshing()) {
                mSwipeRefreshLayout.setRefreshing(false);
            } 
            mAddedApps.add(appInfo);
            int position = mAddedApps.size() - 1;
            mAdapter.addApplication(position, appInfo);
            mRecyclerView.smoothScrollToPosition(position);
        } 
    });
}

這咱們使用了兩個Observables:一個是每秒鐘從咱們已安裝的應用列表發射一個App數據,第二個是每隔1.5秒發射一個Long型整數。

咱們將他們結合起來並運行updateTitle()函數,結果例如如下:

正如你看到的。因爲不一樣的時間間隔,AppInfo對象如咱們所預料的那樣有時候會反覆。

And,Then和When

在未來另外一些zip()知足不了的場景。

如複雜的架構,或者是僅僅爲了我的愛好。你可以使用And/Then/When解決方式。

它們在RxJava的joins包下,使用Pattern和Plan做爲中介,將發射的數據集合併到一塊兒。

咱們的loadList()函數將會被改動從這樣:

private void loadList(List<AppInfo> apps) {

    mRecyclerView.setVisibility(View.VISIBLE);

    Observable<AppInfo> observableApp = Observable.from(apps);

    Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

    Pattern2<AppInfo, Long> pattern = JoinObservable.from(observableApp).and(tictoc); 

    Plan0<AppInfo> plan = pattern.then(this::updateTitle);

    JoinObservable
        .when(plan)
        .toObservable()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
            }

            @Override
            public void onError(Throwable e) {
                mSwipeRefreshLayout.setRefreshing(false); 
                Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(AppInfoappInfo) {
                if (mSwipeRefreshLayout.isRefreshing()) { 
                mSwipeRefreshLayout.setRefreshing(false);
                } 
                mAddedApps.add(appInfo);
                int position = mAddedApps.size() - 1;
                mAdapter.addApplication(position, appInfo); mRecyclerView.smoothScrollToPosition(position);
            } 
        });
}

和一般同樣。咱們有兩個發射的序列。observableApp,發射咱們安裝的應用列表數據。tictoc每秒發射一個Long型整數。現在咱們用and()鏈接源Observable和第二個Observable。

JoinObservable.from(observableApp).and(tictoc);

這裏建立一個pattern對象,使用這個對象咱們可以建立一個Plan對象:」咱們有兩個發射數據的Observables,then()是作什麼的?」

pattern.then(this::updateTitle);

現在咱們有了一個Plan對象並且當plan發生時咱們可以決定接下來發生的事情。

.when(plan).toObservable()

這時候,咱們可以訂閱新的Observable,正如咱們老是作的那樣。

Switch

有這樣一個複雜的場景就是在一個subscribe-unsubscribe的序列裏咱們可以從一個Observable本身主動取消訂閱來訂閱一個新的Observable。

RxJava的switch(),正如定義的,將一個發射多個Observables的Observable轉換成另外一個單獨的Observable。後者發射那些Observables近期發射的數據項。

給出一個發射多個Observables序列的源Observable,switch()訂閱到源Observable而後開始發射由第一個發射的Observable發射的同樣的數據。當源Observable發射一個新的Observable時,switch()立刻取消訂閱前一個發射數據的Observable(所以打斷了從它那裏發射的數據流)而後訂閱一個新的Observable,並開始發射它的數據。

StartWith

咱們已經學到怎樣鏈接多個Observables並追加指定的值到一個發射序列裏。

RxJava的startWith()concat()的相應部分。正如concat()向發射數據的Observable追加數據那樣,在Observable開始發射他們的數據以前。 startWith()經過傳遞一個參數來先發射一個數據序列。

總結

這章中。咱們學習了怎樣將兩個或者不少其它個Observable結合來建立一個新的可觀測序列。

咱們將可以merge Observable。join Observables 。zip Observables 並在幾種狀況下把他們結合在一塊兒。

下一章。咱們將介紹調度器,它將很是easy的幫助咱們建立主線程以及提升咱們應用程序的性能。咱們也將學習怎樣正確的運行長任務或者I/O任務來得到更好的性能。

相關文章
相關標籤/搜索