RxJava介紹4:背壓

Back-pressure

名詞來源

Back pressure is a resistance or force opposing the desired flow of fluid through pipes.
背壓是抵擋流體在管道中流動的阻力
以下圖所示,Pipe1和Pipe2在起始處有着相同的起始壓力和距離。Pipe2呢,就遇到了三種backpressure。
image.png java

軟件概念

背壓就是抵擋數據流動的阻力。
背壓問題的出現,須要同時知足兩個條件:react

  • 上下游不在同一個線程
  • 下游處理速度慢於上游


想象一些不少年前,社交應用消息爆炸致使應用界面卡死的情景。
1_feQArAYERutZ451cMQSV6w.gif 數組

背壓策略

當咱們聊背壓時,不只是背壓問題,也包括其處理策略。
那一般咱們有哪些背壓策略?緩存

  • control producer,控制生產者速度。生產者跟消費者又不是一家,這個挺爲難的。
  • Buffer,緩衝數據。若是是有限容量緩存,那就總有溢出的那一天
  • Drop,丟棄數據。這個卻是好,就是數據可能會失準。


RxJava2的Backpressure

Flowable特性

RxJava1中Observable支持Backpressure,因爲使用難度,使用者常常會遇到MissingBackpressureException。RxJava2進行了拆分。同時Flowable遵循"reactive-streams-jvm"接口規範,與Observable在名稱上略有區別。markdown


Observable Flowable
觀察源 ObservableSource Publisher
觀察者 Observer Subscriber
支持Backpressure 不支持 支持
觀察者.onSubscribe(xx) xx:Disposable xx:Subscription

在Subscription接口中提到,只有當下遊調用了Subscription.request(n)後,上游纔會發送數據。下游將自身處理數據流的能力告訴了上游,同時須要在後續時常調用request(xx),更新自身的處理能力。jvm

有些下游的onSubscribe(subscription)中會調用subscription.request(Long.MAX_VALUE),這就意味着,告訴上游:不用考慮個人處理能力,盡情發送數據。實際上,最終的YourSubscriber通常都這樣幹。ide

FlowableObserveOn

observeOn會切換下游線程。在Observable和Flowable中的區別以下oop

在Observable中 在Flowable中
觀察者 ObserveOnObserver ObserveOnSubscriber
觀察者中的SimpleQueue SpscLinkedArrayQueue
經過鏈表+數組實現
無限容量,
SpscArrayQueue
經過數組實現,
有限容量
根據buffersize計算數組長度
是否有潛在背壓問題 無(假定內存無限) 有(隊列溢出時)

數組長度爲離buffersize最近且>=buffersize大的2指數冪。
在Flowable中,Queue的容量有限。當隊列已滿時,offer元素會返回false,這意味着溢出。
從BaseObserveOnSubscriber的源碼能夠看出,FlowableOnserveOn在遇到背壓問題時,就只是拋出了異常。fetch

static final class BaseObserveOnSubscriber<T> implements FlowableSubscriber<T> @Override public final void onNext(T t) {
            ...
            if (!queue.offer(t)) {
            	...
                //拋出異常
                error = new MissingBackpressureException("Queue is full?!");
            }
        }
}
複製代碼

咱們先來分析ObserveOnSubscriber中的subscription.request(n)
爲了研究特性,咱們拷貝了FlowableRange,加了log。只有當下遊request(x),則會onNext x個數據。ui

public final class FlowableRange2 extends Flowable<Integer> {
    static class RangeSubscription extends AtomicLong implements Subscription {
       @Override
        public final void request(long n) {
        	System.out.print(String.format("\n[request:%d]", r));
        }
    }
}

FlowableRange2(1,20)//拷貝了FlowableRange,加了log
    .observeOn(Schedulers.newThread(), false, 5)
    .subscribe { Thread.sleep(2);print("$it,") }

//輸出結果:
[request:5]1,2,3,4,
[request:4]5,6,7,8,
[request:4]9,10,11,12,
[request:4]13,14,15,16,
[request:4]17,18,19,20,
複製代碼

