RxJava源碼解析(三)背壓+源碼+同步異步+原理

系列文章第三篇
承接上文:RXjava解析(二)我把RXjava的源碼和這份面試都給你了,你還告訴我面不過拿不到offer?
順手留下GitHub連接,須要獲取相關面試等內容的能夠本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)java

背壓問題

背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略react

簡而言之,背壓是流速控制的一種策略。git

須要強調兩點:github

  • 背壓策略的一個前提是異步環境,也就是說,被觀察者和觀察者處在不一樣的線程環境中。
  • 背壓(Backpressure)並非一個像flatMap同樣能夠在程序中直接使用的操做符,他只是一種控制事件流速的策略。

響應式拉取(reactive pull)

首先咱們回憶以前那篇《關於Rxjava最友好的文章》,裏面其實提到,在RxJava的觀察者模型中,被觀察者是主動的推送數據給觀察者,觀察者是被動接收的。而響應式拉取則反過來,觀察者主動從被觀察者那裏去拉取數據,而被觀察者變成被動的等待通知再發送數據。面試

結構示意圖以下:
觀察者能夠根據自身實際狀況按需拉取數據,而不是被動接收(也就至關於告訴上游觀察者把速度慢下來),最終實現了上游被觀察者發送事件的速度的控制,實現了背壓的策略。緩存

源碼

public class FlowableOnBackpressureBufferStategy{
  ...
        @Override
        public void onNext(T t) {
           if (done) {
             return;
           }
           boolean callOnOverflow = false;
           boolean callError = false;
           Deque<T> dq = deque;
           synchronized (dq) {
              if (dq.size() == bufferSize) {
                     switch (strategy) {
                     case DROP_LATEST:
                         dq.pollLast();
                         dq.offer(t);
                         callOnOverflow = true;
                         break;
                     case DROP_OLDEST:
                         dq.poll();
                         dq.offer(t);
                         callOnOverflow = true;
                         break;
                     default:
                          // signal error
                          callError = true;
                          break;
                     }
        } else {
                     dq.offer(t);
          }
  }
      if (callOnOverflow) {
            if (onOverflow != null) {
                try {
                   onOverflow.run();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.cancel();
                    onError(ex);
                }
           }
          } else if (callError) {
            s.cancel();
            onError(new MissingBackpressureException());
          } else {
            drain();
          }
    }
  ...
  }

在這段源碼中,根據不一樣的背壓策略進行了不一樣的處理措施,固然這只是列舉了一段關於buffer背壓策略的例子。網絡

根源

產生背壓問題的根源就是上游發送速度與下游的處理速度不均致使的,因此若是想要解決這個問題就須要經過匹配兩個速率達到解決這個背壓根源的措施。
一般有兩個策略可供使用:app

  1. 從數量上解決,對數據進行採樣
  2. 從速度上解決,下降發送事件的速率
  3. 利用flowable和subscriber

使用Flowable

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
              Log.d(TAG, "emit 1");
              emitter.onNext(1);
              Log.d(TAG, "emit 2");
              emitter.onNext(2);
              Log.d(TAG, "emit 3");
              emitter.onNext(3);
              Log.d(TAG, "emit complete");
              emitter.onComplete();
          }
       }, BackpressureStrategy.ERROR); //增長了一個參數
       Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
               Log.d(TAG, "onSubscribe");
               s.request(Long.MAX_VALUE); //注意這句代碼
        }

        @Override
        public void onNext(Integer integer) {
              Log.d(TAG, "onNext: " + integer);
        }
        @Override
        public void onError(Throwable t) {
              Log.w(TAG, "onError: ", t);
        }
        @Override
         public void onComplete() {
               Log.d(TAG, "onComplete");
         }
   };
   upstream.subscribe(downstream);

咱們注意到此次和 Observable 有些不一樣. 首先是建立 Flowable 的時候增長了一個參數, 這個參數是用來選擇背壓,也就是出現上下游流速不均衡的時候應該怎麼處理的辦法, 這裏咱們直接用 BackpressureStrategy.ERROR 這種方式,這種方式會在出現上下游流速不均衡的時候直接拋出一個異常,這個異常就是著名的MissingBackpressureException . 其他的策略後面再來說解.異步

另外的一個區別是在下游的 onSubscribe 方法中傳給咱們的再也不是 Disposable 了, 而是 Subscription , 它倆有什麼區別呢, 首先它們都是上下游中間的一個開關, 以前咱們說調用 Disposable.dispose() 方法能夠切斷水管, 一樣的調用 Subscription.cancel() 也能夠切斷水管, 不一樣的地方在於 Subscription增長了一個 void request(longn) 方法, 這個方法有什麼用呢, 在上面的代碼中也有這麼一句代碼:ide

s.request(Long.MAX_VALUE);

