RxJava 入門

導入

我相信你們確定對ReactiveX 和 RxJava 都不陌生,由於如今只要是和技術相關的網站,博客都會隨處見到介紹ReactiveX和RxJava的文章。html

ReactiveX

  • ReactiveX是Reactive Extensions 的縮寫,即響應式編程的擴展。
  • 「a library for composing asynchronous and event-based programs using observable sequences for the Java VM」(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序庫)。
  • Rx是一種編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步 數據流.由微軟的架構師Erik Meijer領導的團隊開發,在2012年11月開源。他在各個經常使用編程語言上都有實現。如Java,C#,PHP,Swift,Scala等等.社區網站是reactivex.io
  • ReactiveX不只僅是一個編程接口,它是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言。

關於響應式編程

  • 事件總線(Event buses)或我們常見的單擊事件就是一個異步事件流,你能夠觀察這個流,也能夠基於這個流作一些自定義操做。響應式就是基於這種想法。你可以建立全部事物的數據流,而不只僅只是單擊和懸停事件數據流。 流廉價且無處不在,任何事物均可以看成一個流:變量、用戶輸入、屬性、緩存、數據結構等等。好比,假設你的微博評論就是一個跟單擊事件同樣的數據流,你可以監聽這個流,並作出響應。
  • 有一堆的函數可以建立(create)任何流,也能將任何流進行組合(combine)和過濾(filter)。 這正是「函數式」的魔力所在。一個流能做爲另外一個流的輸入(input),甚至多個流也能夠做爲其它流的輸入。你能合併(merge)兩個流。你還能經過過濾(filter)一個流獲得那些你感興趣的事件。你能將一個流中的數據映射(map)到一個新的流中。
  • 響應式編程的主要組成部分是observable, operator 和 observer
  • 通常響應式編程的信息流:Observable -> Operator1 -> Operator2->...->OperatorN->Observer
  • Observable 是事件的生產者,Observer是事件的最終消費者。中間能夠經過任意多個的Operator對事件進行處理和轉換
  • 由於Observer一般在主線程中執行,所以設計上要求代碼儘量的簡單,只對事件做出相應(不對事件或者數據進行修改,全部修改事件的工做所有由operator完成)

RxJava 和 RxAndroid

  • RxJava是ReactiveX在 Java 平臺的實現,你能夠將它看做一個普通的Java類庫。
  • RxAndroid是RxJava的一個針對Android平臺的擴展,主要用於 Android 開發。
  • RxJava就是一個作異步開發的框架,和Android系統提供的 Handler+Thread,AsyncTask,Context.runOnUiThread等是解決的是一樣的問題。那麼他跟系統提供的異步編程方案比,有什麼好處呢。或者說他有什麼樣的優點值得咱們花時間和精力切換到RxJava呢?

總結起來能夠用兩個詞來歸納:異步和簡潔java

主要概念

Observable(被觀察者)

Observables 負責發出一系列的事件,這裏的事件能夠是任何東西,例如網絡請求的結果,複雜計算處理的結果,數據庫操做的結構,文件操做的結果等,事件執行結束後交給Observer的回調處理。react

Observer(觀察者)

進行訂閱接受處理事件android

Operator(操做符)中文文檔

負責對事件進行各類變化和處理git

Scheduler(調度器)

提供了各類調度器,是RxJava能夠方便的實現異步開發github

事件

這裏的事件值指的是 onNext (有新數據),onComplete (全部數據處理完成),onError (事件隊列異常)sql

RxJava的好處(爲何RxJava對於Android如此重要)

  • 輕鬆使用併發:讓異步編程變得簡單簡潔.像寫同步代碼同樣。
  • 方便的線程切換
  • 簡單而完善的異常處理機制:傳統的try/cache沒辦法處理異步中子線程產生的異常,RxJava 提供了合適的錯誤處理機制
  • 強大的操做符支持,函數式的風格,鏈式調用。

舉個例子

