【知識整理】這多是最好的RxJava 2.x 入門教程(五)

 

這多是最好的RxJava 2.x入門教程系列專欄

文章連接:

這多是最好的RxJava 2.x 入門教程(完結版)【強力推薦】

這多是最好的RxJava 2.x 入門教程(一)

這多是最好的RxJava 2.x 入門教程(二)

這多是最好的RxJava 2.x 入門教程(三)

這多是最好的RxJava 2.x 入門教程(四)

這多是最好的RxJava 2.x 入門教程(五)

GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples

爲了知足你們的飢渴難耐,GitHub將同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x全部操做符應用場景介紹和實際應用場景,後期除了RxJava可能還會增添其餘東西,總之,GitHub上的Demo專爲你們傾心打造。傳送門:https://github.com/nanchen2251/RxJava2Exampleshtml

 

1、前言

       終於如願來到讓我小夥伴們亢奮的 RxJava 2 使用場景舉例了,前面幾章中咱們講解完了 RxJava 1.x 到 RxJava 2.x 的異同以及 RxJava 2.x 的各類操做符使用,若有疑問,歡迎點擊上方的連接進入你想要的環節。git

2、正題

一、簡單的網絡請求

      想必你們都知道,不少時候咱們在使用 RxJava 的時候老是和 Retrofit 進行結合使用,而爲了方便演示,這裏咱們就暫且採用 OkHttp3 進行演示,配合 map,doOnNext ,線程切換進行簡單的網絡請求:github

      1)經過 Observable.create() 方法,調用 OkHttp 網絡請求;數據庫

      2)經過 map 操做符集合 gson,將 Response 轉換爲 bean 類;api

      3)經過 doOnNext() 方法,解析 bean 中的數據,並進行數據庫存儲等操做;緩存

      4) 調度線程,在子線程中進行耗時操做任務,在主線程中更新 UI ;網絡

      5) 經過 subscribe(),根據請求成功或者失敗來更新 UI 。app

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {

                        Log.e(TAG, "map 線程:" + Thread.currentThread().getName() + "\n");
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:轉換前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
                        return null;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext 線程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\ndoOnNext 線程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
                        mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");

                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                        mRxOperatorsText.append("成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");

                        Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
                        mRxOperatorsText.append("失敗:" + throwable.getMessage() + "\n");
                    }
                });

    爲了方便,咱們後面的講解大部分採用開源的 Rx2AndroidNetworking 來處理,數據來源於天狗網等多個公共API接口。ide

