給初學者的 RxJava2.0 教程 (四)

Outlinejava

[TOC]程序員

前言

在上一節中, 咱們提到了FlowableBackpressure背壓, 原本這一節的確是想講這兩個東西的,但是寫到一半感受仍是差點火候,感受時機未到, 所以,這裏先來作個準備工做, 先帶你們學習zip這個操做符, 這個操做符也是比較牛逼的東西了, 涉及到的東西也比較多, 主要是一些細節上的東西太多, 經過學習這個操做符,能夠爲咱們下一節的Backpressure 作個鋪墊.api

正題

照慣例咱們仍是先貼上一下比較正式的解釋吧.服務器

Zip經過一個函數將多個Observable發送的事件結合到一塊兒,而後發送這些組合到一塊兒的事件. 它按照嚴格的順序應用這個函數。它只發射與發射數據項最少的那個Observable同樣多的數據。app

咱們再用通俗易懂的圖片來解釋一下:ide

zip.png

從這個圖中能夠看見, 此次上游和以往不一樣的是, 咱們有兩根水管了.函數

其中一根水管負責發送圓形事件 , 另一根水管負責發送三角形事件 , 經過Zip操做符, 使得圓形事件三角形事件 合併爲了一個矩形事件 . 學習

下面咱們再來看看分解動做:字體

zip1.png

經過分解動做咱們能夠看出:spa

  • 組合的過程是分別從 兩根水管裏各取出一個事件 來進行組合, 而且一個事件只能被使用一次, 組合的順序是嚴格按照事件發送的順利 來進行的, 也就是說不會出現圓形1 事件和三角形B 事件進行合併, 也不可能出現圓形2三角形A 進行合併的狀況.
  • 最終下游收到的事件數量 是和上游中發送事件最少的那一根水管的事件數量 相同. 這個也很好理解, 由於是從每一根水管 裏取一個事件來進行合併, 最少的 那個確定就最早取完 , 這個時候其餘的水管儘管還有事件 , 可是已經沒有足夠的事件來組合了, 所以下游就不會收到剩餘的事件了.

分析了大概的原理, 咱們仍是勞逸結合, 先來看看實際中的代碼怎麼寫吧:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            
    @Override                                                                                         
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                      
        Log.d(TAG, "emit 1");                                                                         
        emitter.onNext(1);                                                                            
        Log.d(TAG, "emit 2");                                                                         
        emitter.onNext(2);                                                                            
        Log.d(TAG, "emit 3");                                                                         
        emitter.onNext(3);                                                                            
        Log.d(TAG, "emit 4");                                                                         
        emitter.onNext(4);                                                                            
        Log.d(TAG, "emit complete1");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                   

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {              
    @Override                                                                                         
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                       
        Log.d(TAG, "emit A");                                                                         
        emitter.onNext("A");                                                                          
        Log.d(TAG, "emit B");                                                                         
        emitter.onNext("B");                                                                          
        Log.d(TAG, "emit C");                                                                         
        emitter.onNext("C");                                                                          
        Log.d(TAG, "emit complete2");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                     

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                  
    @Override                                                                                         
    public String apply(Integer integer, String s) throws Exception {                                 
        return integer + s;                                                                           
    }                                                                                                 
}).subscribe(new Observer<String>() {                       
    @Override                                                                                         
    public void onSubscribe(Disposable d) {                                                           
        Log.d(TAG, "onSubscribe");                                                                    
    }                                                                                                 

    @Override                                                                                         
    public void onNext(String value) {                                                                
        Log.d(TAG, "onNext: " + value);                                                               
    }                                                                                                 

    @Override                                                                                         
    public void onError(Throwable e) {                                                                
        Log.d(TAG, "onError");                                                                        
    }                                                                                                 

    @Override                                                                                         
    public void onComplete() {                                                                        
        Log.d(TAG, "onComplete");                                                                     
    }                                                                                                 
});複製代碼

咱們分別建立了兩個上游水管, 一個發送1,2,3,4,Complete, 另外一個發送A,B,C,Complete, 接着用Zip把發出的事件組合, 來看看運行結果吧:

D/TAG: onSubscribe     
D/TAG: emit 1          
D/TAG: emit 2          
D/TAG: emit 3          
D/TAG: emit 4          
D/TAG: emit complete1  
D/TAG: emit A          
D/TAG: onNext: 1A      
D/TAG: emit B          
D/TAG: onNext: 2B      
D/TAG: emit C          
D/TAG: onNext: 3C      
D/TAG: emit complete2  
D/TAG: onComplete複製代碼

結果彷佛是對的... 可是總感受什麼地方不對勁...

