給初學者的RxJava2.0教程(五)

Outlinejavascript

[TOC]java

前言

你們喜聞樂見的Backpressure來啦.app

這一節中咱們未來學習Backpressure. 我看好多吃瓜羣衆早已坐不住了, 別急, 咱們先來回顧一下上一節講的Zip. 異步

正題

上一節中咱們說到Zip能夠將多個上游發送的事件組合起來發送給下游, 那你們有沒有想過一個問題, 若是其中一個水管A發送事件特別快, 而另外一個水管B 發送事件特別慢, 那就可能出現這種狀況, 發得快的水管A 已經發送了1000個事件了, 而發的慢的水管B 才發一個出來, 組合了一個以後水管A 還剩999個事件, 這些事件須要繼續等待水管B 發送事件出來組合, 那麼這麼多的事件是放在哪裏的呢? 總有一個地方保存吧? 沒錯, Zip給咱們的每一根水管都弄了一個水缸 , 用來保存這些事件, 用通俗易懂的圖片來表示就是:ide

zip2.png

如圖中所示, 其中藍色的框框就是zip給咱們的水缸! 它將每根水管發出的事件保存起來, 等兩個水缸都有事件了以後就分別從水缸中取出一個事件來組合, 當其中一個水缸是空的時候就處於等待的狀態.學習

題外話: 你們來分析一下這個水缸有什麼特色呢? 它是按順序保存的, 先進來的事件先取出來, 這個特色是否是很熟悉呀? 沒錯, 這就是咱們熟知的隊列, 這個水缸在Zip內部的實現就是用的隊列, 感興趣的能夠翻看源碼查看.spa

好了回到正題上來, 這個水缸有大小限制嗎? 要是一直往裏存會怎樣? 咱們來看個例子: 線程

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {    
    @Override                                                                          
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {       
        for (int i = 0; ; i++) {   //無限循環發事件 
            emitter.onNext(i);                                                         
        }                                                                              
    }                                                                                  
}).subscribeOn(Schedulers.io());    

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {      
    @Override                                                                          
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {        
        emitter.onNext("A");                                                           
    }                                                                                  
}).subscribeOn(Schedulers.io());    

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
    @Override                                                                          
    public String apply(Integer integer, String s) throws Exception {                  
        return integer + s;                                                            
    }                                                                                  
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {                               
    @Override                                                                          
    public void accept(String s) throws Exception {                                    
        Log.d(TAG, s);                                                                 
    }                                                                                  
}, new Consumer<Throwable>() {                                                         
    @Override                                                                          
    public void accept(Throwable throwable) throws Exception {                         
        Log.w(TAG, throwable);                                                         
    }                                                                                  
});複製代碼

在這個例子中, 咱們分別建立了兩根水管, 第一根水管用機器指令的執行速度來無限循環發送事件, 第二根水管隨便發送點什麼, 因爲咱們沒有發送Complete事件, 所以第一根水管會一直髮事件到它對應的水缸裏去, 咱們來看看運行結果是什麼樣.3d

運行結果GIF圖:code

zip2.gif

我勒個草, 內存佔用以斜率爲1的直線迅速上漲, 幾秒鐘就300多M , 最終報出了OOM:

zlc.season.rxjava2demo W/art: Throwing OutOfMemoryError "Failed to allocate a 28 byte allocation with
4194304 free bytes and 8MB until OOM; 
zlc.season.rxjava2demo W/art: "main" prio=5 tid=1 Runnable      
zlc.season.rxjava2demo W/art:   | group="main" sCount=0 dsCount=0 obj=0x75188710 self=0x7fc0efe7ba00   
zlc.season.rxjava2demo W/art:   | sysTid=32686 nice=0 cgrp=default sched=0/0 handle=0x7fc0f37dc200    
zlc.season.rxjava2demo W/art:   | state=R schedstat=( 0 0 0 ) utm=948 stm=120 core=1 HZ=100         
zlc.season.rxjava2demo W/art:   | stack=0x7fff971e8000-0x7fff971ea000 stackSize=8MB         
zlc.season.rxjava2demo W/art:   | held mutexes= "mutator lock"(shared held)    
zlc.season.rxjava2demo W/art:     at java.lang.Integer.valueOf(Integer.java:742)複製代碼