能夠看到並無每次都請求buffsize5。能夠看看FlowableObserveOn代碼流程,只適用當前例子。FlowableObserveOn特色:

  • FlowableObserveOn會切換線程
    • queue.offer(t),生產者在上游線程
    • runAsync運行下游消費者線程,即scheduler的新線程(在新線程中串行)
  • 有背壓問題,當SpscArrayQueue已滿時,插入數值會有MissingBackpressureException("Queue is full?!")
  • 首次預取數據個數爲prefetch即bufferSize
  • 當處理的數據達到limit,則會向上遊request limit個數值。limit爲prefetch - (prefetch >> 2)
public final class FlowableObserveOn<T> extends Flowable<T> implements HasUpstreamPublisher<T> {
    
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        source.subscribe(new ObserveOnSubscriber<T>(s, scheduler.createWorker(), prefetch));
    }

     static class ObserveOnSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Runnable, Subscription {	
        final Subscriber<? super T> actual; //actual: 下游的,yourSubscriber 
        Subscription s; //上游的,RangeSubscription(在咱們這個例子中)
		SimpleQueue<T> queue;
        final Worker worker;
        final int prefetch;//buffSize
		//prefetch - (prefetch >> 2)
        final int limit;
        
         final AtomicLong requested;
        long produced;
       
		ObserveOnSubscriber(Subscriber<? super T> actual, Worker worker,int prefetch) {
            this.actual = actual;
            this.worker = worker;
            this.prefetch = prefetch;//5
            this.requested = new AtomicLong();
            this.limit = prefetch - (prefetch >> 2);//4
        }

        @Override
        public void onSubscribe(Subscription s) {
                this.s = s;
            	//observeOn,無鎖容量有限隊列,
                queue = new SpscArrayQueue<T>(prefetch);
                actual.onSubscribe(this);
				//首次request: 參數爲bufferSize,5
                s.request(prefetch);
            }
        }
     
     	@Override
       //供下游yourSubscriber調用的,實際n爲Long.MAX_VALUE
        public final void request(long n) {          
            BackpressureHelper.add(requested, n);
            trySchedule();
        }
     
        @Override
        public final void onNext(T t) {
            if (!queue.offer(t)) {//若是隊列已滿onNext會異常
                error = new MissingBackpressureException("Queue is full?!");
            }
            trySchedule();
        }
		final void trySchedule() {
            if (getAndIncrement() != 0)  return;
            worker.schedule(this);
        }
        @Override
        public final void run() { runAsync();}
        void runAsync() {
            int missed = 1;

            final Subscriber<? super T> a = actual;
            final SimpleQueue<T> q = queue;
            long e = produced;
            for (; ; ) {
                long r = requested.get();  
                //首次進入時e爲0,r爲Long.MAX_VALUE
                while (e != r) {
                    //取出元素
                    T v = q.poll();
                    boolean empty = v == null;
                    if (empty) break;
                    a.onNext(v);
                    e++;
                    //當消費的數量達到limit時,
                    if (e == limit) {
                        //會再次請求上游。個數爲limit。
                        s.request(e);
                        e = 0L;
                    }
                }
               //w爲while執行過程當中trySchedule請求的次數。
                int w = get();
                if (missed == w) {
                    produced = e;
                    missed = addAndGet(-missed);
                    if (missed == 0) break;
                } else {
                    missed = w;
                }
            }
        }
    }
複製代碼

BackpressureStrategy

瞭解Flowable這部分特性後,咱們來看BackpressureStrategy 中提到的5種策略:

  • MISSING 寫入OnNext事件時不會進行任何緩衝或丟棄,下游要處理溢出。
  • �ERROR 拋出 MissingBackpressureException
  • BUFFER 緩衝全部onNext值,直到下游消費該值
  • DROP 丟棄最新的onNext值
  • LATEST 只保留最新的onNext值,一直覆蓋前面的值

這裏有段代碼舉例。生產速度1毫秒/次,消費速度4毫秒/次。生產速度是消費速度的4倍。
ObserveOnSubscriber首次request爲4(prefetch),當消費3(limit)個數值(12毫秒)後,會request(3)。
這就意味着上游線程在前16毫秒內只能onNext 4個數值,當再次request後onNext 3個數值。

