【知識整理】這多是最好的RxJava 2.x 入門教程(二)

 

 

這多是最好的RxJava 2.x入門教程系列專欄

文章連接:

這多是最好的RxJava 2.x 入門教程(完結版)【強力推薦】

這多是最好的RxJava 2.x 入門教程(一)

這多是最好的RxJava 2.x 入門教程(二)

這多是最好的RxJava 2.x 入門教程(三)

GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples

爲了知足你們的飢渴難耐,GitHub將同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x全部操做符應用場景介紹和實際應用場景,後期除了RxJava可能還會增添其餘東西,總之,GitHub上的Demo專爲你們傾心打造。傳送門:https://github.com/nanchen2251/RxJava2Exampleshtml

 

1、前言

       很快咱們就迎來了第二期,上一期咱們主要講解了 RxJava 1.x 到 2.x 的變化概覽,相信各位熟練掌握RxJava 1.x的老司機們隨便看一下變化概覽就能夠上手RxJava 2.x了,但爲了知足更廣大的年輕一代司機(將來也是老司機),在本節中,咱們將學習RxJava 2.x 強大的操做符章節。react

     【注】如下全部操做符標題均可直接點擊進入官方doc查看。git

2、正題

一、Create

      create操做符應該是最多見的操做符了,主要用於產生一個Obserable被觀察者對象,爲了方便你們的認知,之後的教程中統一把被觀察者Observable稱爲發射器(上游事件),觀察者Observer稱爲接收器(下游事件)。github

     

 

 1 Observable.create(new ObservableOnSubscribe<Integer>() {
 2             @Override
 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
 4                 mRxOperatorsText.append("Observable emit 1" + "\n");
 5                 Log.e(TAG, "Observable emit 1" + "\n");
 6                 e.onNext(1);
 7                 mRxOperatorsText.append("Observable emit 2" + "\n");
 8                 Log.e(TAG, "Observable emit 2" + "\n");
 9                 e.onNext(2);
10                 mRxOperatorsText.append("Observable emit 3" + "\n");
11                 Log.e(TAG, "Observable emit 3" + "\n");
12                 e.onNext(3);
13                 e.onComplete();
14                 mRxOperatorsText.append("Observable emit 4" + "\n");
15                 Log.e(TAG, "Observable emit 4" + "\n" );
16                 e.onNext(4);
17             }
18         }).subscribe(new Observer<Integer>() {
19             private int i;
20             private Disposable mDisposable;
21 
22             @Override
23             public void onSubscribe(@NonNull Disposable d) {
24                 mRxOperatorsText.append("onSubscribe : " + d.isDisposed() + "\n");
25                 Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
26                 mDisposable = d;
27             }
28 
29             @Override
30             public void onNext(@NonNull Integer integer) {
31                 mRxOperatorsText.append("onNext : value : " + integer + "\n");
32                 Log.e(TAG, "onNext : value : " + integer + "\n" );
33                 i++;
34                 if (i == 2) {
35                     // 在RxJava 2.x 中,新增的Disposable能夠作到切斷的操做,讓Observer觀察者再也不接收上游事件
36                     mDisposable.dispose();
37                     mRxOperatorsText.append("onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
38                     Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
39                 }
40             }
41 
42             @Override
43             public void onError(@NonNull Throwable e) {
44                 mRxOperatorsText.append("onError : value : " + e.getMessage() + "\n");
45                 Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
46             }
47 
48             @Override
49             public void onComplete() {
50                 mRxOperatorsText.append("onComplete" + "\n");
51                 Log.e(TAG, "onComplete" + "\n" );
52             }
53         });

輸出:app

須要注意的幾點是:dom

1)在發射事件中,咱們在發射了數值3以後,直接調用了e.onComlete(),雖然沒法接收事件,但發送事件仍是繼續的。ide

2) 另一個值得注意的點是,在RxJava 2.x中,能夠看到發射事件方法相比1.x多了一個throws Excetion,意味着咱們作一些特定操做不再用try-catch了。函數

