RxJava操做符在android中的使用場景詳解(二)

轉載請註明出處:http://www.wangxinarhat.com/2016/05/01/2016-05-01-rxjava-android-operate2/java

最近比較忙,也沒想好這個文章該怎麼寫下去。可能會比較水,不過作事不能有始無終,因此繼續吧。android

場景五:BehaviorSubject操做符的使用(橋樑)

使用場景:製做緩存git

效果圖github

圖片描述

代碼:緩存

  • 緩存管理類網絡

    public class DataCache {        
                /**
                 * 讀取磁盤緩存數據
                 */
                public List<ImageInfoBean> readData() {
                    ...
                }
            
                /**
                 * 寫緩存
                 */
                public void writeData(List<ImageInfoBean> list) {
                    ...       
                }
            
                /**
                 * 刪除緩存
                 */
                public boolean deleteCache() {
                    ...
                }
            }
  • 數據管理類併發

    public class Data {
        private static Data instance;
        private static final int DATA_SOURCE_MEMORY = 1;//內存
        private static final int DATA_SOURCE_DISK = 2;//硬盤
        private static final int DATA_SOURCE_NETWORK = 3;//網絡
        BehaviorSubject<List<ImageInfoBean>> cache;
        private int dataSource;    
        
        private Data() {
        }    
        
        public static Data newInstance() {
            if (instance == null) {
                instance = new Data();
            }
            return instance;
        }    
        private void setDataSource(@DataSource int dataSource) {
            this.dataSource = dataSource;
        }    
        
        public String getDataSourceText() {
            int dataSourceTextRes;
            switch (dataSource) {
                case DATA_SOURCE_MEMORY:
                    dataSourceTextRes = R.string.data_source_memory;
                    break;
                case DATA_SOURCE_DISK:
                    dataSourceTextRes = R.string.data_source_disk;
                    break;
                case DATA_SOURCE_NETWORK:
                    dataSourceTextRes = R.string.data_source_network;
                    break;
                default:
                    dataSourceTextRes = R.string.data_source_network;
            }
            return BaseApplication.getApplication().getString(dataSourceTextRes);
        }    
        /**
         * 請求網絡數據
         */
        public void loadData() {    
            Network.getGankApi()
                    .getBeauties(80, 1)
                    .map(BeautyResult2Beautise.newInstance())
                    .doOnNext(new Action1<List<ImageInfoBean>>() {
                        @Override
                        public void call(List<ImageInfoBean> list) {
                            DataCache.newInstance().writeData(list);
                        }
                    })
                    .subscribe(new Action1<List<ImageInfoBean>>() {
                        @Override
                        public void call(List<ImageInfoBean> list) {
                            cache.onNext(list);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            throwable.printStackTrace();
                        }
                    });    
        }    
        /**
         * 獲取數據
         * @param observer
         * @return
         */
        public Subscription subscribeData(@Nullable Observer<List<ImageInfoBean>> observer) {    
            if (null == cache) {
                cache = BehaviorSubject.create();
                Observable.create(new Observable.OnSubscribe<List<ImageInfoBean>>() {
                    @Override
                    public void call(Subscriber<? super List<ImageInfoBean>> subscriber) {
                        //從緩存獲取數據
                        List<ImageInfoBean> list = DataCache.newInstance().readData();    
                        if (null == list) {
                            setDataSource(DATA_SOURCE_NETWORK);
                            //請求網絡數據
                            loadData();
                        } else {
                            setDataSource(DATA_SOURCE_DISK);
                            subscriber.onNext(list);
                        }    
                    }
                })
                        .subscribeOn(Schedulers.io()).subscribe(cache);    
            } else {
                //內存中獲取的數據
                setDataSource(DATA_SOURCE_MEMORY);
            }    
            return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);    
        }    
        /**
         * 清空內存
         */
        public void clearMemoryCache() {
            cache = null;
        }    
        /**
         * 清空內存和硬盤數據
         */
        public void clearMemoryAndDiskCache() {
            clearMemoryCache();
            DataCache.newInstance().deleteCache();
        }
    }
  • 獲取數據dom

    @OnClick(R.id.load)
    public void onClick() {
        startTime = System.currentTimeMillis();
        swipeRefreshLayout.setRefreshing(true);
        unsubscribe();
        subscription = Data.newInstance()
                .subscribeData(getObserver());
    }
  • 在觀察者中進行獲取數據結果的處理ide

    private Observer<List<ImageInfoBean>> getObserver() {
        if (null == observer) {
            observer = new Observer<List<ImageInfoBean>>() {
                @Override
                public void onCompleted() {
                }
                @Override
                public void onError(Throwable e) {
                    Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                }
                @Override
                public void onNext(List<ImageInfoBean> list) {
                    swipeRefreshLayout.setRefreshing(false);
                    int loadingTime = (int) (System.currentTimeMillis() - startTime);
                    dataSituation.setText(getString(R.string.loading_time_and_source, loadingTime, Data.newInstance().getDataSourceText()));
                    adapter.setImages(list);
                }
            };
        }
        return observer;
    }

