RxJava2.0入門篇

傳統用法:開啓子線程去作耗時任務,業務邏輯越複雜,代碼嵌套越嚴重,Rx系列出來好久了,想本身作一個總結,但願能幫到一部分人緩存

觀察者模式先提一嘴

這個老模式簡直不想說太多,就說一下流程服務器

1建立被觀察者app

2建立觀察者ide

3被觀察者與觀察者進行綁定函數

4當被觀察者狀態改變,觀察者收到後作響應處理spa

第一步,RxJava建立被觀察者

第一種方法:經過Observable.create(ObservableOnSubscribe)線程

這裏邊的emitter來發射數據和信息code

二:經過Observable.just(參數);server

三:經過Observable.from();blog

第二部,建立觀察者

 1 Observer<Object> observer = new Observer<Object>() {
 2             @Override
 3             public void onSubscribe(Disposable d) {
 4                 //被訂閱時調用
 5             }
 6 
 7             @Override
 8             public void onNext(Object o) {
 9           //當被觀察者改變的時候調用的方法
10             }
11 
12             @Override
13             public void onError(Throwable e) {
14           //處理異常的方法
15             }
16 
17             @Override
18             public void onComplete() {
19           //再也不有新的事件的時候調用
20             }
21         };

訂閱

observable.subscribe(observer);

訂閱以後,代碼將依次調用observer的onSubscribe(),observable的subscribe(),observer的onNext與onComplete

一個簡單的模式就造成了

操做符

map -->把一個事件轉化成另外一個事件

舉個栗子:Integer轉String操做

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: ");
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                String mapStr = String.valueOf(integer + 1);
                return mapStr;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

flatMap -->flatMap是一個很是強大的操做符,flatMap將一個發送事件的上游Observable變換爲多個發送事件的Observables,而後將它們發射的事件合併後放進一個單獨的Observable裏,可是flatmap不能保證事件的順序

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).flatMap(new Function<Integer, Observable<String>>() {
            @Override
            public Observable<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 5; i++) {
                    String iStr = "flatMap value" + integer;
                    arrayList.add(iStr);
                }
                return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

 

concatMap -->做用和flatMap同樣,可是保證了順序

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(11);
                e.onNext(111);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    arrayList.add("concatMap value" + i + "integer" + integer);
                }
                return Observable.fromIterable(arrayList).delay(5, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

Buffer -->

Buffer操做符會按期收集Observable的數據放進一個數據包裹,而後發射這些包裹,並非一次發射一個值
Buffer操做符將一個Observable變換爲另外一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合。若是原來的Observable發射了一個onError通知,Buffer會當即傳遞這個通知,而不是首先發射緩存的數據。

scan -->
Scan連續地對數據序列的每一項應用一個函數,而後連續發射結果
Scan操做符對原始Observable發射的第一項數據應用一個函數,而後將這個函數的結果做爲本身的第一項數據發射。將函數的結果同第二項數據一塊兒填充給這個函數來產生本身的第二項數據。持續進行這個過程來產生剩餘的數據序列。
Observable.just(1,2,3,4,5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });

window -->

Window按期未來自原始Observable的數據分解爲一個Observable窗口,發射這些窗口而不是每次發射一項數據

window和Buffer相似,但不是發射來自原始Observable的數據包,發射的是Observables,這些Observables中的每個都發射原始Observable數據的一個子集,最後發射一個onComplete通知。

zip -->

ZIP經過一個函數將多個Observable發送的事件結合到一塊兒,而後發送這些組合到一塊兒的事件。按照嚴格的順序應用這個函數,只發射與發射項最少的那個Observable同樣多的數據,zip在Android中的使用,能夠適用於以下場景,一個界面須要展現用戶的一些信息,這些信息分別要從兩個服務器接口中獲取,只有當兩個數據都獲取後才能進行展現。這類同時的信息請求比較適用zip

        //第一個事件
        Observable<Integer> observable1 = Observable.range(1, 5);
        //第二個事件
        Observable<Integer> observable2 = Observable.range(6, 10);
        //合併事件
        Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                return String.valueOf(integer + integer2);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
相關文章
相關標籤/搜索