RxJava練武場之——Token前置請求

RxJava練武場是一個rxjava在項目中應用的小系列,包括:java

RxJava的用武之地

Rxjava這個庫和其餘常見庫不太同樣,通常的庫例如Glide,ButterKnife都是爲了解決實際問題出現的,必定程度上是剛需。Glide庫若是不用他,那麼應用本身就要處理圖片下載、壓縮、內存管理、多級緩存等等複雜的邏輯。這類問題複雜而常見,而像Glide這類的輪子,Api的設計都比較友好,一個簡單的api調用就能完成一個本來很複雜的功能,簡直不要太爽。編程

Glide.with(context)
    .load(url)//圖片加載
    .crossFade()//動畫設置
    .placeholder(R.drawable.place_image)//佔位圖
    .error(R.drawable.error_image)//失敗佔位圖
    .override(width,height)//圖片裁剪
    .thumbnail(thumbnailRequest)//配置縮略圖
    .diskCacheStrategy(DiskCacheStrategy.SOURCE)//緩存策略
    .into(imageView);
複製代碼

而Rxjava,你剛開始看起來,都不知道他是幹什麼的。「異步處理」?不是通常都使用觀察者模式嗎?AsyncTask,Handler也能夠,要rxjava幹嗎?若是你有興趣研究過一點rxjava,會發現網上的教程都會說:"zip map flatmap debounce等操做符把異步回調變得‘簡潔’‘優雅’",而後對比一下原來的代碼和使用rxjava後的代碼,最後感嘆一下rxjava設計的鬼才和功能的強大。我本身在初次接觸rxjava時也感受,這些rxjava的優勢描述比較空洞,這項技術的意義大於實用。 實際狀況是這樣麼?在具體開發中,異步調用給咱們的最大困擾是:異步回調的時間並不可控。當有多個異步回調時,這些調用相互聯繫和依賴,搞清楚每一個回調什麼時候返回是個重要的問題。在每一個關鍵時間節點對‘分散的callback’作正確的事,有過相似編程經驗的人都知道,是很是痛苦的事,若是還想代碼容易看懂,簡直是瘋了。 api

rxjava號稱異步調用的終極解決方案,可否解決以上困擾?隨着學習和應用的深刻,體會會更明顯。如下會用一個稍複雜的例子,實操一個複雜異步場景,看看rxjava處理的怎麼樣。

典型複雜異步場景 -- Token的前置校驗

常常遇到這種需求,接口的請求依賴token信息。一個請求須要先請求token(token若是存在緩存則使用緩存),依賴這個token才能進行正常網絡請求。這個token有必定的時效性,在時效性內可使用緩存,過時後須要從新請求token並從新發起一次請求。這個流程能夠概括以下圖: 緩存

token前置請求.png
光看這些需求,是否是以爲已經夠你喝一壺了,別忙,還有些潛在的邏輯這個圖沒有表現出來: 1 高併發網絡請求時,若是token正在請求,須要對請求阻塞(token請求過程當中,再也不接受新的token請求) 2 阻塞的同時,要把這些請求記錄下來,token請求成功後,再‘依次’發送這些阻塞的請求。 3 token失效狀況下,網絡請求限制重試次數。(防止遞歸調用) 4 token請求自己,重試策略需單獨配置。

不使用rxjava,咱們如何實現上述需求:

一、網絡請求前,對token是否有緩存判斷,若是沒有先請求token,並把這個請求阻塞且緩存 二、token請求過程當中,若是有新的token請求進來,加入阻塞隊列 三、token請求後,通知阻塞的隊列(廣播等方式),依次進行阻塞的請求 四、對兩種次數限制,分別作邏輯判斷安全

以上就是傳統實現方法,就不貼代碼了,這樣實現有如下特色: 一、要時刻維護一個阻塞隊列 (注意其添加和清空的時機) 二、token請求結束後,有一個回調機制通知阻塞隊列,(這個回調須要註冊和反註冊) 三、兩處的次數限制,次數維護的變量,很差維護(通常動態祕鑰爲了便於使用會作成單例,單例內的變量相似static,維護較複雜) 四、請求重試的邏輯很差實現,bash