var flowableOnSubscribe = FlowableOnSubscribe<Int> { emitter ->
    //1.FlowableEmitter每隔1毫秒發射一次值,從1到20。
	for (i in 1..20) {
    	Thread.sleep(1)
		emitter.onNext(i)
	}
}
//2.背壓策略爲DROP
//3.observeOn, bufferSize爲4。
//4.消費,sleep4毫秒,再打印數值
Flowable.create(flowableOnSubscribe, BackpressureStrategy.DROP)
	.observeOn(Schedulers.newThread(), false, 4)
	.subscribe { Thread.sleep(4);println(it)  }
    
DROP--輸出結果:
1,2,3,4,13,14,15,

LATEST--輸出結果:
1,2,3,4,12,13,14,16,
複製代碼

DropAsyncEmitter

abstract static class DropAsyncEmitter<T> extends BaseEmitter<T> {

        @Override
        public final void onNext(T t) {
            if (get() != 0) {  //request != 0
                actual.onNext(t);
                //每次onNext,request--;
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();//request == 0
            }
        }
     	void onOverflow() {
            // nothing to do
        }
    }
複製代碼

截屏2021-05-17 下午1.48.06.png

LatestAsyncEmitter

static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {

    	//實際不是隊列,只有一個值,每次onNext都會更新這個值
        final AtomicReference<T> queue = new AtomicReference<T>();
		final AtomicInteger wip;
    
    	@Override
        public final void request(long n) {
           	BackpressureHelper.add(this, n); //request += n
        }

        @Override
        public void onNext(T t) {
        //每drain一次,request--;
    	//而request爲0時,onNext會更新queue的值,drain不會調用下游的onNext(value)
			queue.set(t);
            drain();
        }
    
    
        void drain() {
            if (wip.getAndIncrement() != 0) return;
            int missed = 1;
            final Subscriber<? super T> a = actual;
            final AtomicReference<T> q = queue;

            for (; ; ) {
                //在本例子中r開始爲4,即下游的request(4)
                long r = get();
                long e = 0L;
              
                while (e != r) {
                    //while循環第一趟e爲0,r爲4,o有值
                	//while循環第一趟e爲1,r爲4,o沒有值,跳出while
                    T o = q.getAndSet(null);
                    boolean empty = o == null;
                    if (empty) { break;}
                    a.onNext(o);
                    e++;
                }
                每次跳出while循環時,e均爲1
                if (e != 0) {
                    //r--;
                    BackpressureHelper.produced(this, e);
                }
               
                missed = wip.addAndGet(-missed);
                 //因爲是單線程生成,每一個drain()的for都只會執行一趟
                if (missed == 0) {
                    break;
                }
            }
        }
    }
複製代碼

截屏2021-05-17 下午1.48.29.png

BufferAsyncEmitter

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {

    	//無鎖,容量無限隊列,幫下游緩存數值。
        final SpscLinkedArrayQueue<T> queue;
}
複製代碼

MissingEmitter

static final class MissingEmitter<T> extends BaseEmitter<T> {

        @Override
        public void onNext(T t) {
            //無論下游死活,往下游傳數據
            actual.onNext(t);  
        }
}
複製代碼

ErrorAsyncEmitter

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        @Override
        void onOverflow() {
            //當溢出時,本層即報錯
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
        @Override
        public final void onNext(T t) {
            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

        abstract void onOverflow();
    }
複製代碼

總結

  • 背壓問題的前提:上下游在不一樣線程。
  • 下游調用request(n),上游記錄request值,即下游可以處理的個數。
  • 上游onNext(數值),request--。當request爲0,上游onNext則會有背壓問題。
  • 出現背壓時,通常的處理策略:
    • 上游無論,直接onNext,下游本身處理溢出
    • 上游拋出異常
    • 上游緩存溢出的數值,直到下游再次更新request值。
    • 上游丟棄溢出的數值,區分是否保留最後的值。
相關文章
相關標籤/搜索