出現這種狀況確定是咱們不想看見的, 這裏就能夠引出咱們的Backpressure了, 所謂的Backpressure其實就是爲了控制流量, 水缸存儲的能力畢竟有限, 所以咱們還得從源頭去解決問題, 既然你發那麼快, 數據量那麼大, 那我就想辦法不讓你發那麼快唄.

那麼這個源頭到底在哪裏, 究竟何時會出現這種狀況, 這裏只是說的Zip這一個例子, 其餘的地方會出現嗎? 帶着這個問題咱們來探究一下.

咱們讓事情變得簡單一點, 從一個單一的Observable提及.

來看段代碼:

Observable.create(new ObservableOnSubscribe<Integer>() {                         
    @Override                                                                    
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 
        for (int i = 0; ; i++) {   //無限循環發事件 
            emitter.onNext(i);                                                   
        }                                                                        
    }                                                                            
}).subscribe(new Consumer<Integer>() {                                           
    @Override                                                                    
    public void accept(Integer integer) throws Exception {                       
        Thread.sleep(2000);                                                      
        Log.d(TAG, "" + integer);                                                
    }                                                                            
});複製代碼

這段代碼很簡單, 上游一樣無限循環的發送事件, 在下游每次接收事件前延時2秒. 上下游工做在同一個線程裏, 來看下運行結果:

peace.gif

哎臥槽, 怎麼如此平靜, 感受像是走錯了片場.

爲何呢, 由於上下游工做在同一個線程呀騷年們! 這個時候上游每次調用emitter.onNext(i)其實就至關於直接調用了Consumer中的:

public void accept(Integer integer) throws Exception {                       
        Thread.sleep(2000);                                                      
        Log.d(TAG, "" + integer);                                                
   }     複製代碼

因此這個時候其實就是上游每延時2秒發送一次. 最終的結果也說明了這一切.

那咱們加個線程呢, 改爲這樣:

Observable.create(new ObservableOnSubscribe<Integer>() {                            
    @Override                                                                       
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {    
        for (int i = 0; ; i++) {    //無限循環發事件 
            emitter.onNext(i);                                                      
        }                                                                           
    }                                                                               
}).subscribeOn(Schedulers.io())                                                    
        .observeOn(AndroidSchedulers.mainThread())                                  
        .subscribe(new Consumer<Integer>() {                                        
            @Override                                                               
            public void accept(Integer integer) throws Exception {                  
                Thread.sleep(2000);                                                 
                Log.d(TAG, "" + integer);                                           
            }                                                                       
        });複製代碼

這個時候把上游切換到了IO線程中去, 下游到主線程去接收, 來看看運行結果呢:

violence.gif

能夠看到, 給上游加了個線程以後, 它就像脫繮的野馬同樣, 內存又爆掉了.

爲何不加線程和加上線程區別這麼大呢, 這就涉及了同步異步的知識了.

當上下游工做在同一個線程中時, 這時候是一個同步的訂閱關係, 也就是說上游每發送一個事件必須等到下游接收處理完了之後才能接着發送下一個事件.

當上下游工做在不一樣的線程中時, 這時候是一個異步的訂閱關係, 這個時候上游發送數據不須要等待下游接收, 爲何呢, 由於兩個線程並不能直接進行通訊, 所以上游發送的事件並不能直接到下游裏去, 這個時候就須要一個田螺姑娘來幫助它們倆, 這個田螺姑娘就是咱們剛纔說的水缸 ! 上游把事件發送到水缸裏去, 下游從水缸裏取出事件來處理, 所以, 當上遊發事件的速度太快, 下游取事件的速度太慢, 水缸就會迅速裝滿, 而後溢出來, 最後就OOM了.

這兩種狀況用圖片來表示以下:

同步:

同步.png

異步:

異步.png

從圖中咱們能夠看出, 同步和異步的區別僅僅在因而否有水缸.

相信經過這個例子你們對線程之間的通訊也有了比較清楚的認知和理解.

源頭找到了, 只要有水缸, 就會出現上下游發送事件速度不平衡的狀況, 所以當咱們之後遇到這種狀況時, 仔細思考一下水缸在哪裏, 找到水缸, 你就找到了解決問題的辦法.

既然源頭找到了, 那麼下一節咱們就要來學習如何去解決了. 下節見.

相關文章
相關標籤/搜索