假如如今咱們有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView ,它的做用是顯示多張圖片,並能使用 addImage(Bitmap) 方法來任意增長顯示的圖片。如今須要程序將一個給出的目錄數組 File[] folders 中每一個目錄下的 png 圖片都加載出來並顯示在 imageCollectorView 中。須要注意的是,因爲讀取圖片的這一過程較爲耗時,須要放在後臺執行,而圖片的顯示則必須在 UI 線程執行。shell

目錄數組數據庫

File[] folders=new File[]{......};複製代碼

線程方式實現(call hell)編程

new Thread() {
    @Override
    public void run() {
        super.run();
        try{
          for (File folder : folders) {
              File[] files = folder.listFiles();
              for (File file : files) {
                  if (file.getName().endsWith(".png")) {
                      final Bitmap bitmap = getBitmapFromFile(file);
                      getActivity().runOnUiThread(new Runnable() {
                          @Override
                          public void run() {
                              imageCollectorView.addImage(bitmap);
                          }
                      });
                  }
              }
          }
        }catch(Exception e){
          //error handling
          //只能在這裏進行異常處理
        }

    }
}.start();複製代碼

RxJava 實現

//建立Observable
Observable observable = Observable.create(new ObservableOnSubscribe<File>() {
            @Override
            public void subscribe(ObservableEmitter<File> e) throws Exception {
                for (File file : files) {
                    e.onNext(file);
                }
                e.onComplete();
            }
        });
//建立Observer
Observer<Bitmap> observer = new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }

            @Override
            public void onError(Throwable throwable) {
              //error handling
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"All images are shown");
            }
        };
//對事件集進行處理並鏈接消費者
observable.flatMap(new Func1<File, Observable<File>>() {//分別獲取每一個文件夾下面的文件,組合成一個Observable
              @Override
              public Observable<File> call(File file) {
                  return Observable.from(file.listFiles());
              }
          })
          .filter(new Func1<File, Boolean>() {//過濾出全部擴展名爲png的文件
              @Override
              public Boolean call(File file) {
                  return file.getName().endsWith(".png");
              }
          })
          .map(new Func1<File, Bitmap>() {//根據File對象,獲取Bitmap對象
              @Override
              public Bitmap call(File file) {
                  return getBitmapFromFile(file);
              }
          })
          .subscribeOn(Schedulers.io())//指定Observable的全部操做符的操做在io線程中執行
          .observeOn(AndroidSchedulers.mainThread())//指定消費者在主線程中執行
          .subscribe(observer);//鏈接觀察者複製代碼

有的人可能說了,你這不是代碼更多,更復雜了嗎?
不要着急,這只是最基礎的版本,稍後會對代碼進行簡化。
但即便是這種狀況下,代碼雖然多了,但咱們能夠發現,他的邏輯更清晰了,也沒有那麼多的嵌套了。

簡化代碼

  • 對於一個數組,可用建立操做符「from」來建立Observable
  • 若是咱們只對結果感興趣,不關心異常處理和事件發射完成事件,我也能夠將Observer用Consumer來替換
//建立Observable
Observable observable = Observable.from(folers);
//建立Observer
Consumer<Bitmap> consumer=new Consumer<Bitmap>() {
            @Override
            public void accept(@NonNull Bitmap bitmap) throws Exception {
                imageCollectorView.addImage(bitmap);
            }
          };
//對事件集進行處理並鏈接消費者
observable.flatMap(new Func1<File, Observable<File>>() {//分別獲取每一個文件夾下面的文件,組合成一個Observable
              @Override
              public Observable<File> call(File file) {
                  return Observable.from(file.listFiles());
              }
          })
          .filter(new Func1<File, Boolean>() {//過濾出全部擴展名爲png的文件
              @Override
              public Boolean call(File file) {
                  return file.getName().endsWith(".png");
              }
          })
          .map(new Func1<File, Bitmap>() {//根據File對象,獲取Bitmap對象
              @Override
              public Bitmap call(File file) {
                  return getBitmapFromFile(file);
              }
          })
          .subscribeOn(Schedulers.io())//指定Observable的全部操做符的操做在io線程中執行
          .observeOn(AndroidSchedulers.mainThread())//指定消費者在主線程中執行
          .subscribe(consumer);//鏈接消費者複製代碼

