RxJava之背壓策略

轉載請以連接形式標明出處: 本文出自:103style的博客react

base on RxJava 2.xgit


目錄

  • RxJava 背壓策略簡介
  • Observable 背壓致使崩潰的緣由
  • Flowable 使用介紹
  • 五種背壓策略源碼分析
  • 小結

RxJava背壓策略簡介

官方介紹github

Backpressure is when in an Flowable processing pipeline, some asynchronous stages can't process the values fast enough and need a way to tell the upstream producer to slow down. 背壓是在Flowable處理事件流中,某些異步階段沒法足夠快地處理這些值,而且須要一種方法來告訴上游生產商減速。緩存

因此 RxJava 的背壓策略(Backpressure)是指處理上述上游流速過快現象的一種策略。 相似 Java中的線程池 中的飽和策略 RejectedExecutionHandler。bash


Observable背壓致使崩潰的緣由

咱們先使用 Observable 看看是什麼狀況:異步

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        })
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(integer);
            }
        });
複製代碼

image.png

輸出:async

I/art: Background partial concurrent mark sweep GC freed 7(224B) AllocSpace objects, 0(0B) LOS objects, 27% free, 43MB/59MB, paused 528us total 106.928ms
I/System.out: 0
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 20% free, 62MB/78MB, paused 1.065ms total 327.346ms
I/System.out: 1
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 16% free, 82MB/98MB, paused 1.345ms total 299.700ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 13% free, 103MB/119MB, paused 1.609ms total 377.432ms
I/System.out: 2
...
I/art: Alloc concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.574ms total 818.037ms
I/art: WaitForGcToComplete blocked for 2.539s for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
W/art: Throwing OutOfMemoryError "Failed to allocate a 12 byte allocation with 4109520 free bytes and 3MB until OOM; failed due to fragmentation (required continguous free 4096 bytes for a new buffer where largest contiguous free 0 bytes)"
複製代碼

咱們能夠從上圖中看到,內存在逐步上升,在必定的時間後,到達 256M 以後會觸發GC,最後拋出 OutOfMemoryError。由於上游的事件發送太快而下游的消費者消耗的比較慢。ide

那致使內存暴增的源頭是什麼呢 ?函數


咱們對上面的代碼作一點點修改,註釋了 observeOn(AndroidSchedulers.mainThread()),會發現內存顯示很正常,不會存在上述問題。源碼分析

Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                        emitter.onNext(i);
                    }
                }
            })
            .subscribeOn(Schedulers.computation())
//          .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(integer);
                }
            });
複製代碼

註釋了observeOn

因此內存暴增的源頭就在 observeOn(AndroidSchedulers.mainThread()).

咱們來看看 observeOn 的源碼,經過 RxJava subscribeOn和observeOn源碼介紹,咱們知道在 ObservableObserveOn.ObserveOnObserver 的 onSubscribe 中構建了一個容量默認爲 128 的 SpscLinkedArrayQueue。

queue = new SpscLinkedArrayQueue<T>(bufferSize);
複製代碼

上游每發送一個事件都會經過 queue.offer(t) 保存到 SpscLinkedArrayQueue 中。

public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}
複製代碼

咱們能夠寫個測試代碼來看看,由於生產比消費快的多,至關於一直添加元素,以下:

private void test(){
    SpscLinkedArrayQueue<Integer> queue = new SpscLinkedArrayQueue<>(128);
    for (int i = 0; ; i++) {
        queue.offer(i);
    }
}
複製代碼

運行會發現內存變化和 Observable 同樣迅速暴增。

測試代碼內存變化

SpscLinkedArrayQueue 的詳細介紹後面再說。如今能夠大體理解爲 一直狂吃,而後最後撐破肚皮,而後裂開


Flowable的用法

咱們來看看 Flowable 的用法:

Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
複製代碼

BackpressureStrategy 包含五種模式:MISSINGERRORBUFFERDROPLATEST

