Back pressure is a resistance or force opposing the desired flow of fluid through pipes.
背壓是抵擋流體在管道中流動的阻力。
以下圖所示,Pipe1和Pipe2在起始處有着相同的起始壓力和距離。Pipe2呢,就遇到了三種backpressure。
java
背壓就是抵擋數據流動的阻力。
背壓問題的出現,須要同時知足兩個條件:react
想象一些不少年前,社交應用消息爆炸致使應用界面卡死的情景。
數組
當咱們聊背壓時,不只是背壓問題,也包括其處理策略。
那一般咱們有哪些背壓策略?緩存
RxJava1中Observable支持Backpressure,因爲使用難度,使用者常常會遇到MissingBackpressureException。RxJava2進行了拆分。同時Flowable遵循"reactive-streams-jvm"接口規範,與Observable在名稱上略有區別。markdown
|
Observable | Flowable |
---|---|---|
觀察源 | ObservableSource | Publisher |
觀察者 | Observer | Subscriber |
支持Backpressure | 不支持 | 支持 |
觀察者.onSubscribe(xx) | xx:Disposable | xx:Subscription |
在Subscription接口中提到,只有當下遊調用了Subscription.request(n)後,上游纔會發送數據。下游將自身處理數據流的能力告訴了上游,同時須要在後續時常調用request(xx),更新自身的處理能力。jvm
有些下游的onSubscribe(subscription)中會調用subscription.request(Long.MAX_VALUE),這就意味着,告訴上游:不用考慮個人處理能力,盡情發送數據。實際上,最終的YourSubscriber通常都這樣幹。ide
observeOn會切換下游線程。在Observable和Flowable中的區別以下oop
在Observable中 | 在Flowable中 | |
---|---|---|
觀察者 | ObserveOnObserver | ObserveOnSubscriber |
觀察者中的SimpleQueue | SpscLinkedArrayQueue 經過鏈表+數組實現 無限容量, |
SpscArrayQueue 經過數組實現, 有限容量 根據buffersize計算數組長度 |
是否有潛在背壓問題 | 無(假定內存無限) | 有(隊列溢出時) |
數組長度爲離buffersize最近且>=buffersize大的2指數冪。
在Flowable中,Queue的容量有限。當隊列已滿時,offer元素會返回false,這意味着溢出。
從BaseObserveOnSubscriber的源碼能夠看出,FlowableOnserveOn在遇到背壓問題時,就只是拋出了異常。fetch
static final class BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> @Override public final void onNext(T t) {
...
if (!queue.offer(t)) {
...
//拋出異常
error = new MissingBackpressureException("Queue is full?!");
}
}
}
複製代碼
咱們先來分析ObserveOnSubscriber中的subscription.request(n)
爲了研究特性,咱們拷貝了FlowableRange,加了log。只有當下遊request(x),則會onNext x個數據。ui
public final class FlowableRange2 extends Flowable<Integer> {
static class RangeSubscription extends AtomicLong implements Subscription {
@Override
public final void request(long n) {
System.out.print(String.format("\n[request:%d]", r));
}
}
}
FlowableRange2(1,20)//拷貝了FlowableRange,加了log
.observeOn(Schedulers.newThread(), false, 5)
.subscribe { Thread.sleep(2);print("$it,") }
//輸出結果:
[request:5]1,2,3,4,
[request:4]5,6,7,8,
[request:4]9,10,11,12,
[request:4]13,14,15,16,
[request:4]17,18,19,20,
複製代碼
能夠看到並無每次都請求buffsize5。能夠看看FlowableObserveOn代碼流程,只適用當前例子。FlowableObserveOn特色:
public final class FlowableObserveOn<T> extends Flowable<T> implements HasUpstreamPublisher<T> {
@Override
public void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new ObserveOnSubscriber<T>(s, scheduler.createWorker(), prefetch));
}
static class ObserveOnSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Runnable, Subscription {
final Subscriber<? super T> actual; //actual: 下游的,yourSubscriber
Subscription s; //上游的,RangeSubscription(在咱們這個例子中)
SimpleQueue<T> queue;
final Worker worker;
final int prefetch;//buffSize
//prefetch - (prefetch >> 2)
final int limit;
final AtomicLong requested;
long produced;
ObserveOnSubscriber(Subscriber<? super T> actual, Worker worker,int prefetch) {
this.actual = actual;
this.worker = worker;
this.prefetch = prefetch;//5
this.requested = new AtomicLong();
this.limit = prefetch - (prefetch >> 2);//4
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
//observeOn,無鎖容量有限隊列,
queue = new SpscArrayQueue<T>(prefetch);
actual.onSubscribe(this);
//首次request: 參數爲bufferSize,5
s.request(prefetch);
}
}
@Override
//供下游yourSubscriber調用的,實際n爲Long.MAX_VALUE
public final void request(long n) {
BackpressureHelper.add(requested, n);
trySchedule();
}
@Override
public final void onNext(T t) {
if (!queue.offer(t)) {//若是隊列已滿onNext會異常
error = new MissingBackpressureException("Queue is full?!");
}
trySchedule();
}
final void trySchedule() {
if (getAndIncrement() != 0) return;
worker.schedule(this);
}
@Override
public final void run() { runAsync();}
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (; ; ) {
long r = requested.get();
//首次進入時e爲0,r爲Long.MAX_VALUE
while (e != r) {
//取出元素
T v = q.poll();
boolean empty = v == null;
if (empty) break;
a.onNext(v);
e++;
//當消費的數量達到limit時,
if (e == limit) {
//會再次請求上游。個數爲limit。
s.request(e);
e = 0L;
}
}
//w爲while執行過程當中trySchedule請求的次數。
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) break;
} else {
missed = w;
}
}
}
}
複製代碼
瞭解Flowable這部分特性後,咱們來看BackpressureStrategy 中提到的5種策略:
這裏有段代碼舉例。生產速度1毫秒/次,消費速度4毫秒/次。生產速度是消費速度的4倍。
ObserveOnSubscriber首次request爲4(prefetch),當消費3(limit)個數值(12毫秒)後,會request(3)。
這就意味着上游線程在前16毫秒內只能onNext 4個數值,當再次request後onNext 3個數值。
var flowableOnSubscribe = FlowableOnSubscribe<Int> { emitter ->
//1.FlowableEmitter每隔1毫秒發射一次值,從1到20。
for (i in 1..20) {
Thread.sleep(1)
emitter.onNext(i)
}
}
//2.背壓策略爲DROP
//3.observeOn, bufferSize爲4。
//4.消費,sleep4毫秒,再打印數值
Flowable.create(flowableOnSubscribe, BackpressureStrategy.DROP)
.observeOn(Schedulers.newThread(), false, 4)
.subscribe { Thread.sleep(4);println(it) }
DROP--輸出結果:
1,2,3,4,13,14,15,
LATEST--輸出結果:
1,2,3,4,12,13,14,16,
複製代碼
abstract static class DropAsyncEmitter<T> extends BaseEmitter<T> {
@Override
public final void onNext(T t) {
if (get() != 0) { //request != 0
actual.onNext(t);
//每次onNext,request--;
BackpressureHelper.produced(this, 1);
} else {
onOverflow();//request == 0
}
}
void onOverflow() {
// nothing to do
}
}
複製代碼
static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
//實際不是隊列,只有一個值,每次onNext都會更新這個值
final AtomicReference<T> queue = new AtomicReference<T>();
final AtomicInteger wip;
@Override
public final void request(long n) {
BackpressureHelper.add(this, n); //request += n
}
@Override
public void onNext(T t) {
//每drain一次,request--;
//而request爲0時,onNext會更新queue的值,drain不會調用下游的onNext(value)
queue.set(t);
drain();
}
void drain() {
if (wip.getAndIncrement() != 0) return;
int missed = 1;
final Subscriber<? super T> a = actual;
final AtomicReference<T> q = queue;
for (; ; ) {
//在本例子中r開始爲4,即下游的request(4)
long r = get();
long e = 0L;
while (e != r) {
//while循環第一趟e爲0,r爲4,o有值
//while循環第一趟e爲1,r爲4,o沒有值,跳出while
T o = q.getAndSet(null);
boolean empty = o == null;
if (empty) { break;}
a.onNext(o);
e++;
}
每次跳出while循環時,e均爲1
if (e != 0) {
//r--;
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
//因爲是單線程生成,每一個drain()的for都只會執行一趟
if (missed == 0) {
break;
}
}
}
}
複製代碼
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
//無鎖,容量無限隊列,幫下游緩存數值。
final SpscLinkedArrayQueue<T> queue;
}
複製代碼
static final class MissingEmitter<T> extends BaseEmitter<T> {
@Override
public void onNext(T t) {
//無論下游死活,往下游傳數據
actual.onNext(t);
}
}
複製代碼
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
@Override
void onOverflow() {
//當溢出時,本層即報錯
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
@Override
public final void onNext(T t) {
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
複製代碼