閱讀本文須要對Rxjava瞭解,若是尚未了解或者使用過Rxjava的兄die們,能夠觀看我另一篇 Android Rxjava:不同的詮釋進行學習。java
Rxjava背壓
:被觀察者發送事件的速度大於觀察者接收事件的速度時,觀察者內會建立一個無限制大少的緩衝池存儲未接收的事件,所以當存儲的事件愈來愈多時就會致使OOM的出現。(注:當subscribeOn與observeOn不爲同一個線程時,被觀察者與觀察者內存在不一樣時長耗時任務,就會使發送與接收速度存在差別。)git
背壓例子github
public void backpressureSample(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
int i = 0;
while(true){
Thread.sleep(500);
i++;
e.onNext(i);
Log.i(TAG,"每500ms發送一次數據:"+i);
}
}
}).subscribeOn(Schedulers.newThread())//使被觀察者存在獨立的線程執行
.observeOn(Schedulers.newThread())//使觀察者存在獨立的線程執行
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(5000);
Log.e(TAG,"每5000m接收一次數據:"+integer);
}
});
}
複製代碼
例子執行效果緩存
經過上述例子能夠大概瞭解背壓是如何產生,所以Rxjava2.0版本提供了 Flowable 解決背壓問題。
本文章就是使用與分析 Flowable 是如何解決背壓問題。
文章中實例 linhaojian的Githubide
上圖能夠很清楚看出兩者的區別,其實Flowable
出來以上的區別以外,它其餘全部使用與Observable徹底同樣。函數
Flowable
的create例子post
public void flowable(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<=150;j++){
e.onNext(j);
Log.i(TAG," 發送數據:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //觀察者設置接收事件的數量,若是不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
}
複製代碼
從Flowable源碼查看,緩存池默認大少爲:128性能
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
.....
}
複製代碼
經過上面的例子,咱們能夠看到create方法中的包含了一個BackpressureStrategy媒體類,其包含5種類型:學習
把上面例子改成ERROR類型,執行結果以下:spa
總結 :當被觀察者發送事件大於128時,觀察者拋出異常並終止接收事件,但不會影響被觀察者繼續發送事件。
把上面例子改成BUFFER類型,執行結果以下:
總結 :與Observable同樣存在背壓問題,可是接收性能比Observable低,由於BUFFER類型經過BufferAsyncEmitter添加了額外的邏輯處理,再發送至觀察者。
把上面例子改成DROP類型,執行結果以下:
總結 :每當觀察者接收128事件以後,就會丟棄部分事件。
把上面例子改成LATEST類型,執行結果以下:
總結 :LATEST與DROP使用效果同樣,但LATEST會保證能接收最後一個事件,而DROP則不會保證。
把上面例子改成MISSING類型,執行結果以下:
總結 :MISSING就是沒有采起背壓策略的類型,效果跟Obserable同樣。
在設置MISSING類型時,能夠配合onBackPressure相關操做符使用,也能夠到達上述其餘類型的處理效果。
使用例子:
Flowable.interval(50,TimeUnit.MILLISECONDS)
.onBackpressureDrop()//效果與Drop類型同樣
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(aLong));
}
});
複製代碼
onBackpressureBuffer :與BUFFER類型同樣效果。
onBackpressureDrop :與DROP類型同樣效果。
onBackpressureLaster :與LASTER類型同樣效果。
例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<50;j++){
e.onNext(j);
Log.i(TAG," 發送數據:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10); //觀察者設置接收事件的數量,若是不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
複製代碼
request還可進行擴展使用,當遇到在接收事件時想追加接收數量(如:通訊數據經過幾回接收,驗證準確性的應用場景),能夠經過如下方式進行擴展:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<50;j++){
e.onNext(j);
Log.i(TAG," 發送數據:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(10); //觀察者設置接收事件的數量,若是不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
if(integer==5){
subscription.request(3);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
複製代碼
總結:能夠動態設置觀察者接收事件的數量,但不影響被觀察者繼續發送事件。
requested 與 request不是同一的函數,但它們都是屬於FlowableEmitter類裏的方法,那麼requested()是有什麼做用呢,看看如下例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for(int j = 0;j<15;j++){
e.onNext(j);
Log.i(TAG,e.requested()+" 發送數據:"+j);
try{
Thread.sleep(50);
}catch (Exception ex){
}
}
}
},BackpressureStrategy.BUFFER)
// .subscribeOn(Schedulers.newThread())
// .observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(10); //觀察者設置接收事件的數量,若是不設置接收不到事件
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e(TAG,"onNext : "+(integer));
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError : "+t.toString());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
複製代碼
從圖中咱們能夠發現,requested打印的結果就是 剩餘可接收的數量 ,它的做用就是能夠檢測剩餘可接收的事件數量。
到此,Flowable
講解完畢。
若是喜歡個人分享,能夠點擊 關注 或者 贊,大家支持是我分享的最大動力 。