下面對這五種 BackpressureStrategy 分別介紹其用法以及 發送事件速度 > 接收事件速度 時的處理方式:

  • BackpressureStrategy.MISSING 處理方式:拋出異常 MissingBackpressureException,並提示 緩存區滿了 代碼示例:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                    for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }, BackpressureStrategy.MISSING)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    複製代碼

    輸出結果:

    System.out: onNext: 0
    System.err: io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
    複製代碼
  • BackpressureStrategy.ERROR 處理方式:直接拋出異常MissingBackpressureException 修改上述代碼的 BackpressureStrategy.MISSING 爲 BackpressureStrategy.ERROR:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                ...
            }, BackpressureStrategy.ERROR)
            ...
    複製代碼

    輸出結果:

    System.out: onNext: 0
    System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    複製代碼
  • BackpressureStrategy.BUFFER 處理方式:相似 Observable 同樣 擴充緩存區大小 修改上述代碼的 BackpressureStrategy.MISSING 爲 BackpressureStrategy.BUFFER:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                ...
            }, BackpressureStrategy.BUFFER)
            ...
    複製代碼

    輸出結果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    ...
    System.out: onNext: 253
    System.out: onNext: 254
    System.out: onNext: 255
    System.out: onComplete
    複製代碼
  • BackpressureStrategy.DROP 處理方式:丟棄緩存區滿後處理緩衝區數據期間發送過來的事件 示例代碼:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                        emitter.onNext(i);
                    }
                }
            }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    複製代碼

    輸出結果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    ...
    System.out: onNext: 124
    System.out: onNext: 125
    System.out: onNext: 126
    System.out: onNext: 127
    System.out: onNext: 1070801
    System.out: onNext: 1070802
    System.out: onNext: 1070803
    System.out: onNext: 1070804
    System.out: onNext: 1070805
    ...
    複製代碼
  • BackpressureStrategy.LATEST 處理方式:丟棄緩存區滿後處理緩衝區數據期間發送過來的非最後一個事件。下面示例代碼輸出了 129 個事件,下面的源碼分析會介紹。 示例代碼:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                    for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    複製代碼

    輸出結果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    ...
    System.out: onNext: 124
    System.out: onNext: 125
    System.out: onNext: 126
    System.out: onNext: 127
    System.out: onNext: 255
    System.out: onComplete
    複製代碼

五種背壓策略源碼分析

經過以前 RxJava之create操做符源碼解析 的介紹。咱們知道 Flowable.create(new FlowableOnSubscribe(){...}, BackpressureStrategy.LATEST) 返回的是一個 FlowableCreate 對象。

分別對不一樣的背壓策略建立了不一樣的 Emitter .