mRxOperatorsText.append("RxNetworkActivity\n");
        Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class)
                .observeOn(AndroidSchedulers.mainThread()) // 爲doOnNext() 指定在主線程,不然報錯
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "doOnNext:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\ndoOnNext:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG,"doOnNext:"+data.toString()+"\n");
                        mRxOperatorsText.append("doOnNext:"+data.toString()+"\n");
                    }
                })
                .map(new Function<MobileAddress, ResultBean>() {
                    @Override
                    public ResultBean apply(@NonNull MobileAddress mobileAddress) throws Exception {
                        Log.e(TAG, "\n" );
                        mRxOperatorsText.append("\n");
                        Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("map:"+Thread.currentThread().getName()+"\n" );
                        return mobileAddress.getResult();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ResultBean>() {
                    @Override
                    public void accept(@NonNull ResultBean data) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\nsubscribe 成功:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                        mRxOperatorsText.append("成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\nsubscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG, "失敗:"+ throwable.getMessage()+"\n" );
                        mRxOperatorsText.append("失敗:"+ throwable.getMessage()+"\n");
                    }
                });

 

二、先讀取緩存,若是緩存沒數據再經過網絡請求獲取數據後更新UI

      想必在實際應用中,不少時候(對數據操做不敏感時)都須要咱們先讀取緩存的數據,若是緩存沒有數據,再經過網絡請求獲取,隨後在主線程更新咱們的UI。post

      concat 操做符簡直就是爲咱們這種需求量身定作。

      concat 能夠作到不交錯的發射兩個甚至多個 Observable 的發射事件,而且只有前一個 Observable 終止(onComplete) 後纔會定義下一個 Observable。

      利用這個特性,咱們就能夠先讀取緩存數據,假若獲取到的緩存數據不是咱們想要的,再調用 onComplete() 以執行獲取網絡數據的Observable,若是緩存數據能應咱們所需,則直接調用 onNext() ,防止過分的網絡請求,浪費用戶的流量。

Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create當前線程:"+Thread.currentThread().getName() );
                FoodList data = CacheManager.getInstance().getFoodListData();

                // 在操做符 concat 中,只有調用 onComplete 以後纔會執行下一個 Observable
                if (data != null){ // 若是緩存數據不爲空,則直接讀取緩存數據,而不讀取網絡數據
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 讀取緩存數據:" );
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取緩存數據:\n");
                        }
                    });

                    e.onNext(data);
                }else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取網絡數據:\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: 讀取網絡數據:" );
                    e.onComplete();
                }


            }
        });

        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows",10+"")
                .build()
                .getObjectObservable(FoodList.class);


        // 兩個 Observable 的泛型應當保持一致

        Observable.concat(cache,network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList tngouBeen) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                        if (isFromNet){
                            mRxOperatorsText.append("accept : 網絡獲取數據設置緩存: \n");
                            Log.e(TAG, "accept : 網絡獲取數據設置緩存: \n"+tngouBeen.toString() );
                            CacheManager.getInstance().setFoodListData(tngouBeen);
                        }

                        mRxOperatorsText.append("accept: 讀取數據成功:" + tngouBeen.toString()+"\n");
                        Log.e(TAG, "accept: 讀取數據成功:" + tngouBeen.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
                        Log.e(TAG, "accept: 讀取數據失敗:"+throwable.getMessage() );
                        mRxOperatorsText.append("accept: 讀取數據失敗:"+throwable.getMessage()+"\n");
                    }
                });

      有時候咱們的緩存可能還會分爲 memory 和 disk ,實際上都差很少,無非是多寫點 Observable ,而後經過 concat 合併便可。

 

三、多個網絡請求依次依賴

      想必這種狀況也在實際狀況中比比皆是,例如用戶註冊成功後須要自動登陸,咱們只須要先經過註冊接口註冊用戶信息,註冊成功後立刻調用登陸接口進行自動登陸便可。

      咱們的 flatMap 剛好解決了這種應用場景,flatMap 操做符能夠將一個發射數據的 Observable 變換爲多個 Observables ,而後將它們發射的數據合併後放到一個單獨的 Observable,利用這個特性,咱們很輕鬆地達到了咱們的需求。

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,並解析到FootList
                .subscribeOn(Schedulers.io())        // 在io線程進行網絡請求
                .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根據獲取食品列表的響應結果作一些操做
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                        mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                        mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                        mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                    }
                });

 

四、結合多個接口的數據更新UI

     在實際應用中,咱們極有可能會在一個頁面顯示的數據來源於多個接口,這時候咱們的 zip 操做符爲咱們排憂解難。

     zip 操做符能夠將多個 Observable 的數據結合爲一個數據源再發射出去。

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android",1,1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "合併後的數據爲:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 失敗:" + throwable+"\n");
                    }
                });

五、間隔任務實現心跳

      想必即時通信等須要輪訓的任務在現在的 APP 中已經是很常見,而 RxJava 2.x 的 interval 操做符可謂完美地解決了咱們的疑惑。

      這裏就簡單的意思一下輪訓。

    private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : "+aLong );
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: 設置文本 :"+aLong );
                        mRxOperatorsText.append("accept: 設置文本 :"+aLong +"\n");
                    }
                });
    }

    /**
     * 銷燬時中止心跳
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }

3、後記

      姑且先講到這裏吧,喜歡的小夥伴別忘了關注和點贊哦~https://github.com/nanchen2251/RxJava2Examples

相關文章
相關標籤/搜索