咱們能夠看到這裏涉及到不少靜態變量的維護,廣播等異步回調的處理,這種狀況一多,編程者會變得很被動。並且token的異步請求和真正的網絡異步請求雜糅在一塊兒,增大了問題的複雜性。網絡

咱們來看下rxjava如何處理:

一些代碼網絡請求部分與前一篇博客《基於RxJava Retrofit的網絡框架》相關。多線程

先看看完整的請求過程
public static <R> Observable send(final MapiHttpRequest request, final MapiTypeReference<R> t){
    return Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    //傳入token緩存
                    return Observable.just(Store.sToken);
                }
            }).flatMap(new Function<String, ObservableSource<R>>() {
                @Override
                public ObservableSource<R> apply(String key) throws Exception {
                    if(TextUtils.isEmpty(key) && !request.skipCheckKeyValid()){
                        //token沒有緩存,須要請求Token
                        return Observable.<R>error(new KeyNotValidThrowable());
                    } else {
                        //Token存在緩存,直接請求
                        return sendRequestInternal(request,t);
                    }
                }
            })
            //進入失敗重試流程
            .retryWhen(new Function<Observable<? extends Throwable>, ObservableSource<String>>() {
                private int retryCount = 0;
                @Override
                public ObservableSource<String> apply(Observable<? extends Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(Throwable throwable) throws Exception {
                            if (throwable instanceof KeyNotValidThrowable){
                                //同一Request,有過一次KeyNotValidThrowable,則再也不重試
                                if (retryCount > 0){
                                    return Observable.error(throwable);
                                } else {
                                //token緩存不在,進入TokenLoader請求token
                                    retryCount++;
                                    return TokenLoader.getInstance().getNetTokenLocked();
                                }
                            } else if (throwable instanceof ApiException){
                                  //token過時的狀況,從新獲取token,並重試
                                  ApiException apiException = (ApiException)throwable;
                                  if (apiException.getCode() == MapiResultCode.SECRETKEY_EXPIRED.value()){
                                      if (retryCount > 0){
                                          return Observable.error(throwable);
                                      } else {
                                          //token緩存失效,進入TokenLoader請求token
                                          retryCount++;
                                          return DynamicKeyLoader.getInstance().getNetTokenLocked();
                                      }
                                  }
                            }
                            //其餘類型錯誤,直接拋出,再也不重試
                            return Observable.error(throwable);
                        }
                    });
                }
            });
}
複製代碼

也許你第一次看也挺暈,別怕,你順着註釋捋捋邏輯,是否是感受代碼的實現好像畫了一個時序圖。 除了註釋之外,幾點說明: 一、defer操做符的做用是在retry時,會從新建立新的Observable,不然會使用上次的Observable,不會從新獲取Store.sToken 二、retryWhen操做符,與sendRequestInternal內部統一配置的retryWhen並不衝突,至關於二次retry 三、retryWhen中若是拋出error ,則再也不重試; 四、重試請求,經過返回getNetTokenLocked這個subject實現。(下面詳述)併發

階段總結:

總體的流程被壓縮到了一個函數中,rxjava自己的retrywhen和subject機制,已經替咱們完成了這麼幾點: 一、自動重試的註冊和反註冊,subject被回調完直接失效,再次請求要從新註冊。 二、高併發request,維護隊列,經過mTokenObservable的回調自動解決了這個問題 三、retry次數的維護,因爲每次request的retry都是從新建立的內部類,因此變量的維護變的簡單。 四、重試的邏輯被retry操做符自動實現了,只要重寫retry的返回值就能夠控制重試的策略。app

TokenLoader:Token的獲取過程
public class TokenLoader {

    public static final String TAG = TokenLoader.class.getSimpleName();

    private AtomicBoolean mRefreshing = new AtomicBoolean(false);
    private PublishSubject<String> mPublishSubject;
    private Observable<String> mTokenObservable;