RxJava 鏈式調用實現

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Bitmap>() {
                @Override
                public void accept(@NonNull Bitmap bitmap) throws Exception {
                    imageCollectorView.addImage(bitmap);
                }
              });複製代碼

RxJava + lambda 實現

Observable.from(folders)
    .flatMap(file -> Observable.from(file.listFiles())
    .filter(file -> file.getName().endsWith(".png"))
    .map( file -> getBitmapFromFile(file))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> imageCollectorView.addImage(bitmap));//無異常處理,有異常會拋到主線程,不影響咱們原來程序的crash處理複製代碼

關於lambda(匿名函數,它能夠包含表達式和語句)在Android中的使用:

  • 要在 Android 的較早版本中測試 Lambda 表達式、方法引用和類型註解,請前往您的 build.gradle 文件,將 compileSdkVersion 和 targetSdkVersion 設置爲 23 或更低。您仍須要啓用 Jack 工具鏈以使用這些 Java 8 功能。
  • 經測試,按照官方提供的方案配置後,雖然可使用lambda,但編譯速度變的很慢。
  • 在運行的時候,而且我測試用的項目引用改了 Bouncy Castle(輕量級加密解密工具包) 這個包報出了內存溢出的異常,因此我感受如今還不太穩定。
  • 第三方開源的實現方案:retrolambda
  • 固然咱們也能夠不用lambda,這樣代碼看着比較多,但因其只有一層嵌套的鏈式調用,因此邏輯結構並不複雜。事實上 Android Studio 會自動幫咱們把這部分代碼摺疊成lambda的形式。

更進一步,假設咱們如今須要忽略掉前5張,一共顯示10張

Observable.from(folders)
    .flatMap(file -> Observable.from(file.listFiles())
    .filter(file -> file.getName().endsWith(".png"))

    .skip(5)
    .take(10)

    .map( file -> getBitmapFromFile(file))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> imageCollectorView.addImage(bitmap));//無異常處理,有異常會拋到主線程,不影響咱們原來程序的crash處理複製代碼

操做符簡介

建立操做

用於建立Observable的操做符

  • Create — 經過調用觀察者的方法從頭建立一個Observable

    create操做符是全部建立型操做符的「根」,也就是說其餘建立型操做符最後都是經過create操做符來建立Observable的

  • From — 將其它的對象或數據結構轉換爲Observable

  • Just — 將對象或者對象集合轉換爲一個會發射這些對象的Observable
  • Defer — 在觀察者訂閱以前不建立這個Observable,爲每個觀察者建立一個新的Observable
  • Empty/Never/Throw — 建立行爲受限的特殊Observable

    通常用於測試

  • Interval — 建立一個定時發射整數序列的Observable

  • Range — 建立發射指定範圍的整數序列的Observable
  • Repeat — 建立重複發射特定的數據或數據序列的Observable
  • Start — 建立發射一個函數的返回值的Observable
  • Timer — 建立在一個指定的延遲以後發射單個數據的Observable
變換操做

這些操做符可用於對Observable發射的數據進行變換

  • Map — 映射,經過對序列的每一項都應用一個函數變換Observable發射的數據,實質是對序列中的每一項執行一個函數,函數的參數就是這個數據項
  • Buffer — 緩存,能夠簡單的理解爲緩存,它按期從Observable收集數據到一個集合,而後把這些數據集合打包發射,而不是一次發射一個
  • FlatMap — 扁平映射,將Observable發射的數據變換爲Observables集合,而後將這些Observable發射的數據平坦化的放進一個單獨的Observable,能夠認爲是一個將嵌套的數據結構展開的過程。
  • GroupBy — 分組,將原來的Observable分拆爲Observable集合,將原始Observable發射的數據按Key分組,每個Observable發射一組不一樣的數據
  • Scan — 掃描,對Observable發射的每一項數據應用一個函數,而後按順序依次發射這些值
  • Window — 窗口,按期未來自Observable的數據分拆成一些Observable窗口,而後發射這些窗口,而不是每次發射一項。相似於Buffer,但Buffer發射的是數據,Window發射的是Observable,每個Observable發射原始Observable的數據的一個子集
