最近在工做中,頻繁的使用了Rxjava來解決一些問題,在使用過程當中也給予了本身一些思考,如何使用好RxJava,在什麼樣的場景中才能發揮它更好的做用,如何脫離代碼來理解RxJava的工做機制,下面是本身一些淺顯的思考。java
太多示例喜歡鏈式的把RxJava的流程表述起來,這個地方我把觀察者和訂閱者拆開來看。服務器
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("123");
}
});
Observer observer = new Observer<String>() {
...
@Override
public void onNext(String s) {
Log.i("TAG", "onNext: " + s);
}
};
observable.subscribe(observer);
複製代碼
這個簡單的例子你們應該都知道,只要subcribe產生了訂閱,onNext方法將會收到 emitter.onNext("123");
發射出去的數據。app
這個地方讓我產生思考主要是有一次去吃自助餐,你們在打酸奶的時候,都會拿着一個杯子對準出口,而後按住開關,酸奶就會自動流到杯子中。在這個過程當中,咱們不妨把酸奶機看作 Observable
,酸奶機裏面的酸奶是許許多多的 emitter.onNext("123")
,按住開關的那一刻產生了 subscribe
訂閱,而後咱們是用杯子 Observer
去接牛奶的,固然,咱們還有橙子機、酸梅湯機等,則機子內盛的飲料類型就是 Observable<String>
。咱們知道,酸奶機有不少個開關入口,這時候,又來一我的,拿着杯子Observer
來打牛奶,那麼,我和他是一塊共享這酸奶機裏面的酸奶,咱們倆都能接收到酸奶,等咱們不須要接酸奶了,咱們就dispose關閉開關。異步
eg:ide
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
//模擬耗時任務
for (int i = 0; i < 5; i++) {
observableEmitter.onNext(""+i);
}
}
}).subscribeOn(Schedulers.io());
//杯子1
observable.subscribe(observer1);
//杯子2
observable.subscribe(observer2);
複製代碼
result:post
observer1 onNext: 0
observer2 onNext: 0
observer1 onNext: 1
observer2 onNext: 1
observer1 onNext: 2
observer2 onNext: 2
observer1 onNext: 3
observer2 onNext: 3
observer1 onNext: 4
observer2 onNext: 4
複製代碼
以前在思考事件驅動這一塊,如何更好的通知其餘業務組件,業界比較有名的當屬EventBus,但EventBus用起來很雜亂無章,當項目規模大起來,業務複雜起來時,都不敢修改這個post,雖然解耦了,但事件變得更亂了,因此,本身從新思考了事件驅動這一塊。spa
鑑於EventBus提供的的思路,我打算用RxJava的方式來實現。以酸奶機爲例,當前頁面我想訂閱一個事件,等待被觸發,我徹底能夠先準備一個杯子(Observer),而後將他們存到一個集合裏面,待酸奶機(Observable)裏面有酸奶了(observableEmitter.onNext),而後訂閱(subcribe)這個杯子的集合,將酸奶倒到杯子裏,鑑於此思路,用代碼大體的實現下。設計
List<Observer> list = new ArrayList<>();
//註冊事件
public void registerObserver(Observer observer) {
list.add(observer);
}
//驅動事件
public void postEvent() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("123");
}
});
for (int i = 0; i < list.size(); i++) {
observable.subscribeOn(Schedulers.io()).subscribe(list.get(i));
}
}
@Test
public void Test() {
Observer observer = new Observer<String>() {
...
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};
//註冊事件
registerObserver(observer);
//發送事件
postEvent();
}
複製代碼
這裏只給了大體的思路,用了一個平時均可見的例子來實現了事件驅動。code
最近有一個業務場景,須要監聽RTK當前狀態的變化,業務場景是:cdn
有一個前提,RTK必須先設置帳戶,才能使用後續的服務。若是用戶按照應用準則走,進入主頁後,先去設置頁面設置RTK,而後回到主頁,進入任務執行頁,這時候監聽RTK state是可用的;但若是用戶忘了設置的步驟,或是有強迫症的用戶,我就不往你提示的方式走,我就要先進任務執行員頁,這時候RTK State監聽是不可用的,咱們會引導用戶進入設置頁面設置RTK,這個地方又要分狀況,若是用戶設置好RTK帳戶,而後返回了任務頁,那麼任務頁的RTK state監聽到了用戶設置了帳戶就會返回可用,那麼此次任務是可以使用的;但若是用戶設置好了帳戶,想去診斷RTK當前鏈接的狀態的話,RTK state監聽事件就會被診斷RTK頁面給設置,也就意味着,任務頁的RTK state監聽就會失效,那麼返回任務頁的話,任務頁是不會有任何反應的。但這一塊也是有解決辦法的,就是任務頁的RTK state監聽放在 onResume
方法裏面,即設置頁面返回任務頁後,觸發任務頁的 onResume
方法,從新奪回RTK state的監聽。 辦法都是有的,但依賴生命週期去作到這點,感受並非特別的可靠,咱們能夠參考上面事件驅動的例子,將RTK state當作是酸奶機,而後哪一個頁面(杯子)須要RTK state信息的話,就能夠訂閱(subcribe)酸奶機,若是想要牛奶的話,就post一個信息出去,告訴酸奶機我要酸奶,下面,我給出一份示例:
Set<Observer> set = new HashSet<>();
//模擬一個RTK state 單例
public Observable getObservableInstance() {
return Observable.create(new ObservableOnSubscribe<RTKState>() {
@Override
public void subscribe(ObservableEmitter<RTKState> emitter) throws Exception {
RTK rtk = DjiSettingUtils.getRTK();
rtk.setStateCallback(new RTKState.Callback() {
@Override
public void onUpdate(@NonNull RTKState rtkState) {
emitter.onNext(rtkState);
}
});
}
});
}
//驅動事件
public void postEvent(Observer observer) {
if (!set.contains(observer)) {
getObservableInstance().subscribeOn(Schedulers.io()).subscribe(observer);
}
}
@Test
public void onCreate() {
Observer observer = new Observer<RTKState>() {
...
@Override
public void onNext(RTKState s) {
System.out.println("onNext: " + s.isRTKBeingUsed());
}
};
//發送事件
postEvent(observer);
}
複製代碼
以後,咱們只須要關注 onCreate
方法,在任務頁咱們發起一個訂閱事件,接收RTK state信息,在診斷頁面也發起一個訂閱,接收RTK信息,這樣就不會像上面那樣,搶斷監聽事件的問題。
業務場景中有須要從無人機中讀取縮略圖,並將縮略圖上傳至服務器,圖片上傳咱們使用的是七牛雲,由於一次任務產生的縮略圖很是多,基本上都在百張左右,咱們不可能爲了在上傳過程當中,由於某些緣由致使了斷開了,讓用戶從新上傳全部的縮略圖,因此,咱們打算讓百張縮略圖採用順序上傳,當哪一個節點發生錯誤的時候,記住index,等用戶點擊從新上傳時,咱們再從index的位置繼續上傳,若是按照傳統方式來作的話,第一張上傳成功後,如何通知第二張上傳呢,我這裏給個大體的代碼:
List<File> list=new ArrayList<>();
int index=0;
public void uploadPic(){
uploadManager.put(list.get(index), key, token, new UpCompletionHandler() {
@Override
public void complete(String key, ResponseInfo info, JSONObject res) {
if (info.isOK()) {
index++;
uploadPic()
} else {
//彈框提示用戶,當前index上傳失敗
}
}
}, null);
}
@Test
public void test(){
uploadPic()
}
複製代碼
每次上傳成功後都調用自身的方法,若是上傳失敗了,則記住index的位置,提示用戶,用戶點擊重試上傳,那麼就繼續調用 uploadPic
方法,上傳的拿到的文件仍是從index位置開始拿,因此,也是沒有任何問題的。 可是,總以爲這麼設計不那麼優雅,好比我想知道上傳進度的話,那也就意味着我須要在index++方法下面加一個設置進度條的功能,那若是業務須要再加一個上傳完成的操做的話,那是否是又要在index++下面多加一個 index==list.size()
的判斷呢,其實,這樣設計下去的話,整個上傳功能就變得特別的鬆散,移植性也不強,因此,是時候發揮RxJava的 Observer 了。
鑑於異步回調的思考,我打算把上傳任務封裝成一個 ObservableOnSubcribe
,每次執行任務成功後,就將事件流onNext交給下游,告訴他我完成了一次上傳,若是上傳失敗了,則發射onError異常。
public class QiNiuBitmapOnSubscribe implements ObservableOnSubscribe<QiniuParam> {
...
@Override
public void subscribe(final ObservableEmitter<QiniuParam> emitter) throws Exception {
//上傳操做
uploadManager.put(file, key, token, new UpCompletionHandler() {
@Override
public void complete(String key, ResponseInfo info, JSONObject res) {
if (info.isOK()) {
emitter.onNext(new QiniuParam(key, info, res));
emitter.onComplete();
} else {
emitter.onError(new ServerException(-1, res.toString()));
}
}
}, null);
}
}
複製代碼
因爲圖片是存儲在一個集合中,那麼就確定要用到RxJava的 fromIterable
來遍歷集合,因爲須要保證圖片是有序上傳,就須要用到 concatMap
操做符 , 因此,大體代碼以下
Observable.fromIterable(fileList)
.concatMap(new Function<QiNiuFile, ObservableSource<QiniuParam>>() {
@Override
public ObservableSource<QiniuParam> apply(QiNiuFile qiniuFile) throws Exception {
//返回七牛雲上傳
return Observable.create(new QiNiuFileOnSubscribe(uploadManager,
qiniuFile.getFile(), qiniuFile.getKey(), qiniuFile.getUploadToken()));
}
}).subscribe(new Observer<QiniuParam>() {
...
@Override
public void onNext(QiniuParam qiniuParam) {
index++;
//通知上傳進度
uploadCallBack.onUploadProcess(index);
}
@Override
public void onError(Throwable e) {
//通知斷傳的位置
uploadCallBack.onUploadQiNiuError(index);
}
@Override
public void onComplete() {
//上傳成功
uploadCallBack.onUploadQiNiuComplete();
}
});
複製代碼
對於 Observer
來講,他是一個乾淨的接收流,他不關心上游發生的事情,只專一結果的處理。
以上思考有的地方可能不是特別的完善,還須要多思考,RxJava用的人確實不少,但要想玩的溜的話,確實任重而道遠。