public final class FlowableCreate<T> extends Flowable<T> {
    //...
    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
    //...
}
複製代碼
  • MissingEmitter

    static final class MissingEmitter<T> extends BaseEmitter<T> {
        MissingEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }
            if (t != null) {
                downstream.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }
    
    }
    複製代碼

    經過上面的代碼咱們能夠看到 MissingEmitter 基本上沒作什麼操做,因此 BackpressureStrategy.MISSING 示例中的代碼其實是調用了 ObserveOn 中返回對象的 FlowableObserveOn.ObserveOnSubscriber 的 onNext:

    public final void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode == ASYNC) {
            trySchedule();
            return;
        }
        if (!queue.offer(t)) {
            upstream.cancel();
            error = new MissingBackpressureException("Queue is full?!");
            done = true;
        }
        trySchedule();
    }
    複製代碼

    上面代碼中咱們看到了背壓狀況下出現的報錯信息,出現的前提是 queue.offer(t) 返回 false 。這裏的 queue 是 onSubscribe 中構造的容量爲 Flowable.bufferSize() 的 SpscArrayQueue .

    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.validate(this.upstream, s)) {
            this.upstream = s;
            //...
            queue = new SpscArrayQueue<T>(prefetch);
            downstream.onSubscribe(this);
            s.request(prefetch);
        }
    }
    複製代碼

    SpscArrayQueue 的 offer 方法,咱們能夠看到當 SpscArrayQueue 數據 「滿了」 的時候即返回 false .

    public boolean offer(E e) {
        //...
        final int mask = this.mask;
        final long index = producerIndex.get();
        final int offset = calcElementOffset(index, mask);
        if (index >= producerLookAhead) {
            int step = lookAheadStep;
            if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
                producerLookAhead = index + step;
            } else if (null != lvElement(offset)) {
                return false;
            }
        }
        soElement(offset, e); // StoreStore
        soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
        return true;
    }
    複製代碼

    因此 BackpressureStrategy.MISSING 在緩衝區滿了以後再發射事件即會拋出 message 爲 "Queue is full?!" 的 MissingBackpressureException .


  • ErrorAsyncEmitter

    abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        //...
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
        //...
    }
    abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
        NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        public final void onNext(T t) {
            //...
            if (get() != 0) {
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }
        abstract void onOverflow();
    }
    static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        ErrorAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }
    複製代碼

    經過在 onSubscribe 中調用 request(Flowable.bufferSize()) 設置當前 AtomicLong 的 value 值。 而後 onNext 中每傳遞一個事件就經過 BackpressureHelper.produced(this, 1) 將 value 減 1 . 當發送了 Flowable.bufferSize() 個事件, get() != 0 不成立,調用 onOverflow() 方法拋出 MissingBackpressureException 異常。


  • DropAsyncEmitter
    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        private static final long serialVersionUID = 8360058422307496563L;
        DropAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        void onOverflow() {
            // nothing to do
        }
    
    }
    複製代碼
    和 ErrorAsyncEmitter 相似,只不過當發送超過超過 Flowable.bufferSize() 的事件時,啥也沒作,即實現丟棄的功能。

  • LatestAsyncEmitter

    static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
        final AtomicReference<T> queue;
        //...
        LatestAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
            this.queue = new AtomicReference<T>();
            //...
        }
        @Override
        public void onNext(T t) {
            //...
            queue.set(t);
            drain();
        }
        //...
    }
    複製代碼

    咱們能夠看到每次調用 onNext 都會更新傳過來的值到 queue 中,因此 queue 中保存了最新的值。

    • 接着來看 drain 方法: 上面咱們知道在 onSubscribe 中調用 request() 設置當前 AtomicLong 的 value 值。

      void drain() {
          //...
          for (;;) {
              long r = get();
              long e = 0L;
              while (e != r) {
                  //...
                  boolean d = done;
                  T o = q.getAndSet(null);
                  boolean empty = o == null;
                  if (d && empty) {
                      //...
                      return;
                  }
                  if (empty) {
                      break;
                  }
                  a.onNext(o);
                  e++;
              }
              if (e == r) {
                  //...
                  boolean d = done;
                  boolean empty = q.get() == null;
                  if (d && empty) {
                      //...
                      return;
                  }
              }
              if (e != 0) {
                  BackpressureHelper.produced(this, e);
              }
              //...
          }
      }
      複製代碼
      • 在 for (;;) 裏面經過 get() 獲取當前 AtomicLong 的值。而後經過 a.onNext(o); 傳遞給下游,而後 e++ ,在經過 BackpressureHelper.produced(this, e); 減掉 AtomicLong 的值。
      • 當調用 Flowable.bufferSize() 次 onNext 以後, get() 返回的值爲 0 ,因此 e != r 不成立,在 e == r 的判斷中,在從 onNext 過來時 empty 爲 false ,因此直接跳出 for 循環。
      • 經過上面咱們知道當傳遞超過 Flowable.bufferSize() 的事件過來,只會更新 queue 中的值爲最新的事件,其餘啥也沒作。那最後一個事件是怎麼發出的呢?,繼續往下看。
    • 最後一個事件是怎麼發出的? 咱們在上面的 drain() 中調用 a.onNext(o) 最終是調用 observeOn 構建對象中的 ObserveOnSubscriber 的 onNext ,即調用 runAsync(); 。

      public final void onNext(T t) {
          //...
          trySchedule();
      }
      final void trySchedule() {
          //...
          worker.schedule(this);
      }
      @Override
      public final void run() {
          if (outputFused) {
              runBackfused();
          } else if (sourceMode == SYNC) {
              runSync();
          } else {
              runAsync();
          }
      }
      複製代碼
    • runAsync()

      void runAsync() {
          //...
          for (;;) {
              long r = requested.get();
              while (e != r) {
                  boolean d = done;
                  T v;
                  try {
                      v = q.poll();
                  } catch (Throwable ex) {
                      //...
                      return;
                  }
                  //...
                  a.onNext(v);
                  e++;
                  if (e == limit) {
                      if (r != Long.MAX_VALUE) {
                          r = requested.addAndGet(-e);
                      }
                      upstream.request(e);
                      e = 0L;
                  }
              }
              //...
          }
      }
      複製代碼
      • 咱們能夠看到在 for 循環中經過 q.poll() 去獲取緩存隊列 SpscArrayQueue 中的事件。而後經過 a.onNext(v); 去執行咱們示例代碼中的耗時操做。
      • 而後當 e == limit 時,回去調用 LatestAsyncEmitter 的 request(e) ,而 limit 是在構造函數中初始化的,值爲緩存隊列容量 Flowable.bufferSize() 的 3/4 。因此當隊列中的事件消耗了容量的 3/4 以後,會再去請求上游發送事件。
        BaseObserveOnSubscriber(
                Worker worker,
                boolean delayError,
                int prefetch) {
            //...
            this.limit = prefetch - (prefetch >> 2);
        }
        複製代碼
    • request方法:

      @Override
      public final void request(long n) {
          if (SubscriptionHelper.validate(n)) {
              System.out.println("n = " + n);
              BackpressureHelper.add(this, n);
              onRequested();
          }
      }
      @Override
      void onRequested() {
          drain();
      }
      複製代碼

      即繼續執行 drain() 方法,由於 queue 中還保存最新的值事件。因此會經過 a.onNext(o) 發送這個最新的事件。

  • 若是在執行完等待隊列 3/4 的事件以後,上游的事件還沒發送結束,下游即會再次緩存上游發送過來的容量的 3/4 個事件。 示例代碼:

    Flowable.create(new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                emitter.onNext(i);
            }
            Thread.sleep(10 * Flowable.bufferSize());
            for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                emitter.onNext(Flowable.bufferSize() * 2 + i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    複製代碼

    輸出結果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    //....
    System.out: onNext: 125
    System.out: onNext: 126
    System.out: onNext: 127
    System.out: onNext: 255
    System.out: onNext: 256
    System.out: onNext: 257
    //...
    System.out: onNext: 349
    System.out: onNext: 350
    System.out: onNext: 511
    System.out: onComplete
    複製代碼

    能夠看到輸出結果中 255-350 即爲容量 128 的 3/4 個元素。


  • BufferAsyncEmitter

    • 咱們能夠看到內部有一個 SpscLinkedArrayQueue 的緩存隊列,每次調用 onNext 都會先保存到緩存隊列,而後經過 drain() 方法一直去遍歷當前的緩存隊列。 而後和 LatestAsyncEmitter 同樣,當下遊的緩存隊列滿了以後,即再也不放下游發送事件,只是把上游的事件保存在 SpscLinkedArrayQueue 中,等待下游處理了容量的 3/4 的事件以後,上游在發送容量的 3/4 的事件過去。知道上游的事件消耗完,或者異常退出。即和 Observable 的效果相似,只不過緩存隊列一個在上游一個在下游。
    static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
        final SpscLinkedArrayQueue<T> queue;
        //...
        BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
            super(actual);
            this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
            this.wip = new AtomicInteger();
        }
        @Override
        public void onNext(T t) {
            //...
            queue.offer(t);
            drain();
        }
        void drain() {
            //...
            final SpscLinkedArrayQueue<T> q = queue;
            for (;;) {
                long r = get();
                long e = 0L;
                while (e != r) {
                    //...
                    boolean d = done;
                    T o = q.poll();
                    boolean empty = o == null;
                    if (d && empty) {
                        //...
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(o);
                    e++;
                }
                if (e == r) {
                    //...
                    boolean d = done;
                    boolean empty = q.isEmpty();
                    if (d && empty) {
                        //...
                        return;
                    }
                }
                if (e != 0) {
                    BackpressureHelper.produced(this, e);
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }
    複製代碼

小結

  • 咱們知道了 Observable 出現背壓的緣由是上游發送的超多事件緩存在 observeOn 返回對象的緩存隊列中,事件的增長致使了內存的增長。
  • 咱們介紹了 Flowable 的使用和五種背壓策略的具體實現。
  • MISSING:超過 observeOn 配置的 bufferSize 則拋出異常 MissingBackpressureException 並提示 Queue is full?!
  • ERROR:超過 observeOn 配置的 bufferSize 則直接拋出異常 MissingBackpressureException。
  • BUFFER:超過 observeOn 配置的 bufferSize 則緩存到上游的緩衝隊列,等待下游消耗了容量的 3/4 的事件以後,在繼續發送上游緩存的事件給下游。
  • DROP:超過 observeOn 配置的 bufferSize 則丟棄。
  • LATEST:超過 observeOn 配置的 bufferSize 則丟棄並保存最新的值到 queue ,若是在下游消耗了容量的 3/4 的事件以後,上游還有事件在發送,則繼續往下游發送事件,當沒有事件的時候,再發送 queue 中保存的最新的那個事件。

參考文章


若是以爲不錯的話,請幫忙點個讚唄。

以上


掃描下面的二維碼,關注個人公衆號 Android1024, 點關注,不迷路。

Android1024
相關文章
相關標籤/搜索