詳解 函數

Subject能夠當作是一個橋樑或者代理,在RxJava中同時充當了Observer和Observable的角色。由於它是一個Observer,它能夠訂閱一個或多個Observable;又由於它是一個Observable,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。

場景六:retryWhen操做符的使用(錯誤處理)

使用場景:有的 token 並不是一次性的,而是能夠屢次使用,直到它超時或被銷燬(多數 token 都是這樣的)。

這樣的 token 處理起來比較麻煩:須要把它保存起來,而且在發現它失效的時候要可以自動從新獲取新的 token >並繼續訪問以前因爲 token 失效而失敗的請求。

若是項目中有多處的接口請求都須要這樣的自動修復機制,使用傳統的 Callback 形式須要寫出很是複雜的代碼。
而使用 RxJava ,能夠用 retryWhen() 來輕鬆地處理這樣的問題。

效果圖

Token API準備

因爲找不到足夠簡單的用於示例的 token API,如下API是代碼僞造的

/**
     * Created by wangxinarhat on 16-4-5.
     * TokenApi
     */
    public class TokenApi {
    
        /**
         * 獲取Observable
         * @param auth
         * @return
         */
        public static Observable<Token> getToken(@NonNull String auth) {
            return Observable.just(auth).map(new Func1<String, Token>() {
                @Override
                public Token call(String s) {
    
                    try {
                        Thread.sleep(new Random().nextInt(600) + 600);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    Token token = new Token();
                    token.token = createToken();
    
                    return token;
                }
            });
        }
    
        /**
         * 隨機生成token
         * @return
         */
        private static String createToken() {
    
            return "token_wangxinarhat_" + System.currentTimeMillis() % 1000;
        }
    
    
        /**
         * 根據Token獲取用戶數據
         * @param token
         * @return
         */
        public static Observable<DataInfo> getData(@NonNull Token token) {
            return Observable.just(token).map(new Func1<Token, DataInfo>() {
                @Override
                public DataInfo call(Token token) {
    
                    try {
                        Thread.sleep(new Random().nextInt(600) + 600);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    if (token.isInvalid) {
                        throw new IllegalArgumentException("Token is invalid");
                    }
                    DataInfo dataInfo = new DataInfo();
                    dataInfo.id = (int) (System.currentTimeMillis() % 1000);
                    dataInfo.name = "USER_" + dataInfo.id;
    
                    return dataInfo;
                }
            });
        }
    }

Token

/**
     *Token類
     */
    public class Token {
        public String token;
        public boolean isInvalid;//token是否失效    

        public Token(boolean isInvalid) {
            this.isInvalid = isInvalid;
        }    

        public Token() {
        }
    }

用戶數據

/**
     * Created by wangxinarhat on 16-4-5.
     * 用戶數據
     */
    public class DataInfo {
        public int id;
        public String name;
    }

操做符的使用

  • 根據token請求數據

    @OnClick(R.id.requestBt)
    void request() {
        tokenUpdated = false;
        swipeRefreshLayout.setRefreshing(true);
        unsubscribe();
        final TokenApi tokenApi = new TokenApi();
        subscription = Observable.just(null).flatMap(new Func1<Object, rx.Observable<DataInfo>>() {
            @Override
            public Observable<DataInfo> call(Object o) {
    
                return null == cachedFakeToken.token ?
                        Observable.<DataInfo>error(new NullPointerException("token id null")) :
                        tokenApi.getData(cachedFakeToken);
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
                            return tokenApi.getToken("flat_map")
                                    .doOnNext(new Action1<Token>() {
                                        @Override
                                        public void call(Token token) {
                                            tokenUpdated = true;
                                            cachedFakeToken.token = token.token;
                                            cachedFakeToken.isInvalid = token.isInvalid;
                                        }
                                    });
                        }
                        return Observable.just(throwable);
                    }
                });
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<DataInfo>() {
                    @Override
                    public void call(DataInfo dataInfo) {
                        swipeRefreshLayout.setRefreshing(false);
                        String token = cachedFakeToken.token;
                        if (tokenUpdated) {
                            token += "(" + getString(R.string.updated) + ")";
                        }
                        tokenTv.setText(String.format(getString(R.string.got_token_and_data), token, dataInfo.id, dataInfo.name));
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        swipeRefreshLayout.setRefreshing(false);
                        Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                    }
                });
    }
  • 銷燬token

    @OnClick(R.id.invalidateTokenBt)
    void incalidate() {
        cachedFakeToken.isInvalid = true;
        Toast.makeText(getActivity(), R.string.token_expired, Toast.LENGTH_SHORT).show();
    }

