RxJava練武場是一個rxjava在項目中應用的小系列,包括:java
Rxjava這個庫和其餘常見庫不太同樣,通常的庫例如Glide,ButterKnife都是爲了解決實際問題出現的,必定程度上是剛需。Glide庫若是不用他,那麼應用本身就要處理圖片下載、壓縮、內存管理、多級緩存等等複雜的邏輯。這類問題複雜而常見,而像Glide這類的輪子,Api的設計都比較友好,一個簡單的api調用就能完成一個本來很複雜的功能,簡直不要太爽。編程
Glide.with(context)
.load(url)//圖片加載
.crossFade()//動畫設置
.placeholder(R.drawable.place_image)//佔位圖
.error(R.drawable.error_image)//失敗佔位圖
.override(width,height)//圖片裁剪
.thumbnail(thumbnailRequest)//配置縮略圖
.diskCacheStrategy(DiskCacheStrategy.SOURCE)//緩存策略
.into(imageView);
複製代碼
而Rxjava,你剛開始看起來,都不知道他是幹什麼的。「異步處理」?不是通常都使用觀察者模式嗎?AsyncTask,Handler也能夠,要rxjava幹嗎?若是你有興趣研究過一點rxjava,會發現網上的教程都會說:"zip map flatmap debounce等操做符把異步回調變得‘簡潔’‘優雅’",而後對比一下原來的代碼和使用rxjava後的代碼,最後感嘆一下rxjava設計的鬼才和功能的強大。我本身在初次接觸rxjava時也感受,這些rxjava的優勢描述比較空洞,這項技術的意義大於實用。 實際狀況是這樣麼?在具體開發中,異步調用給咱們的最大困擾是:異步回調的時間並不可控。當有多個異步回調時,這些調用相互聯繫和依賴,搞清楚每一個回調什麼時候返回是個重要的問題。在每一個關鍵時間節點對‘分散的callback’作正確的事,有過相似編程經驗的人都知道,是很是痛苦的事,若是還想代碼容易看懂,簡直是瘋了。 api
rxjava號稱異步調用的終極解決方案,可否解決以上困擾?隨着學習和應用的深刻,體會會更明顯。如下會用一個稍複雜的例子,實操一個複雜異步場景,看看rxjava處理的怎麼樣。常常遇到這種需求,接口的請求依賴token信息。一個請求須要先請求token(token若是存在緩存則使用緩存),依賴這個token才能進行正常網絡請求。這個token有必定的時效性,在時效性內可使用緩存,過時後須要從新請求token並從新發起一次請求。這個流程能夠概括以下圖: 緩存
光看這些需求,是否是以爲已經夠你喝一壺了,別忙,還有些潛在的邏輯這個圖沒有表現出來: 1 高併發網絡請求時,若是token正在請求,須要對請求阻塞(token請求過程當中,再也不接受新的token請求) 2 阻塞的同時,要把這些請求記錄下來,token請求成功後,再‘依次’發送這些阻塞的請求。 3 token失效狀況下,網絡請求限制重試次數。(防止遞歸調用) 4 token請求自己,重試策略需單獨配置。一、網絡請求前,對token是否有緩存判斷,若是沒有先請求token,並把這個請求阻塞且緩存 二、token請求過程當中,若是有新的token請求進來,加入阻塞隊列 三、token請求後,通知阻塞的隊列(廣播等方式),依次進行阻塞的請求 四、對兩種次數限制,分別作邏輯判斷安全
以上就是傳統實現方法,就不貼代碼了,這樣實現有如下特色: 一、要時刻維護一個阻塞隊列 (注意其添加和清空的時機) 二、token請求結束後,有一個回調機制通知阻塞隊列,(這個回調須要註冊和反註冊) 三、兩處的次數限制,次數維護的變量,很差維護(通常動態祕鑰爲了便於使用會作成單例,單例內的變量相似static,維護較複雜) 四、請求重試的邏輯很差實現,bash
咱們能夠看到這裏涉及到不少靜態變量的維護,廣播等異步回調的處理,這種狀況一多,編程者會變得很被動。並且token的異步請求和真正的網絡異步請求雜糅在一塊兒,增大了問題的複雜性。網絡
一些代碼網絡請求部分與前一篇博客《基於RxJava Retrofit的網絡框架》相關。多線程
public static <R> Observable send(final MapiHttpRequest request, final MapiTypeReference<R> t){
return Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
//傳入token緩存
return Observable.just(Store.sToken);
}
}).flatMap(new Function<String, ObservableSource<R>>() {
@Override
public ObservableSource<R> apply(String key) throws Exception {
if(TextUtils.isEmpty(key) && !request.skipCheckKeyValid()){
//token沒有緩存,須要請求Token
return Observable.<R>error(new KeyNotValidThrowable());
} else {
//Token存在緩存,直接請求
return sendRequestInternal(request,t);
}
}
})
//進入失敗重試流程
.retryWhen(new Function<Observable<? extends Throwable>, ObservableSource<String>>() {
private int retryCount = 0;
@Override
public ObservableSource<String> apply(Observable<? extends Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Throwable throwable) throws Exception {
if (throwable instanceof KeyNotValidThrowable){
//同一Request,有過一次KeyNotValidThrowable,則再也不重試
if (retryCount > 0){
return Observable.error(throwable);
} else {
//token緩存不在,進入TokenLoader請求token
retryCount++;
return TokenLoader.getInstance().getNetTokenLocked();
}
} else if (throwable instanceof ApiException){
//token過時的狀況,從新獲取token,並重試
ApiException apiException = (ApiException)throwable;
if (apiException.getCode() == MapiResultCode.SECRETKEY_EXPIRED.value()){
if (retryCount > 0){
return Observable.error(throwable);
} else {
//token緩存失效,進入TokenLoader請求token
retryCount++;
return DynamicKeyLoader.getInstance().getNetTokenLocked();
}
}
}
//其餘類型錯誤,直接拋出,再也不重試
return Observable.error(throwable);
}
});
}
});
}
複製代碼
也許你第一次看也挺暈,別怕,你順着註釋捋捋邏輯,是否是感受代碼的實現好像畫了一個時序圖。 除了註釋之外,幾點說明: 一、defer操做符的做用是在retry時,會從新建立新的Observable,不然會使用上次的Observable,不會從新獲取Store.sToken 二、retryWhen操做符,與sendRequestInternal內部統一配置的retryWhen並不衝突,至關於二次retry 三、retryWhen中若是拋出error ,則再也不重試; 四、重試請求,經過返回getNetTokenLocked這個subject實現。(下面詳述)併發
總體的流程被壓縮到了一個函數中,rxjava自己的retrywhen和subject機制,已經替咱們完成了這麼幾點: 一、自動重試的註冊和反註冊,subject被回調完直接失效,再次請求要從新註冊。 二、高併發request,維護隊列,經過mTokenObservable的回調自動解決了這個問題 三、retry次數的維護,因爲每次request的retry都是從新建立的內部類,因此變量的維護變的簡單。 四、重試的邏輯被retry操做符自動實現了,只要重寫retry的返回值就能夠控制重試的策略。app
public class TokenLoader {
public static final String TAG = TokenLoader.class.getSimpleName();
private AtomicBoolean mRefreshing = new AtomicBoolean(false);
private PublishSubject<String> mPublishSubject;
private Observable<String> mTokenObservable;
private TokenLoader() {
final TokenRequest request = new TokenRequest(CarOperateApplication.getInstance());
mTokenObservable = Observable
.defer(new Callable<ObservableSource<TokenRequest>>() {
@Override
public ObservableSource<TokenRequest> call() throws Exception {
return Observable.just(request);
}
})
.flatMap(new Function<TokenRequest, ObservableSource<MapiHttpResponse<Boolean>>>() {
@Override
public ObservableSource<MapiHttpResponse<Boolean>> apply(RefreshKeyRequest refreshKeyRequest) throws Exception {
//Token請求接口
return ApiHelper.sendDynamicKey(refreshKeyRequest,new MapiTypeReference<MapiHttpResponse<Boolean>>(){});
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<TokenRequest>>() {
private int retryCount = 0;
@Override
public ObservableSource<TokenRequest> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<TokenRequest>>() {
@Override
public ObservableSource<RefreshKeyRequest> apply(Throwable throwable) throws Exception {
retryCount++;
if (retryCount == 3){
//失敗次數達到閾值,更改請求策略
request.setFlag(0);
return Observable.just(request);
} else if (retryCount > 3){
//失敗次數超過閾值,拋出失敗,放棄請求
mRefreshing.set(false);
return Observable.error(throwable);
} else {
//再次請求token
return Observable.just(request);
}
}
});
}
})
// .delay(6000, TimeUnit.MILLISECONDS) //模擬token請求延遲
.map(new Function<MapiHttpResponse<Boolean>,String>() {
@Override
public String apply(MapiHttpResponse<Boolean> response) throws Exception {
//成功,保存token緩存
if (response.getContent().booleanValue() == true){
setCacheToken(response.getToken());
} else if (response.getContent().booleanValue() == false){
setCacheToken(UcarK.getSign());
}
//請求完成標識
mRefreshing.set(false);
return getCacheToken();
}
});
}
public static TokenLoader getInstance() {
return Holder.INSTANCE;
}
private static class Holder {
private static final TokenLoader INSTANCE = new TokenLoader();
}
public String getCacheToken() {
return Store.sToken;
}
public void setCacheToken(String key){
Store.sToken = key;
}
/**
*
* @return
*/
public Observable<String> getNetTokenLocked() {
if (mRefreshing.compareAndSet(false, true)) {
Log.d(TAG, "沒有請求,發起一次新的Token請求");
startTokenRequest();
} else {
Log.d(TAG, "已經有請求,直接返回等待");
}
return mPublishSubject;
}
private void startTokenRequest() {
mPublishSubject = PublishSubject.create();
mTokenObservable.subscribe(mPublishSubject);
}
}
複製代碼
仍是讀註釋,除了註釋之外,幾點說明: 一、mRefreshing的做用是在token請求過程當中,再也不容許新的token請求, 變量採用原子類,而非boolean;這樣在多線程環境下,原子類的方法是線程安全的。 compareAndSet(boolean expect, boolean update)這個方法兩個做用 1)比較expect和mRefresh是否一致 2)將mRefreshing置爲update
二、startTokenRequest()方法開啓token請求,注意Observable在subscribe時才正式開始
三、這裏使用了PublishSubject較爲關鍵,在rxjava中Subject既是observable,又是observer,在TokenLoader中,mPublishSubject是mTokenObservable的觀察者,token請求的會由mPublishSubject響應,同時mPublishSubject也做爲Observable返回給TokenLoader的調用者做爲retryWhen的返回值返回。(因此這裏PublishSubject的泛型與send()方法中Observable的泛型應該是一致的)
四、對於mRefreshing是true的狀況,直接返回mPublishSubject,這樣每一個阻塞的請求retryWhen都會等待mPublishSubject的返回值,回調通知的順序與加入阻塞的順序是隊列關係(先請求的接口,先回調),知足咱們的需求。
最後: 感受怎麼樣,是豁然開朗仍是越陷越深,無論那樣都沒有關係,你須要的是瞭解還存在另外一種處理異步任務的方法。在你下一次遇到一樣讓你頭疼的問題時,你能夠把這篇文章拿起來再看看,也許你的頭疼會好一點了。。。