哪兒不對勁呢, 爲何感受是水管一發送完了以後, 水管二纔開始發送啊? 究竟是不是呢, 咱們來驗證一下:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {           
    @Override                                                                                        
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                     
        Log.d(TAG, "emit 1");                                                                        
        emitter.onNext(1);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit 2");                                                                        
        emitter.onNext(2);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit 3");                                                                        
        emitter.onNext(3);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit 4");                                                                        
        emitter.onNext(4);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit complete1");                                                                
        emitter.onComplete();                                                                        
    }                                                                                                
});                                                                                                  

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {             
    @Override                                                                                        
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                      
        Log.d(TAG, "emit A");                                                                        
        emitter.onNext("A");                                                                         
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit B");                                                                        
        emitter.onNext("B");                                                                         
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit C");                                                                        
        emitter.onNext("C");                                                                         
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit complete2");                                                                
        emitter.onComplete();                                                                        
    }                                                                                                
});                                                                                                  

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
    @Override                                                                                        
    public String apply(Integer integer, String s) throws Exception {                                
        return integer + s;                                                                          
    }                                                                                                
}).subscribe(new Observer<String>() {                                                                
    @Override                                                                                        
    public void onSubscribe(Disposable d) {                                                          
        Log.d(TAG, "onSubscribe");                                                                   
    }                                                                                                

    @Override                                                                                        
    public void onNext(String value) {                                                               
        Log.d(TAG, "onNext: " + value);                                                              
    }                                                                                                

    @Override                                                                                        
    public void onError(Throwable e) {                                                               
        Log.d(TAG, "onError");                                                                       
    }                                                                                                

    @Override                                                                                        
    public void onComplete() {                                                                       
        Log.d(TAG, "onComplete");                                                                    
    }                                                                                                
});複製代碼

此次咱們在每發送一個事件以後加入了一秒鐘的延時, 來看看運行結果吧, 注意這是個GIF圖:

zip.gif

(貼心的我怕你們看不清楚, 特地調成了老年字體呢)

阿西吧, 好像真的是先發送的水管一再發送的水管二呢, 爲何會有這種狀況呢? 由於咱們兩根水管都是運行在同一個線程裏, 同一個線程裏執行代碼確定有前後順序呀.

所以咱們來稍微改一下, 不讓他們在同一個線程, 不知道怎麼切換線程的, 請掉頭看前面幾節.

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {         
    @Override                                                                                      
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                   
        Log.d(TAG, "emit 1");                                                                      
        emitter.onNext(1);                                                                         
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit 2");                                                                      
        emitter.onNext(2);                                                                         
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit 3");                                                                      
        emitter.onNext(3);                                                                         
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit 4");                                                                      
        emitter.onNext(4);                                                                         
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit complete1");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {           
    @Override                                                                                      
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    
        Log.d(TAG, "emit A");                                                                      
        emitter.onNext("A");                                                                       
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit B");                                                                      
        emitter.onNext("B");                                                                       
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit C");                                                                      
        emitter.onNext("C");                                                                       
        Thread.sleep(1000);                                                                        

        Log.d(TAG, "emit complete2");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {               
    @Override                                                                                      
    public String apply(Integer integer, String s) throws Exception {                              
        return integer + s;                                                                        
    }                                                                                              
}).subscribe(new Observer<String>() {                    
    @Override                                                                                      
    public void onSubscribe(Disposable d) {                                                        
        Log.d(TAG, "onSubscribe");                                                                 
    }                                                                                              

    @Override                                                                                      
    public void onNext(String value) {                                                             
        Log.d(TAG, "onNext: " + value);                                                            
    }                                                                                              

    @Override                                                                                      
    public void onError(Throwable e) {                                                             
        Log.d(TAG, "onError");                                                                     
    }                                                                                              

    @Override                                                                                      
    public void onComplete() {                                                                     
        Log.d(TAG, "onComplete");                                                                  
    }                                                                                              
});複製代碼

好了, 此次咱們讓水管都在IO線程裏發送事件, 再來看看運行結果:

D/TAG: onSubscribe    
D/TAG: emit A         
D/TAG: emit 1         
D/TAG: onNext: 1A     
D/TAG: emit B         
D/TAG: emit 2         
D/TAG: onNext: 2B     
D/TAG: emit C         
D/TAG: emit 3         
D/TAG: onNext: 3C     
D/TAG: emit complete2 
D/TAG: onComplete複製代碼

GIF圖:

zip_io.gif

誒! 這下就對了嘛, 兩根水管同時開始發送, 每發送一個, Zip就組合一個, 再將組合結果發送給下游.

不對呀! 可能細心點的朋友又看出端倪了, 第一根水管明明發送了四個數據+一個Complete, 以前明明還有的, 爲啥到這裏沒了呢?

這是由於咱們以前說了, zip發送的事件數量跟上游中發送事件最少的那一根水管的事件數量是有關的, 在這個例子裏咱們第二根水管只發送了三個事件而後就發送了Complete, 這個時候儘管第一根水管還有事件4 和事件Complete 沒有發送, 可是它們發不發送還有什麼意義呢? 因此本着節約是美德的思想, 就乾脆打斷它的狗腿, 不讓它發了.

至於前面的例子爲何會發送, 剛纔不是已經說了是!在!同!一!個!線!程!裏!嗎!!!!再問老子打死你!

有好事的程序員可能又要問了, 那我不發送Complete呢? 答案是顯然的, 上游會繼續發送事件, 可是下游仍然收不到那些多餘的事件. 不信你能夠試試.

實踐

學習了Zip的基本用法, 那麼它在Android有什麼用呢, 其實不少場景均可以用到Zip. 舉個例子.

好比一個界面須要展現用戶的一些信息, 而這些信息分別要從兩個服務器接口中獲取, 而只有當兩個都獲取到了以後才能進行展現, 這個時候就能夠用Zip了:

首先分別定義這兩個請求接口:

public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}複製代碼

接着用Zip來打包請求:

Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      

Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    

Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo, UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something; 
            }                                                                             
        });複製代碼

好了, 本次的教程就到這裏吧. 又到週末鳥, 下週見.

相關文章
相關標籤/搜索