詳解

若是原始Observable遇到錯誤,從新訂閱它指望它能正常終止。

retryWhen操做符不會將原始Observable的onError通知傳遞給觀察者,它會訂閱這個Observable,再給它一次機會無錯誤地完成它的數據序列。

retryWhen老是傳遞onNext通知給觀察者,因爲從新訂閱,可能會形成數據項重複。

不管收到多少次onError通知,無參數版本的retryWhen都會繼續訂閱併發射原始Observable。
接受單個count參數的retryWhen會最多從新訂閱指定的次數,若是次數超了,它不會嘗試再次訂閱,它會把最新的一個onError通知傳遞給它的觀察者。

還有一個版本的retryWhen接受一個謂詞函數做爲參數,這個函數的兩個參數是:重試次數和致使發射onError通知的Throwable。這個函數返回一個布爾值,若是返回true,retryWhen應該再次訂閱和鏡像原始的Observable,若是返回false,retryWhen會將最新的一個onError通知傳遞給它的觀察者。
retryWhen操做符默認在trampoline調度器上執行。

場景七:Debounce操做符的使用(過濾)

使用場景

實時搜索,若是在EditText中監聽到字符改變就發起請求數據,明顯不合適。
有了Debounce操做符,僅在過了指定的一段時間還沒發射數據時才發射一個數據,Debounce操做符會過濾掉髮射速率過快的數據項,優化網絡請求

效果圖

圖片描述

代碼

  • 配合jakewharton大神的rxbinding使用,獲取可觀察對象

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        setLogger();
        //使用rxbing給EditText註冊字符改變事件
        subscription = RxTextView.textChangeEvents(input)
                .debounce(500, TimeUnit.MILLISECONDS)//設置發射時間間隔
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }
  • 在觀察者中進行結果處理

    /**
     * 獲取觀察者
     * @return
     */
    private Observer<? super TextViewTextChangeEvent> getObserver() {
    
        return new Observer<TextViewTextChangeEvent>() {
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) {
                //獲得搜索關鍵字,進行網絡請求
                log(String.format("搜索關鍵字 : %s", textViewTextChangeEvent.text().toString()));
            }
        };
    }
  • 更新adapter數據集

    /**
     * 更新adapter數據集
     *
     * @param logMsg
     */
    private void log(String logMsg) {
    
        if (isCurrentlyOnMainThread()) {
            mLogs.add(0, logMsg + " (main thread) ");
            mAdapter.notifyDataSetChanged();
    
        } else {
            mLogs.add(0, logMsg + " (NOT main thread) ");
            new Handler(Looper.getMainLooper()).post(new Runnable() {
                @Override
                public void run() {
                    mAdapter.notifyDataSetChanged();
                }
            });
    
        }
    }

詳解

Debounce僅在過了一段指定的時間還沒發射數據時才發射一個數據,會根據設置的時間間隔過濾掉髮射速率過快的數據項。

場景八:Buffer操做符的使用(變換)

按期收集Observable的數據放進一個數據包裹,而後發射這些數據包裹,而不是一次發射一個值。
這個操做符,我暫時尚未比較好的使用場景,不過既然是能夠按期收集數據,那麼應該能夠作指定時間內點擊次數等之類的統計。

效果圖

圖片描述

代碼

  • 仍是使用jakewharton大神的rxbinding,註冊點擊事件獲取可觀察對象

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        setLogger();
        subscription = RxView.clicks(btn)
                .map(new Func1<Void, Integer>() {
                    @Override
                    public Integer call(Void aVoid) {
                        log("點擊一次");
                        return 1;
                    }
                })
                .buffer(3, TimeUnit.SECONDS)//設置收集數據時間間隔爲3s
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

詳解

Buffer操做符將一個Observable變換爲另外一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合。

還有就是:若是原來的Observable發射了一個onError通知,Buffer會當即傳遞這個通知,而不是首先發射緩存的數據,即便在這以前緩存中包含了原始Observable發射的數據。

小結

由於我也才嘗試使用rx,這篇終於擠出來了,好難。。代碼在這裏

若是又學到新的使用場景,仍是會再寫。

說明

我是從國內rx大神扔物線,還有github上star數最多的那位哥們兒(kaushikgopal)學習的。由於他們都沒有很詳細的說明操做符的使用,因此纔想寫這個文章。

如想深刻學習,請看大神代碼。

相關文章
相關標籤/搜索