    private TokenLoader() {
        final TokenRequest request = new TokenRequest(CarOperateApplication.getInstance());
        mTokenObservable = Observable
                  .defer(new Callable<ObservableSource<TokenRequest>>() {
                      @Override
                      public ObservableSource<TokenRequest> call() throws Exception {
                          return Observable.just(request);
                      }
                  })
                  .flatMap(new Function<TokenRequest, ObservableSource<MapiHttpResponse<Boolean>>>() {
                      @Override
                      public ObservableSource<MapiHttpResponse<Boolean>> apply(RefreshKeyRequest refreshKeyRequest) throws Exception {
                          //Token請求接口
                          return ApiHelper.sendDynamicKey(refreshKeyRequest,new MapiTypeReference<MapiHttpResponse<Boolean>>(){});
                      }
                  })
                  .retryWhen(new Function<Observable<Throwable>, ObservableSource<TokenRequest>>() {
                      private int retryCount = 0;
                      @Override
                      public ObservableSource<TokenRequest> apply(Observable<Throwable> throwableObservable) throws Exception {
                          return throwableObservable.flatMap(new Function<Throwable, ObservableSource<TokenRequest>>() {
                              @Override
                              public ObservableSource<RefreshKeyRequest> apply(Throwable throwable) throws Exception {
                                  retryCount++;
                                  if (retryCount == 3){
                                      //失敗次數達到閾值,更改請求策略
                                      request.setFlag(0);
                                      return Observable.just(request);
                                  } else if (retryCount > 3){
                                      //失敗次數超過閾值,拋出失敗,放棄請求
                                      mRefreshing.set(false);
                                      return Observable.error(throwable);
                                  } else {
                                      //再次請求token
                                      return Observable.just(request);
                                  }
                              }
                          });

                      }
                  })
    //                      .delay(6000, TimeUnit.MILLISECONDS) //模擬token請求延遲
                  .map(new Function<MapiHttpResponse<Boolean>,String>() {
                      @Override
                      public String apply(MapiHttpResponse<Boolean> response) throws Exception {
                          //成功,保存token緩存
                          if (response.getContent().booleanValue() == true){
                              setCacheToken(response.getToken());
                          } else if (response.getContent().booleanValue() == false){
                              setCacheToken(UcarK.getSign());
                          }
                          //請求完成標識
                          mRefreshing.set(false);
                          return getCacheToken();
                      }
                  });
    }

    public static TokenLoader getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final TokenLoader INSTANCE = new TokenLoader();
    }

    public String getCacheToken() {
        return Store.sToken;
    }

    public void setCacheToken(String key){
        Store.sToken = key;
    }

    /**
     *
     * @return
     */
    public Observable<String> getNetTokenLocked() {
        if (mRefreshing.compareAndSet(false, true)) {
            Log.d(TAG, "沒有請求,發起一次新的Token請求");
            startTokenRequest();
        } else {
            Log.d(TAG, "已經有請求,直接返回等待");
        }
        return mPublishSubject;
    }

    private void startTokenRequest() {
        mPublishSubject = PublishSubject.create();
        mTokenObservable.subscribe(mPublishSubject);
    }

}
複製代碼

仍是讀註釋,除了註釋之外,幾點說明: 一、mRefreshing的做用是在token請求過程當中,再也不容許新的token請求, 變量採用原子類,而非boolean;這樣在多線程環境下,原子類的方法是線程安全的。 compareAndSet(boolean expect, boolean update)這個方法兩個做用 1)比較expect和mRefresh是否一致 2)將mRefreshing置爲update

二、startTokenRequest()方法開啓token請求,注意Observable在subscribe時才正式開始

三、這裏使用了PublishSubject較爲關鍵,在rxjava中Subject既是observable,又是observer,在TokenLoader中,mPublishSubject是mTokenObservable的觀察者,token請求的會由mPublishSubject響應,同時mPublishSubject也做爲Observable返回給TokenLoader的調用者做爲retryWhen的返回值返回。(因此這裏PublishSubject的泛型與send()方法中Observable的泛型應該是一致的)

四、對於mRefreshing是true的狀況,直接返回mPublishSubject,這樣每一個阻塞的請求retryWhen都會等待mPublishSubject的返回值,回調通知的順序與加入阻塞的順序是隊列關係(先請求的接口,先回調),知足咱們的需求。

最後: 感受怎麼樣,是豁然開朗仍是越陷越深,無論那樣都沒有關係,你須要的是瞭解還存在另外一種處理異步任務的方法。在你下一次遇到一樣讓你頭疼的問題時,你能夠把這篇文章拿起來再看看,也許你的頭疼會好一點了。。。

相關文章
相關標籤/搜索