這是由於 Flowable 在設計的時候採用了一種新的思路也就是 響應式拉取 的方式來更好的解決上下游流速不均衡的問題, 與咱們以前所講的 控制數量 和 控制速度 不太同樣, 這種方式用通俗易懂的話來講就比如是 葉問打鬼子 , 咱們把 上游當作 小日本 , 把 下游 看成 葉問 , 當調用 Subscription.request(1) 時, 葉問 就說 我要打一個! 而後 小日本 就拿出 一個鬼子 給葉問, 讓他打, 等葉問打死這個鬼子以後, 再次調用 request(10) , 葉問就又說 我要打十個! 而後小日本又派出 十個鬼子 給葉問, 而後就在邊上看熱鬧, 看葉問能不能打死十個鬼子, 等葉問打死十個鬼子後再繼續要鬼子接着打...

因此咱們把request當作是一種能力, 當成 下游處理事件 的能力, 下游能處理幾個就告訴上游我要幾個, 這樣只要上游根據下游的處理能力來決定發送多少事件, 就不會形成一窩蜂的發出一堆事件來, 從而致使OOM. 這也就完美的解決以前咱們所學到的兩種方式的缺陷, 過濾事件會致使事件丟失, 減速又可能致使性能損失. 而這種方式既解決了事件丟失的問題, 又解決了速度的問題, 完美 !

同步狀況

Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; ; i++) { //無限循環發事件
                    emitter.onNext(i);
            } 
       }
  }).subscribe(new Consumer<Integer>() {
       @Override
       public void accept(Integer integer) throws Exception {
            Thread.sleep(2000);
            Log.d(TAG, "" + integer);
       }
  });

當上下游工做在 同一個線程 中時, 這時候是一個 同步 的訂閱關係, 也就是說 上游 每發送一個事件 必須 等到 下游 接收處理完了之後才能接着發送下一個事件.

同步與異步的區別就在於有沒有緩存發送事件的緩衝區。

異步狀況

經過subscribeOnobserveOn來肯定對應的線程,達到異步的效果,異步時會有一個對應的緩存區來換從從上游發送的事件。

public enum BackpressureStrategy {
    /**
      * OnNext events are written without any buffering or dropping.
      * Downstream has to deal with any overflow.
      * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
      */
    MISSING,
    /**
      * Signals a MissingBackpressureException in case the downstream can't keep up.
      */
    ERROR,
    /**
      * Buffers <em>all</em> onNext values until the downstream consumes it.
      */
    BUFFER,
    /**
      * Drops the most recent onNext value if the downstream can't keep up.
      */
    DROP,
    /**
       * Keeps only the latest onNext value, overwriting any previous value if the
      * downstream can't keep up.
      */
      LATEST
  }

背壓策略:

  1. error, 緩衝區大概在128
  2. buffer, 緩衝區在1000左右
  3. drop, 把存不下的事件丟棄
  4. latest, 只保留最新的
  5. missing, 缺省設置,不作任何操做

上游從哪裏得知下游的處理能力呢?咱們來看看上游最重要的部分,確定就是 FlowableEmitter 了啊,咱們就是經過它來發送事件的啊,來看看它的源碼吧(別緊張,它的代碼灰常簡單):

public interface FlowableEmitter<T> extends Emitter<T> {
      void setDisposable(Disposable s);
      void setCancellable(Cancellable c);
      /**
        * The current outstanding request amount.
        * <p>This method is thread-safe.
        * @return the current outstanding request amount
        */
      long requested();
      boolean isCancelled();
      FlowableEmitter<T> serialize();
  }

FlowableEmitter是個接口,繼承Emitter,Emitter裏面就是咱們的onNext(),onComplete()onError()三個方法。咱們看到FlowableEmitter中有這麼一個方法:

long requested();


這張圖的意思就是當上下游在同一個線程中的時候,在 下游 調用request(n)就會直接改變 上游 中的requested的值,屢次調用便會疊加這個值,而上游每發送一個事件以後便會去減小這個值,當這個值減小至0的時候,繼續發送事件便會拋異常了

能夠看到,當上下游工做在不一樣的線程裏時,每個線程裏都有一個requested,而咱們調用request(1000)時,實際上改變的是下游主線程中的requested,而上游中的requested的值是由RxJava內部調用request(n)去設置的,這個調用會在合適的時候自動觸發。

Rxjava實例開發應用

  1. 網絡請求處理(輪詢,嵌套,出錯重連)
  2. 功能防抖
  3. 從多級緩存獲取數據
  4. 合併數據源
  5. 聯合判斷
  6. 與 Retrofit,RxBinding,EventBus結合使用

Rxjava原理

  1. Scheduler線程切換工做原理
  2. 數據的發送與接收(觀察者模式)
  3. lift的工做原理
  4. map的工做原理
  5. flatMap的工做原理
  6. merge的工做原理
  7. concat的工做原理

順手留下GitHub連接,須要獲取相關面試等內容的能夠本身去找
https://github.com/xiangjiana/Android-MS
(VX:mm14525201314)

PDF和源碼獲取

相關文章
相關標籤/搜索