5章 RxJava背壓策略

本篇文章已受權微信公衆號 YYGeeker 獨家發佈轉載請標明出處react

CSDN學院課程地址緩存

5. RxJava背壓策略(BackpressureStrategy)

5.1 背壓是什麼

背壓的概念是在平時業務開發時較爲常見,大多數是針對高併發的業務,背壓是必須考慮的因素之一。在異步場景中,因爲數據流的發射速度高於數據流的接收速度,就會致使數據不能及時處理,從而致使數據流的阻塞。背壓所要作的事情就是主動控制數據流發射的速度bash

在RxJava2.0中,推出了Flowable用來支持背壓,去除了Observable對背壓的支持,下面在背壓策略的講解中,咱們都使用Flowable做爲咱們的響應類型。在使用背壓時,只須要在create()方法中第二個參數添加背壓策略便可微信

  1. 在訂閱的時候若是使用FlowableSubscriber,那麼須要經過s.request(Long.MAX_VALUE)去主動請求上游的數據項。若是遇到背壓報錯的時候,FlowableSubscriber默認已經將錯誤try-catch,並經過onError()進行回調,程序並不會崩潰
  2. 在訂閱的時候若是使用Consumer,那麼不須要主動去請求上游數據,默認已經調用了s.request(Long.MAX_VALUE)。若是遇到背壓報錯、且對Throwable的Consumer沒有new出來,則程序直接崩潰
  3. 背壓策略的上游的默認緩存池是128
public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
}
複製代碼

5.2 MISSING

MISSING表示OnNext事件沒有任何緩存和丟棄,下游要處理任何溢出,能夠理解爲至關於沒有指定背壓策略。Flowable至關於沒有指定背壓策略能夠將下游要處理任何溢出理解爲,上游發射的數據未獲得處理,就會緩存起來,當緩存容量達到128時,再增長一個未處理的數據項,就會拋出MissingBackpressureException,且帶有隊列已經滿了的友好提示。這裏就比如一個大水缸,當水注滿的時候,它就會把蓋子蓋上,不讓你再繼續注水了併發

這裏咱們模擬上游發送速度高於下游數據流的處理速度,在數據處理的時候加上Thread.sleep(1000)異步

public void missing() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.MISSING)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}
複製代碼

輸出ide

io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
複製代碼

5.3 ERROR

ERROR表示在下游沒法跟上時,會拋出MissingBackpressureException。能夠將下游沒法跟上理解爲,上游發射的數據未獲得處理,就會緩存起來,當緩存容量達到128時,再增長一個未處理的數據項,就會拋出MissingBackpressureException。這裏比如一個大水缸,當水注滿的時候,它會把水缸撐破了,直接破裂高併發

public void error() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}
複製代碼

輸出源碼分析

io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
複製代碼

5.4 BUFFER

上游不斷的發出onNext請求,直到下游處理完,上游發射的數據項的緩存池是無限大的,程序也不會拋出錯誤,可是要注意程序OOM的現象,由於緩存越大,佔用的內存就越多。例子中發射129個數據項,然而程序並無崩潰,只會一直讀取緩存池的數據項,直到數據項被處理完。這裏就是一個無限大的水缸ui

背壓策略除了BUFFER策略的緩存池是無限大以外,其餘默認的緩存池都是128

public void buffer() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}
複製代碼

輸出

onNext=0
onNext=1
onNext=2
......
onNext=998
onNext=999
複製代碼

5.5 DROP

會在下游跟不上速度時,把onNext的值丟棄,簡單的說就是,超過緩存區大小(128)的數據項都會被丟棄。例子中經過發射800個數據項,那麼咱們只會收到0-127的數據項。若是咱們再次調用request(),這時候取到的數據就是上一次request()後的128個數據。這裏比如一個大水缸,當水注滿的時候,水仍是在繼續的流,一旦有request調用的時候,它就會去取出水缸裏的全部水,這時候水缸就是空的,但水一直在流,因此水缸立刻又會被注滿,這個時候就要等request再次取出水缸裏的水

public void drop() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}
複製代碼

輸出

onNext=0
onNext=1
onNext=2
......
onNext=127
複製代碼

5.6 LATEST

LATEST與Drop策略同樣,若是超過緩存池容量大小的數據項都會被丟棄。不一樣的是,無論緩存池的狀態如何,LATEST都會將最後一條數據強行放入緩存池中。這裏的水缸容納下了最後一滴水

public void latest() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}
複製代碼

輸出

onNext=0
onNext=1
......
onNext=126
onNext=127
onNext=999
複製代碼

5.7 小結

  1. MISSING:沒有任何緩存和丟棄,下游要處理任何溢出
  2. ERROR:下游的處理速度沒法跟上上游的發射速度時報錯
  3. BUFFER:數據項的緩存池無限大
  4. DROP:下游的處理速度沒法跟上上游的發射速度時丟棄
  5. LATEST:最後一條數據項被強行放入緩存池
相關文章
相關標籤/搜索