3) 而且2.x 中有一個Disposable概念,這個東西能夠直接調用切斷,能夠看到,當它的isDisposed()返回爲false的時候,接收器能正常接收事件,但當其爲true的時候,接收器中止了接收。因此能夠經過此參數動態控制接收事件了。學習

 

二、Map

Map基本算是RxJava中一個最簡單的操做符了,熟悉RxJava 1.x的知道,它的做用是對發射時間發送的每個事件應用一個函數,是的每個事件都按照指定的函數去變化,而在2.x中它的做用幾乎一致。spa

 1 Observable.create(new ObservableOnSubscribe<Integer>() {
 2             @Override
 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
 4                 e.onNext(1);
 5                 e.onNext(2);
 6                 e.onNext(3);
 7             }
 8         }).map(new Function<Integer, String>() {
 9             @Override
10             public String apply(@NonNull Integer integer) throws Exception {
11                 return "This is result " + integer;
12             }
13         }).subscribe(new Consumer<String>() {
14             @Override
15             public void accept(@NonNull String s) throws Exception {
16                 mRxOperatorsText.append("accept : " + s +"\n");
17                 Log.e(TAG, "accept : " + s +"\n" );
18             }
19         });

輸出:

是的,map基本做用就是將一個Observable經過某種函數關係,轉換爲另外一種Observable,上面例子中就是把咱們的Integer數據變成了String類型。從Log日誌顯而易見。

三、Zip

zip專用於合併事件,該合併非鏈接(鏈接操做符後面會說),而是兩兩配對,也就意味着,最終配對出的Observable發射事件數目只和少的那個相同。

 

 1 Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
 2             @Override
 3             public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
 4                 return s + integer;
 5             }
 6         }).subscribe(new Consumer<String>() {
 7             @Override
 8             public void accept(@NonNull String s) throws Exception {
 9                 mRxOperatorsText.append("zip : accept : " + s + "\n");
10                 Log.e(TAG, "zip : accept : " + s + "\n");
11             }
12         });
 1 private Observable<String> getStringObservable() {
 2         return Observable.create(new ObservableOnSubscribe<String>() {
 3             @Override
 4             public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
 5                 if (!e.isDisposed()) {
 6                     e.onNext("A");
 7                     mRxOperatorsText.append("String emit : A \n");
 8                     Log.e(TAG, "String emit : A \n");
 9                     e.onNext("B");
10                     mRxOperatorsText.append("String emit : B \n");
11                     Log.e(TAG, "String emit : B \n");
12                     e.onNext("C");
13                     mRxOperatorsText.append("String emit : C \n");
14                     Log.e(TAG, "String emit : C \n");
15                 }
16             }
17         });
18     }
19 
20     private Observable<Integer> getIntegerObservable() {
21         return Observable.create(new ObservableOnSubscribe<Integer>() {
22             @Override
23             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
24                 if (!e.isDisposed()) {
25                     e.onNext(1);
26                     mRxOperatorsText.append("Integer emit : 1 \n");
27                     Log.e(TAG, "Integer emit : 1 \n");
28                     e.onNext(2);
29                     mRxOperatorsText.append("Integer emit : 2 \n");
30                     Log.e(TAG, "Integer emit : 2 \n");
31                     e.onNext(3);
32                     mRxOperatorsText.append("Integer emit : 3 \n");
33                     Log.e(TAG, "Integer emit : 3 \n");
34                     e.onNext(4);
35                     mRxOperatorsText.append("Integer emit : 4 \n");
36                     Log.e(TAG, "Integer emit : 4 \n");
37                     e.onNext(5);
38                     mRxOperatorsText.append("Integer emit : 5 \n");
39                     Log.e(TAG, "Integer emit : 5 \n");
40                 }
41             }
42         });
43     }

輸出:

須要注意的是:

1) zip 組合事件的過程就是分別從發射器A和發射器B各取出一個事件來組合,而且一個事件只能被使用一次,組合的順序是嚴格按照事件發送的順序來進行的,因此上面截圖中,能夠看到,1永遠是和A 結合的,2永遠是和B結合的。