過濾操做

這些操做符用於從Observable發射的數據中進行選擇,符合必定條件的發送給觀察者進行處理,不符合條件的直接丟棄

  • Filter — 過濾,過濾掉沒有經過謂詞測試的數據項,只發射經過測試的
  • Skip — 跳過前面的若干項數據
  • SkipLast — 跳事後面的若干項數據
  • Take — 只保留前面的若干項數據
  • TakeLast — 只保留後面的若干項數據
  • Debounce — 只有在空閒了一段時間後才發射數據,通俗的說,就是若是一段時間沒有操做,就執行一次操做
  • Distinct — 去重,過濾掉重複數據項
  • ElementAt — 取值,取特定位置的數據項
  • First — 首項,只發射知足條件的第一條數據
  • IgnoreElements — 忽略全部的數據,只保留/終止通知(onError或onCompleted)
  • Last — 末項,只發射最後一條數據
  • Sample — 取樣,按期發射最新的數據,等因而數據抽樣,有的實現裏叫ThrottleFirst
組合操做

組合操做符用於將多個Observable組合成一個單一的Observable

  • And/Then/When — 經過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發射的數據集
  • CombineLatest — 當兩個Observables中的任何一個發射了一個數據時,經過一個指定的函數組合每一個Observable發射的最新數據(一共兩個數據),而後發射這個函數的結果
  • Join — 不管什麼時候,若是一個Observable發射了一個數據項,只要在另外一個Observable發射的數據項定義的時間窗口內,就將兩個Observable發射的數據合併發射
  • Merge — 將兩個Observable發射的數據組合併成一個
  • StartWith — 在發射原來的Observable的數據序列以前,先發射一個指定的數據序列或數據項
  • Switch — 將一個發射Observable序列的Observable轉換爲這樣一個Observable:它逐個發射那些Observable最近發射的數據
  • Zip — 打包,使用一個指定的函數將多個Observable發射的數據組合在一塊兒,而後將這個函數的結果做爲單項數據發射
錯誤處理

這些操做符用於從錯誤通知中恢復

  • Catch — 捕獲,繼續序列操做,將錯誤替換爲正常的數據,從onError通知中恢復
  • Retry — 重試,若是Observable發射了一個錯誤通知,從新訂閱它,期待它正常終止
輔助操做

一組用於處理Observable的操做符

  • Delay — 延遲一段時間發射結果數據
  • Do — 註冊一個動做佔用一些Observable的生命週期事件,至關於Mock某個操做
  • Materialize/Dematerialize — 將發射的數據和通知都當作數據發射,或者反過來
  • ObserveOn — 指定觀察者觀察Observable的調度程序(工做線程)
  • SubscribeOn — 指定Observable應該在哪一個調度程序上執行
  • Serialize — 強制Observable按次序發射數據而且功能是有效的
  • Subscribe — 收到Observable發射的數據和通知後執行的操做
  • TimeInterval — 將一個Observable轉換爲發射兩個數據之間所耗費時間的Observable
  • Timeout — 添加超時機制,若是過了指定的一段時間沒有發射數據,就發射一個錯誤通知
  • Timestamp — 給Observable發射的每一個數據項添加一個時間戳
  • Using — 建立一個只在Observable的生命週期內存在的一次性資源
條件和布爾操做

這些操做符可用於單個或多個數據項,也可用於Observable

  • All — 判斷Observable發射的全部的數據項是否都知足某個條件
  • Amb — 給定多個Observable,只讓第一個發射數據的Observable發射所有數據
  • Contains — 判斷Observable是否會發射一個指定的數據項
  • DefaultIfEmpty — 發射來自原始Observable的數據,若是原始Observable沒有發射數據,就發射一個默認數據
  • SequenceEqual — 判斷兩個Observable是否按相同的數據序列
  • SkipUntil — 丟棄原始Observable發射的數據,直到第二個Observable發射了一個數據,而後發射原始Observable的剩餘數據
  • SkipWhile — 丟棄原始Observable發射的數據,直到一個特定的條件爲假,而後發射原始Observable剩餘的數據
  • TakeUntil — 發射來自原始Observable的數據,直到第二個Observable發射了一個數據或一個通知
  • TakeWhile — 發射原始Observable的數據,直到一個特定的條件爲真,而後跳過剩餘的數據
