轉載請以連接形式標明出處: 本文出自:103style的博客react
base on RxJava 2.xgit
官方介紹github
Backpressure is when in an Flowable processing pipeline, some asynchronous stages can't process the values fast enough and need a way to tell the upstream producer to slow down. 背壓是在Flowable處理事件流中,某些異步階段沒法足夠快地處理這些值,而且須要一種方法來告訴上游生產商減速。緩存
因此 RxJava 的背壓策略(Backpressure)是指處理上述上游流速過快現象的一種策略。 相似 Java中的線程池 中的飽和策略 RejectedExecutionHandler。bash
咱們先使用 Observable 看看是什麼狀況:異步
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(integer);
}
});
複製代碼
輸出:async
I/art: Background partial concurrent mark sweep GC freed 7(224B) AllocSpace objects, 0(0B) LOS objects, 27% free, 43MB/59MB, paused 528us total 106.928ms
I/System.out: 0
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 20% free, 62MB/78MB, paused 1.065ms total 327.346ms
I/System.out: 1
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 16% free, 82MB/98MB, paused 1.345ms total 299.700ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 13% free, 103MB/119MB, paused 1.609ms total 377.432ms
I/System.out: 2
...
I/art: Alloc concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.574ms total 818.037ms
I/art: WaitForGcToComplete blocked for 2.539s for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
W/art: Throwing OutOfMemoryError "Failed to allocate a 12 byte allocation with 4109520 free bytes and 3MB until OOM; failed due to fragmentation (required continguous free 4096 bytes for a new buffer where largest contiguous free 0 bytes)"
複製代碼
咱們能夠從上圖中看到,內存在逐步上升,在必定的時間後,到達 256M 以後會觸發GC,最後拋出 OutOfMemoryError。由於上游的事件發送太快而下游的消費者消耗的比較慢。ide
那致使內存暴增的源頭是什麼呢 ?函數
咱們對上面的代碼作一點點修改,註釋了 observeOn(AndroidSchedulers.mainThread()),會發現內存顯示很正常,不會存在上述問題。源碼分析
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
})
.subscribeOn(Schedulers.computation())
// .observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(integer);
}
});
複製代碼
因此內存暴增的源頭就在 observeOn(AndroidSchedulers.mainThread()).
咱們來看看 observeOn 的源碼,經過 RxJava subscribeOn和observeOn源碼介紹,咱們知道在 ObservableObserveOn.ObserveOnObserver 的 onSubscribe 中構建了一個容量默認爲 128 的 SpscLinkedArrayQueue。
queue = new SpscLinkedArrayQueue<T>(bufferSize);
複製代碼
上游每發送一個事件都會經過 queue.offer(t) 保存到 SpscLinkedArrayQueue 中。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
複製代碼
咱們能夠寫個測試代碼來看看,由於生產比消費快的多,至關於一直添加元素,以下:
private void test(){
SpscLinkedArrayQueue<Integer> queue = new SpscLinkedArrayQueue<>(128);
for (int i = 0; ; i++) {
queue.offer(i);
}
}
複製代碼
運行會發現內存變化和 Observable 同樣迅速暴增。
SpscLinkedArrayQueue 的詳細介紹後面再說。如今能夠大體理解爲 一直狂吃,而後最後撐破肚皮,而後裂開。
咱們來看看 Flowable 的用法:
Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
複製代碼
BackpressureStrategy 包含五種模式:MISSING、ERROR、BUFFER、DROP、LATEST。
下面對這五種 BackpressureStrategy 分別介紹其用法以及 發送事件速度 > 接收事件速度 時的處理方式:
BackpressureStrategy.MISSING 處理方式:拋出異常 MissingBackpressureException,並提示 緩存區滿了 代碼示例:
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
複製代碼
輸出結果:
System.out: onNext: 0
System.err: io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
複製代碼
BackpressureStrategy.ERROR 處理方式:直接拋出異常MissingBackpressureException 修改上述代碼的 BackpressureStrategy.MISSING 爲 BackpressureStrategy.ERROR:
Flowable
.create(new FlowableOnSubscribe<Object>() {
...
}, BackpressureStrategy.ERROR)
...
複製代碼
輸出結果:
System.out: onNext: 0
System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
複製代碼
BackpressureStrategy.BUFFER 處理方式:相似 Observable 同樣 擴充緩存區大小 修改上述代碼的 BackpressureStrategy.MISSING 爲 BackpressureStrategy.BUFFER:
Flowable
.create(new FlowableOnSubscribe<Object>() {
...
}, BackpressureStrategy.BUFFER)
...
複製代碼
輸出結果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
...
System.out: onNext: 253
System.out: onNext: 254
System.out: onNext: 255
System.out: onComplete
複製代碼
BackpressureStrategy.DROP 處理方式:丟棄緩存區滿後處理緩衝區數據期間發送過來的事件 示例代碼:
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
複製代碼
輸出結果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
...
System.out: onNext: 124
System.out: onNext: 125
System.out: onNext: 126
System.out: onNext: 127
System.out: onNext: 1070801
System.out: onNext: 1070802
System.out: onNext: 1070803
System.out: onNext: 1070804
System.out: onNext: 1070805
...
複製代碼
BackpressureStrategy.LATEST 處理方式:丟棄緩存區滿後處理緩衝區數據期間發送過來的非最後一個事件。下面示例代碼輸出了 129 個事件,下面的源碼分析會介紹。 示例代碼:
Flowable
.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
複製代碼
輸出結果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
...
System.out: onNext: 124
System.out: onNext: 125
System.out: onNext: 126
System.out: onNext: 127
System.out: onNext: 255
System.out: onComplete
複製代碼
經過以前 RxJava之create操做符源碼解析 的介紹。咱們知道 Flowable.create(new FlowableOnSubscribe(){...}, BackpressureStrategy.LATEST) 返回的是一個 FlowableCreate 對象。
分別對不一樣的背壓策略建立了不一樣的 Emitter .
public final class FlowableCreate<T> extends Flowable<T> {
//...
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
//...
}
複製代碼
MissingEmitter
static final class MissingEmitter<T> extends BaseEmitter<T> {
MissingEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public void onNext(T t) {
if (isCancelled()) {
return;
}
if (t != null) {
downstream.onNext(t);
} else {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (;;) {
long r = get();
if (r == 0L || compareAndSet(r, r - 1)) {
return;
}
}
}
}
複製代碼
經過上面的代碼咱們能夠看到 MissingEmitter 基本上沒作什麼操做,因此 BackpressureStrategy.MISSING 示例中的代碼其實是調用了 ObserveOn 中返回對象的 FlowableObserveOn.ObserveOnSubscriber 的 onNext:
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {
upstream.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
複製代碼
上面代碼中咱們看到了背壓狀況下出現的報錯信息,出現的前提是 queue.offer(t) 返回 false 。這裏的 queue 是 onSubscribe 中構造的容量爲 Flowable.bufferSize() 的 SpscArrayQueue .
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
//...
queue = new SpscArrayQueue<T>(prefetch);
downstream.onSubscribe(this);
s.request(prefetch);
}
}
複製代碼
SpscArrayQueue 的 offer 方法,咱們能夠看到當 SpscArrayQueue 數據 「滿了」 的時候即返回 false .
public boolean offer(E e) {
//...
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
if (index >= producerLookAhead) {
int step = lookAheadStep;
if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
producerLookAhead = index + step;
} else if (null != lvElement(offset)) {
return false;
}
}
soElement(offset, e); // StoreStore
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
return true;
}
複製代碼
因此 BackpressureStrategy.MISSING 在緩衝區滿了以後再發射事件即會拋出 message 爲 "Queue is full?!" 的 MissingBackpressureException .
ErrorAsyncEmitter
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription {
//...
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
//...
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
//...
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
ErrorAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
複製代碼
經過在 onSubscribe 中調用 request(Flowable.bufferSize()) 設置當前 AtomicLong 的 value 值。 而後 onNext 中每傳遞一個事件就經過 BackpressureHelper.produced(this, 1) 將 value 減 1 . 當發送了 Flowable.bufferSize() 個事件, get() != 0 不成立,調用 onOverflow() 方法拋出 MissingBackpressureException 異常。
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
DropAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// nothing to do
}
}
複製代碼
和 ErrorAsyncEmitter 相似,只不過當發送超過超過 Flowable.bufferSize() 的事件時,啥也沒作,即實現丟棄的功能。LatestAsyncEmitter
static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
final AtomicReference<T> queue;
//...
LatestAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
this.queue = new AtomicReference<T>();
//...
}
@Override
public void onNext(T t) {
//...
queue.set(t);
drain();
}
//...
}
複製代碼
咱們能夠看到每次調用 onNext 都會更新傳過來的值到 queue 中,因此 queue 中保存了最新的值。
接着來看 drain 方法: 上面咱們知道在 onSubscribe 中調用 request() 設置當前 AtomicLong 的 value 值。
void drain() {
//...
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
//...
boolean d = done;
T o = q.getAndSet(null);
boolean empty = o == null;
if (d && empty) {
//...
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
//...
boolean d = done;
boolean empty = q.get() == null;
if (d && empty) {
//...
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
//...
}
}
複製代碼
最後一個事件是怎麼發出的? 咱們在上面的 drain() 中調用 a.onNext(o) 最終是調用 observeOn 構建對象中的 ObserveOnSubscriber 的 onNext ,即調用 runAsync(); 。
public final void onNext(T t) {
//...
trySchedule();
}
final void trySchedule() {
//...
worker.schedule(this);
}
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
複製代碼
runAsync():
void runAsync() {
//...
for (;;) {
long r = requested.get();
while (e != r) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
//...
return;
}
//...
a.onNext(v);
e++;
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
upstream.request(e);
e = 0L;
}
}
//...
}
}
複製代碼
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
//...
this.limit = prefetch - (prefetch >> 2);
}
複製代碼
request方法:
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
System.out.println("n = " + n);
BackpressureHelper.add(this, n);
onRequested();
}
}
@Override
void onRequested() {
drain();
}
複製代碼
即繼續執行 drain() 方法,由於 queue 中還保存最新的值事件。因此會經過 a.onNext(o) 發送這個最新的事件。
若是在執行完等待隊列 3/4 的事件以後,上游的事件還沒發送結束,下游即會再次緩存上游發送過來的容量的 3/4 個事件。 示例代碼:
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(i);
}
Thread.sleep(10 * Flowable.bufferSize());
for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
emitter.onNext(Flowable.bufferSize() * 2 + i);
}
emitter.onComplete();
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("onNext: " + o);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
複製代碼
輸出結果:
System.out: onNext: 0
System.out: onNext: 1
System.out: onNext: 2
System.out: onNext: 3
//....
System.out: onNext: 125
System.out: onNext: 126
System.out: onNext: 127
System.out: onNext: 255
System.out: onNext: 256
System.out: onNext: 257
//...
System.out: onNext: 349
System.out: onNext: 350
System.out: onNext: 511
System.out: onComplete
複製代碼
能夠看到輸出結果中 255-350 即爲容量 128 的 3/4 個元素。
BufferAsyncEmitter
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
final SpscLinkedArrayQueue<T> queue;
//...
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
@Override
public void onNext(T t) {
//...
queue.offer(t);
drain();
}
void drain() {
//...
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
//...
boolean d = done;
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
//...
return;
}
if (empty) {
break;
}
a.onNext(o);
e++;
}
if (e == r) {
//...
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
//...
return;
}
}
if (e != 0) {
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
複製代碼
若是以爲不錯的話,請幫忙點個讚唄。
以上
掃描下面的二維碼,關注個人公衆號 Android1024, 點關注,不迷路。