2) 最終接收器收到的事件數量是和發送器發送事件最少的那個發送器的發送事件數目相同,因此如截圖中,5很孤單,沒有人願意和它交往,孤獨終老的單身狗。

 

四、Concat

對於單一的把兩個發射器鏈接成一個發射器,雖然 zip 不能完成,但咱們仍是能夠自力更生,官方提供的 concat 讓咱們的問題獲得了完美解決。

1 Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
2                 .subscribe(new Consumer<Integer>() {
3                     @Override
4                     public void accept(@NonNull Integer integer) throws Exception {
5                         mRxOperatorsText.append("concat : "+ integer + "\n");
6                         Log.e(TAG, "concat : "+ integer + "\n" );
7                     }
8                 });

輸出:

如圖,能夠看到。發射器B把本身的三個孩子送給了發射器A,讓他們組合成了一個新的發射器,很是懂事的孩子,有條不紊的排序接收。

 

五、FlatMap

FlatMap 是一個頗有趣的東西,我堅信你在實際開發中會常常用到。它能夠把一個發射器Observable 經過某種方法轉換爲多個Observables,而後再把這些分散的Observables裝進一個單一的發射器Observable。但有個須要注意的是,flatMap並不能保證事件的順序,若是須要保證,須要用到咱們下面要講的ConcatMap。

 

 1  Observable.create(new ObservableOnSubscribe<Integer>() {
 2             @Override
 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
 4                 e.onNext(1);
 5                 e.onNext(2);
 6                 e.onNext(3);
 7             }
 8         }).flatMap(new Function<Integer, ObservableSource<String>>() {
 9             @Override
10             public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
11                 List<String> list = new ArrayList<>();
12                 for (int i = 0; i < 3; i++) {
13                     list.add("I am value " + integer);
14                 }
15                 int delayTime = (int) (1 + Math.random() * 10);
16                 return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
17             }
18         }).subscribeOn(Schedulers.newThread())
19                 .observeOn(AndroidSchedulers.mainThread())
20                 .subscribe(new Consumer<String>() {
21                     @Override
22                     public void accept(@NonNull String s) throws Exception {
23                         Log.e(TAG, "flatMap : accept : " + s + "\n");
24                         mRxOperatorsText.append("flatMap : accept : " + s + "\n");
25                     }
26                 });

輸出:

一切都如咱們預期中的有意思,爲了區分concatMap(下一個會講),我在代碼中特地動了一點小手腳,我採用一個隨機數,生成一個時間,而後經過delay(後面會講)操做符,作一個小延時操做,而查看Log日誌也確認驗證了咱們上面的說法,它是無序的。

六、concatMap

上面其實就說了,concatMap 與 FlatMap 的惟一區別就是 concatMap 保證了順序,因此,咱們就直接把 flatMap 替換爲 concatMap 驗證吧。

 1 Observable.create(new ObservableOnSubscribe<Integer>() {
 2             @Override
 3             public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
 4                 e.onNext(1);
 5                 e.onNext(2);
 6                 e.onNext(3);
 7             }
 8         }).concatMap(new Function<Integer, ObservableSource<String>>() {
 9             @Override
10             public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
11                 List<String> list = new ArrayList<>();
12                 for (int i = 0; i < 3; i++) {
13                     list.add("I am value " + integer);
14                 }
15                 int delayTime = (int) (1 + Math.random() * 10);
16                 return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
17             }
18         }).subscribeOn(Schedulers.newThread())
19                 .observeOn(AndroidSchedulers.mainThread())
20                 .subscribe(new Consumer<String>() {
21                     @Override
22                     public void accept(@NonNull String s) throws Exception {
23                         Log.e(TAG, "flatMap : accept : " + s + "\n");
24                         mRxOperatorsText.append("flatMap : accept : " + s + "\n");
25                     }
26                 });

輸出:

結果的確和咱們預想的同樣。

 

3、寫在最後

       好了,這一節就先介紹到這裏,下一節咱們將學習其它的一些操做符,在操做符講完後再帶你們進入實際情景,但願持續關注,代碼傳送門

相關文章
相關標籤/搜索