算術和聚合操做

這些操做符可用於整個數據序列

  • Average — 計算Observable發射的數據序列的平均值,而後發射這個結果
  • Concat — 不交錯的鏈接多個Observable的數據
  • Count — 計算Observable發射的數據個數,而後發射這個結果
  • Max — 計算併發射數據序列的最大值
  • Min — 計算併發射數據序列的最小值
  • Reduce — 按順序對數據序列的每個應用某個函數,而後返回這個值
  • Sum — 計算併發射數據序列的和
鏈接操做

一些有精確可控的訂閱行爲的特殊Observable

  • Connect — 指示一個可鏈接的Observable開始發射數據給訂閱者

    • 可鏈接的Observable (connectable Observable)與普通的Observable差很少,不過它並不會在被訂閱時開始發射數據,而是直到使用了Connect操做符時纔會開始。用這個方法,你能夠等待全部的觀察者都訂閱了Observable以後再開始發射數據。
    • RxJava中connect是ConnectableObservable接口的一個方法,使用publish操做符能夠將一個普通的Observable轉換爲一個ConnectableObservable。
    • 舉例

      ConnectableObservable<String> connectableObservable = Observable.just("a", "c", "d").publish();
          connectableObservable.subscribe(new Consumer<String>() {
              @Override
              public void accept(@NonNull String s) throws Exception {
                  LogUtil.i(s);
              }
          });
      
          LogUtil.i("subscribe end.....");
      
          Observable.timer(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
              @Override
              public void accept(@NonNull Long aLong) throws Exception {
                  LogUtil.i("connect method called after 3 seconds.");
                  connectableObservable.connect();
              }
          });複製代碼
      03-20 15:54:19.328 27493-27493/me.sunbird.react_native_demo I/x_log:RxJavaActivity.testConnectableObservable(L:586): subscribe end.....
      03-20 15:54:22.378 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$34.accept(L:591): connect method called after 3 seconds.
      03-20 15:54:22.419 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): a
      03-20 15:54:22.419 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): c
      03-20 15:54:22.420 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): d複製代碼
  • Publish — 將一個普通的Observable轉換爲可鏈接的

  • RefCount — 使一個可鏈接的Observable表現得像一個普通的Observable
  • Replay — 確保全部的觀察者收到一樣的數據序列,即便他們在Observable開始發射數據以後才訂閱
轉換操做
  • To — 將Observable轉換爲其它的對象或數據結構
  • Blocking 阻塞Observable的操做符
操做符決策樹

幾種主要的需求

  • 直接建立一個Observable(建立操做)
  • 組合多個Observable(組合操做)
  • 對Observable發射的數據執行變換操做(變換操做)
  • 從Observable發射的數據中取特定的值(過濾操做)
  • 轉發Observable的部分值(條件/布爾/過濾操做)
  • 對Observable發射的數據序列求值(算術/聚合操做)

Scheduler(調度器)中文文檔

本質上RxJava就是一個作異步開發的框架,能使咱們極其靈活的進行線程切換。
咱們可使用ObserveOn和SubscribeOn操做符,可讓Observable在一個特定的調度器上執行,ObserveOn指示一個Observable在一個特定的調度器上調用觀察者的onNext, onError和onCompleted方法,SubscribeOn更進一步,它指示Observable將所有的處理過程(包括髮射數據和通知)放在特定的調度器上執行。
subscribeOn 和 observeOn 兩個操做符是極其容易混淆的,能夠看下這篇博客來完全分清楚這兩個操做符SubscribeOn 和 ObserveOn

