關於RxJava在業務上的一些思考

最近在工做中,頻繁的使用了Rxjava來解決一些問題,在使用過程當中也給予了本身一些思考,如何使用好RxJava,在什麼樣的場景中才能發揮它更好的做用,如何脫離代碼來理解RxJava的工做機制,下面是本身一些淺顯的思考。java

示例

太多示例喜歡鏈式的把RxJava的流程表述起來,這個地方我把觀察者和訂閱者拆開來看。服務器

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("123");
            }
        });

 Observer observer = new Observer<String>() {
            ...
            @Override
            public void onNext(String s) {
                Log.i("TAG", "onNext: " + s);
            }
        };

observable.subscribe(observer);
複製代碼

這個簡單的例子你們應該都知道,只要subcribe產生了訂閱,onNext方法將會收到 emitter.onNext("123"); 發射出去的數據。app

這個地方讓我產生思考主要是有一次去吃自助餐,你們在打酸奶的時候,都會拿着一個杯子對準出口,而後按住開關,酸奶就會自動流到杯子中。在這個過程當中,咱們不妨把酸奶機看作 Observable ,酸奶機裏面的酸奶是許許多多的 emitter.onNext("123") ,按住開關的那一刻產生了 subscribe 訂閱,而後咱們是用杯子 Observer 去接牛奶的,固然,咱們還有橙子機、酸梅湯機等,則機子內盛的飲料類型就是 Observable<String> 。咱們知道,酸奶機有不少個開關入口,這時候,又來一我的,拿着杯子Observer來打牛奶,那麼,我和他是一塊共享這酸奶機裏面的酸奶,咱們倆都能接收到酸奶,等咱們不須要接酸奶了,咱們就dispose關閉開關。異步

eg:ide

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                //模擬耗時任務
                for (int i = 0; i < 5; i++) {
                    observableEmitter.onNext(""+i);
                }
            }
        }).subscribeOn(Schedulers.io());
//杯子1
observable.subscribe(observer1);
//杯子2
observable.subscribe(observer2);
複製代碼

result:post

observer1 onNext: 0
observer2 onNext: 0
observer1 onNext: 1
observer2 onNext: 1
observer1 onNext: 2
observer2 onNext: 2
observer1 onNext: 3
observer2 onNext: 3
observer1 onNext: 4
observer2 onNext: 4
複製代碼

事件驅動的思考

以前在思考事件驅動這一塊,如何更好的通知其餘業務組件,業界比較有名的當屬EventBus,但EventBus用起來很雜亂無章,當項目規模大起來,業務複雜起來時,都不敢修改這個post,雖然解耦了,但事件變得更亂了,因此,本身從新思考了事件驅動這一塊。spa

鑑於EventBus提供的的思路,我打算用RxJava的方式來實現。以酸奶機爲例,當前頁面我想訂閱一個事件,等待被觸發,我徹底能夠先準備一個杯子(Observer),而後將他們存到一個集合裏面,待酸奶機(Observable)裏面有酸奶了(observableEmitter.onNext),而後訂閱(subcribe)這個杯子的集合,將酸奶倒到杯子裏,鑑於此思路,用代碼大體的實現下。設計

List<Observer> list = new ArrayList<>();
    
    //註冊事件
    public void registerObserver(Observer observer) {
        list.add(observer);
    }

    //驅動事件
    public void postEvent() {
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("123");
            }
        });

        for (int i = 0; i < list.size(); i++) {
            observable.subscribeOn(Schedulers.io()).subscribe(list.get(i));
        }
    }

    @Test
    public void Test() {
        Observer observer = new Observer<String>() {
            ...
            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

        };
        //註冊事件
        registerObserver(observer);

        //發送事件
        postEvent();
    }
複製代碼

這裏只給了大體的思路,用了一個平時均可見的例子來實現了事件驅動。code

異步回調的思考

最近有一個業務場景,須要監聽RTK當前狀態的變化,業務場景是:cdn

image.png | left | 747x182

有一個前提,RTK必須先設置帳戶,才能使用後續的服務。若是用戶按照應用準則走,進入主頁後,先去設置頁面設置RTK,而後回到主頁,進入任務執行頁,這時候監聽RTK state是可用的;但若是用戶忘了設置的步驟,或是有強迫症的用戶,我就不往你提示的方式走,我就要先進任務執行員頁,這時候RTK State監聽是不可用的,咱們會引導用戶進入設置頁面設置RTK,這個地方又要分狀況,若是用戶設置好RTK帳戶,而後返回了任務頁,那麼任務頁的RTK state監聽到了用戶設置了帳戶就會返回可用,那麼此次任務是可以使用的;但若是用戶設置好了帳戶,想去診斷RTK當前鏈接的狀態的話,RTK state監聽事件就會被診斷RTK頁面給設置,也就意味着,任務頁的RTK state監聽就會失效,那麼返回任務頁的話,任務頁是不會有任何反應的。但這一塊也是有解決辦法的,就是任務頁的RTK state監聽放在 onResume 方法裏面,即設置頁面返回任務頁後,觸發任務頁的 onResume 方法,從新奪回RTK state的監聽。 辦法都是有的,但依賴生命週期去作到這點,感受並非特別的可靠,咱們能夠參考上面事件驅動的例子,將RTK state當作是酸奶機,而後哪一個頁面(杯子)須要RTK state信息的話,就能夠訂閱(subcribe)酸奶機,若是想要牛奶的話,就post一個信息出去,告訴酸奶機我要酸奶,下面,我給出一份示例:

Set<Observer> set = new HashSet<>();
    //模擬一個RTK state 單例
    public Observable getObservableInstance() {
        return Observable.create(new ObservableOnSubscribe<RTKState>() {
            @Override
            public void subscribe(ObservableEmitter<RTKState> emitter) throws Exception {
                RTK rtk = DjiSettingUtils.getRTK();
                rtk.setStateCallback(new RTKState.Callback() {
                    @Override
                    public void onUpdate(@NonNull RTKState rtkState) {
                        emitter.onNext(rtkState);
                    }
                });
            }
        });
    }

    //驅動事件
    public void postEvent(Observer observer) {
        if (!set.contains(observer)) {
            getObservableInstance().subscribeOn(Schedulers.io()).subscribe(observer);
        }
    }

    @Test
    public void onCreate() {
        Observer observer = new Observer<RTKState>() {
            ...
            @Override
            public void onNext(RTKState s) {
                System.out.println("onNext: " + s.isRTKBeingUsed());
            }
        };
        //發送事件
        postEvent(observer);
    }
複製代碼

以後,咱們只須要關注 onCreate 方法,在任務頁咱們發起一個訂閱事件,接收RTK state信息,在診斷頁面也發起一個訂閱,接收RTK信息,這樣就不會像上面那樣,搶斷監聽事件的問題。

多圖上傳的思考

業務場景中有須要從無人機中讀取縮略圖,並將縮略圖上傳至服務器,圖片上傳咱們使用的是七牛雲,由於一次任務產生的縮略圖很是多,基本上都在百張左右,咱們不可能爲了在上傳過程當中,由於某些緣由致使了斷開了,讓用戶從新上傳全部的縮略圖,因此,咱們打算讓百張縮略圖採用順序上傳,當哪一個節點發生錯誤的時候,記住index,等用戶點擊從新上傳時,咱們再從index的位置繼續上傳,若是按照傳統方式來作的話,第一張上傳成功後,如何通知第二張上傳呢,我這裏給個大體的代碼:

List<File> list=new ArrayList<>();
int index=0;

public void uploadPic(){
    uploadManager.put(list.get(index), key, token, new UpCompletionHandler() {
                @Override
                public void complete(String key, ResponseInfo info, JSONObject res) {
                    if (info.isOK()) {
                       index++;
                       uploadPic()
                    } else {
                        //彈框提示用戶,當前index上傳失敗
                    }
                }
            }, null);
}  
@Test
public void test(){
    uploadPic()
}


複製代碼

每次上傳成功後都調用自身的方法,若是上傳失敗了,則記住index的位置,提示用戶,用戶點擊重試上傳,那麼就繼續調用 uploadPic 方法,上傳的拿到的文件仍是從index位置開始拿,因此,也是沒有任何問題的。 可是,總以爲這麼設計不那麼優雅,好比我想知道上傳進度的話,那也就意味着我須要在index++方法下面加一個設置進度條的功能,那若是業務須要再加一個上傳完成的操做的話,那是否是又要在index++下面多加一個 index==list.size() 的判斷呢,其實,這樣設計下去的話,整個上傳功能就變得特別的鬆散,移植性也不強,因此,是時候發揮RxJava的 Observer 了。

鑑於異步回調的思考,我打算把上傳任務封裝成一個 ObservableOnSubcribe ,每次執行任務成功後,就將事件流onNext交給下游,告訴他我完成了一次上傳,若是上傳失敗了,則發射onError異常。

public class QiNiuBitmapOnSubscribe implements ObservableOnSubscribe<QiniuParam> {
    ...
    @Override
    public void subscribe(final ObservableEmitter<QiniuParam> emitter) throws Exception {
        //上傳操做
        uploadManager.put(file, key, token, new UpCompletionHandler() {
            @Override
            public void complete(String key, ResponseInfo info, JSONObject res) {
                if (info.isOK()) {
                    emitter.onNext(new QiniuParam(key, info, res));
                    emitter.onComplete();
                } else {
                    emitter.onError(new ServerException(-1, res.toString()));
                }
            }
        }, null);
    }
}
複製代碼

因爲圖片是存儲在一個集合中,那麼就確定要用到RxJava的 fromIterable 來遍歷集合,因爲須要保證圖片是有序上傳,就須要用到 concatMap 操做符 , 因此,大體代碼以下

Observable.fromIterable(fileList)
                .concatMap(new Function<QiNiuFile, ObservableSource<QiniuParam>>() {
                    @Override
                    public ObservableSource<QiniuParam> apply(QiNiuFile qiniuFile) throws Exception {
                    //返回七牛雲上傳
                    return Observable.create(new QiNiuFileOnSubscribe(uploadManager,
                                    qiniuFile.getFile(), qiniuFile.getKey(), qiniuFile.getUploadToken()));
                    }
                }).subscribe(new Observer<QiniuParam>() {
             ...
            @Override
            public void onNext(QiniuParam qiniuParam) {
                index++;
                //通知上傳進度
                uploadCallBack.onUploadProcess(index);
            }

            @Override
            public void onError(Throwable e) {
                //通知斷傳的位置
                uploadCallBack.onUploadQiNiuError(index);
            }

            @Override
            public void onComplete() {
               //上傳成功
                uploadCallBack.onUploadQiNiuComplete();
            }
        });
複製代碼

對於 Observer 來講,他是一個乾淨的接收流,他不關心上游發生的事情,只專一結果的處理。

思考

以上思考有的地方可能不是特別的完善,還須要多思考,RxJava用的人確實不少,但要想玩的溜的話,確實任重而道遠。

相關文章
相關標籤/搜索