調度器類型 效果
Schedulers.computation( ) 用於計算任務,如事件循環或和回調處理,不要用於IO操做(IO操做請使用Schedulers.io());默認線程數等於處理器的數量
Schedulers.from(executor) 使用指定的Executor做爲調度器
Schedulers.immediate( ) 在當前線程當即開始執行任務
Schedulers.io( ) 用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用Schedulers.computation();Schedulers.io( )默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器
Schedulers.newThread( ) 爲每一個任務建立一個新線程
Schedulers.trampoline( ) 當其它排隊的任務完成後,在當前線程排隊開始執行
Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                LogUtil.w("subscribe method is running in thread:" + Thread.currentThread().getName());
                observableEmitter.onNext("a");
                observableEmitter.onComplete();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                LogUtil.w("first map is running in thread:" + Thread.currentThread().getName());
                return s;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LogUtil.w("second map is running in thread:" + Thread.currentThread().getName());
                        return s;
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LogUtil.w("third map is running in thread:" + Thread.currentThread().getName());
                        return s;
                    }
                })
                .observeOn(Schedulers.computation())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtil.w("fourth map is running in thread:" + Thread.currentThread().getName());
                        return s;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        LogUtil.w("consumer accept method is running in thread:" + Thread.currentThread().getName());
                    }
                  });複製代碼

運行後咱們能夠獲得以下結果:

03-20 11:44:23.716 8687-8723/? W/x_log:RxJavaActivity$27.subscribe(L:528): subscribe method is running in thread:RxCachedThreadScheduler-1
03-20 11:44:23.717 8687-8723/? W/x_log:RxJavaActivity$28.apply(L:535): first map is running in thread:RxCachedThreadScheduler-1
03-20 11:44:23.721 8687-8724/? W/x_log:RxJavaActivity$29.apply(L:543): second map is running in thread:RxNewThreadScheduler-1
03-20 11:44:23.726 8687-8725/? W/x_log:RxJavaActivity$30.apply(L:551): third map is running in thread:RxCachedThreadScheduler-2
03-20 11:44:23.729 8687-8726/? W/x_log:RxJavaActivity$31.apply(L:559): fourth map is running in thread:RxComputationThreadPool-1
03-20 11:44:23.836 8687-8687/? W/x_log:RxJavaActivity$32.accept(L:567): consumer accept method is running in thread:main複製代碼

RxJava 的使用場景舉例

複雜的數據變換
Observable.just("1", "2", "2", "3", "4", "5")//建立Observable
    .map(Integer::parseInt)//對每一項執行Integer.parseInt方法
    .filter(s -> s > 1)//過濾出全部值 >1 的對象
    .distinct() //去重,這裏也能夠傳遞一個方法,來定義兩個對象是否equals的策略,很是靈活
    .take(3)//取到前3個
    .reduce((sum, item) -> sum + item) //累加
    .subscribe(System.out::println);//9 打印出最終累加的結果。複製代碼
Retrofit結合RxJava作網絡請求框架

這裏不做詳解,具體的介紹能夠看扔物線的這篇文章,對RxJava的入門者有很大的啓發。其中也講到了RxJava和Retrofit如何結合來實現更簡潔的代碼

RxJava代替EventBus進行數據傳遞
RxBus

RxBus並非一個庫,而是一種模式,是使用了RxJava的思想來達到EventBus的數據傳遞效果。這篇文章把RxBus講的比較詳細。

square/Otto 對 RxBus 的態度

This project is deprecated in favor of RxJava and RxAndroid. These projects permit the same event-driven programming model as Otto, but they’re more capable and offer better control of threading.

爲了支持 RxJava 和 RxAndroid,咱們已經廢棄了這個項目。這兩個項目提供了和 Otto 同樣的基於事件驅動的編程模型,並且他們更強大,並提供更好的線程控制。

If you’re looking for guidance on migrating from Otto to Rx, this post is a good start.

若是你正在尋找從 Otto 遷移到 Rx 的教程,閱讀這篇文章將會是一個很好的開始。

一個網絡請求依賴另一個網絡請求返回的結果。例如:登陸以後,根據拿到的token去獲取消息列表。
@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> call(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });複製代碼
同一個頁面同事發起兩個以上的請求,此時對頁面中的ProgressBar進行管理,即兩個請求只能展現一個ProgressBar,而且全部的請求都結束後,ProgressBar 才能消失
@GET("/date1")
public Observable<String> getDate1();

@GET("/data2")
public Observable<String> getData2() ... progressDialog.show() Observable.merge(getData1(), getData2()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onNext(String s) {
                      // 這裏進行結果處理
                    }

                    @Override
                    public void onError(Throwable throwable) {
                      //error handling
                      progressDialog.dismiss();
                    }

                    @Override
                    public void onComplete() {
                      progressDialog.dismiss();
                    }
                  });複製代碼
使用throttleFirst(throttle:節流閥)防止按鈕重複點擊
RxView.clicks(button)
    .throttleFirst(1, TimeUnit.SECONDS)
    .subscribe(new Observer<Object>() {
        @Override
        public void onCompleted() {
              log.d ("completed");
        }

        @Override
        public void onError(Throwable e) {
              log.e("error");
        }

        @Override
        public void onNext(Object o) {
             log.d("button clicked");
        }
    });複製代碼
使用debounce(去抖動)作textSearch

用簡單的話講就是當N個結點發生的時間太靠近(即發生的時間差小於設定的值T),debounce就會自動過濾掉前N-1個結點。
好比在作百度地址聯想的時候,可使用debounce減小頻繁的網絡請求。避免每輸入(刪除)一個字就作一次聯想

RxTextView.textChangeEvents(inputEditText)
      .debounce(400, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted() {
        log.d("onComplete");
    }

    @Override
    public void onError(Throwable e) {
        log.d("Error");
    }

    @Override
    public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
        log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});複製代碼

RxJava 的生態

  • rx-preferences -使SharedPreferences支持RxJava

  • RxAndroid -RxJava的Android拓展

  • RxLifecycle -幫助使用了RxJava的安卓應用控制生命週期

  • RxBinding -安卓UI控件的RxJava綁定API

  • Android-ReactiveLocation -Google Play Service API wrapped in RxJava

  • storio -支持RxJava的數據庫

  • retrofit -支持RxJava的網絡請求庫

  • sqlbrite -支持RxJava的sqlite數據庫

  • RxPermissions -RxJava實現的Android運行時權限控制

  • reark -RxJava architecture library for Android

  • frodo -Android Library for Logging RxJava Observables and Subscribers.

RxJava 的現況

  • RxJava 最新版 2.0.7(注:不兼容1.x 的版本) 大小 2.1M
  • RxAndroid 大小 10k
  • 支持Java6以上,Android2.3以上
  • github star 22511

RxJava 的將來展望

通用的數據流,強大的操做符,靈活的線程調度,簡單完善的異常處理機制,函數式編程等等特性,奠基了RxJava的強大地位。
Android 系統中處處都是異步,好比網絡請求,文件讀寫,數據庫查詢,系統服務或者第三方SDK服務等,這些異步請求很是耗時,須要在非UI線程中執行,而對UI的修改又必需要在主線程中執行。若是再包含多個異步執行嵌套的話,就會讓咱們的代碼顯得凌亂。經過RxJava提供的強大而通用的異步處理機制,可使咱們的代碼邏輯更清晰,便於後期的維護。而且如今RxJava的生態愈來愈大,我的認爲,之後全部的涉及異步操做的系統服務,第三方庫,第三方服務SDK都會以Observable或類Observable的方式提供給咱們調用,而不是像如今這樣,讓咱們傳遞一個又一個的listener。

參考資料

分享工具

zeplin(軟件演示)
  • 方便的效果圖管理
  • 新文件,修改文件提示
  • 自動測量
  • 標註評論
  • 支持pohtoshop和sketch
  • 第一個項目免費
charles(軟件演示)
  • 簡單已用,功能強大
  • focus 某一個域名下的請求,方便查找
  • 對某個請求修改參數,從新請求,方便調試
  • 各類格式良好的response解析
  • copy出cURL 格式的請求,方便傳遞個任何人進行請求模擬
相關文章
相關標籤/搜索