我相信你們確定對ReactiveX 和 RxJava 都不陌生,由於如今只要是和技術相關的網站,博客都會隨處見到介紹ReactiveX和RxJava的文章。html
總結起來能夠用兩個詞來歸納:異步和簡潔java
Observables 負責發出一系列的事件,這裏的事件能夠是任何東西,例如網絡請求的結果,複雜計算處理的結果,數據庫操做的結構,文件操做的結果等,事件執行結束後交給Observer的回調處理。react
進行訂閱接受處理事件android
負責對事件進行各類變化和處理git
提供了各類調度器,是RxJava能夠方便的實現異步開發github
這裏的事件值指的是 onNext (有新數據),onComplete (全部數據處理完成),onError (事件隊列異常)sql
假如如今咱們有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView ,它的做用是顯示多張圖片,並能使用 addImage(Bitmap) 方法來任意增長顯示的圖片。如今須要程序將一個給出的目錄數組 File[] folders 中每一個目錄下的 png 圖片都加載出來並顯示在 imageCollectorView 中。須要注意的是,因爲讀取圖片的這一過程較爲耗時,須要放在後臺執行,而圖片的顯示則必須在 UI 線程執行。shell
目錄數組數據庫
File[] folders=new File[]{......};複製代碼
線程方式實現(call hell)編程
new Thread() {
@Override
public void run() {
super.run();
try{
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}catch(Exception e){
//error handling
//只能在這裏進行異常處理
}
}
}.start();複製代碼
RxJava 實現
//建立Observable
Observable observable = Observable.create(new ObservableOnSubscribe<File>() {
@Override
public void subscribe(ObservableEmitter<File> e) throws Exception {
for (File file : files) {
e.onNext(file);
}
e.onComplete();
}
});
//建立Observer
Observer<Bitmap> observer = new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
@Override
public void onError(Throwable throwable) {
//error handling
}
@Override
public void onComplete() {
Log.i(TAG,"All images are shown");
}
};
//對事件集進行處理並鏈接消費者
observable.flatMap(new Func1<File, Observable<File>>() {//分別獲取每一個文件夾下面的文件,組合成一個Observable
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {//過濾出全部擴展名爲png的文件
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {//根據File對象,獲取Bitmap對象
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())//指定Observable的全部操做符的操做在io線程中執行
.observeOn(AndroidSchedulers.mainThread())//指定消費者在主線程中執行
.subscribe(observer);//鏈接觀察者複製代碼
有的人可能說了,你這不是代碼更多,更復雜了嗎?
不要着急,這只是最基礎的版本,稍後會對代碼進行簡化。
但即便是這種狀況下,代碼雖然多了,但咱們能夠發現,他的邏輯更清晰了,也沒有那麼多的嵌套了。
簡化代碼
//建立Observable
Observable observable = Observable.from(folers);
//建立Observer
Consumer<Bitmap> consumer=new Consumer<Bitmap>() {
@Override
public void accept(@NonNull Bitmap bitmap) throws Exception {
imageCollectorView.addImage(bitmap);
}
};
//對事件集進行處理並鏈接消費者
observable.flatMap(new Func1<File, Observable<File>>() {//分別獲取每一個文件夾下面的文件,組合成一個Observable
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {//過濾出全部擴展名爲png的文件
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {//根據File對象,獲取Bitmap對象
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())//指定Observable的全部操做符的操做在io線程中執行
.observeOn(AndroidSchedulers.mainThread())//指定消費者在主線程中執行
.subscribe(consumer);//鏈接消費者複製代碼
RxJava 鏈式調用實現
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(@NonNull Bitmap bitmap) throws Exception {
imageCollectorView.addImage(bitmap);
}
});複製代碼
RxJava + lambda 實現
Observable.from(folders)
.flatMap(file -> Observable.from(file.listFiles())
.filter(file -> file.getName().endsWith(".png"))
.map( file -> getBitmapFromFile(file))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> imageCollectorView.addImage(bitmap));//無異常處理,有異常會拋到主線程,不影響咱們原來程序的crash處理複製代碼
關於lambda(匿名函數,它能夠包含表達式和語句)在Android中的使用:
- 要在 Android 的較早版本中測試 Lambda 表達式、方法引用和類型註解,請前往您的 build.gradle 文件,將 compileSdkVersion 和 targetSdkVersion 設置爲 23 或更低。您仍須要啓用 Jack 工具鏈以使用這些 Java 8 功能。
- 經測試,按照官方提供的方案配置後,雖然可使用lambda,但編譯速度變的很慢。
- 在運行的時候,而且我測試用的項目引用改了 Bouncy Castle(輕量級加密解密工具包) 這個包報出了內存溢出的異常,因此我感受如今還不太穩定。
- 第三方開源的實現方案:retrolambda
- 固然咱們也能夠不用lambda,這樣代碼看着比較多,但因其只有一層嵌套的鏈式調用,因此邏輯結構並不複雜。事實上 Android Studio 會自動幫咱們把這部分代碼摺疊成lambda的形式。
更進一步,假設咱們如今須要忽略掉前5張,一共顯示10張
Observable.from(folders)
.flatMap(file -> Observable.from(file.listFiles())
.filter(file -> file.getName().endsWith(".png"))
.skip(5)
.take(10)
.map( file -> getBitmapFromFile(file))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> imageCollectorView.addImage(bitmap));//無異常處理,有異常會拋到主線程,不影響咱們原來程序的crash處理複製代碼
用於建立Observable的操做符
Create
— 經過調用觀察者的方法從頭建立一個Observable
create操做符是全部建立型操做符的「根」,也就是說其餘建立型操做符最後都是經過create操做符來建立Observable的
From
— 將其它的對象或數據結構轉換爲Observable
Just
— 將對象或者對象集合轉換爲一個會發射這些對象的ObservableDefer
— 在觀察者訂閱以前不建立這個Observable,爲每個觀察者建立一個新的ObservableEmpty/Never/Throw
— 建立行爲受限的特殊Observable
通常用於測試
Interval
— 建立一個定時發射整數序列的Observable
Range
— 建立發射指定範圍的整數序列的ObservableRepeat
— 建立重複發射特定的數據或數據序列的ObservableStart
— 建立發射一個函數的返回值的ObservableTimer
— 建立在一個指定的延遲以後發射單個數據的Observable這些操做符可用於對Observable發射的數據進行變換
Map
— 映射,經過對序列的每一項都應用一個函數變換Observable發射的數據,實質是對序列中的每一項執行一個函數,函數的參數就是這個數據項Buffer
— 緩存,能夠簡單的理解爲緩存,它按期從Observable收集數據到一個集合,而後把這些數據集合打包發射,而不是一次發射一個FlatMap
— 扁平映射,將Observable發射的數據變換爲Observables集合,而後將這些Observable發射的數據平坦化的放進一個單獨的Observable,能夠認爲是一個將嵌套的數據結構展開的過程。GroupBy
— 分組,將原來的Observable分拆爲Observable集合,將原始Observable發射的數據按Key分組,每個Observable發射一組不一樣的數據Scan
— 掃描,對Observable發射的每一項數據應用一個函數,而後按順序依次發射這些值Window
— 窗口,按期未來自Observable的數據分拆成一些Observable窗口,而後發射這些窗口,而不是每次發射一項。相似於Buffer,但Buffer發射的是數據,Window發射的是Observable,每個Observable發射原始Observable的數據的一個子集這些操做符用於從Observable發射的數據中進行選擇,符合必定條件的發送給觀察者進行處理,不符合條件的直接丟棄
Filter
— 過濾,過濾掉沒有經過謂詞測試的數據項,只發射經過測試的Skip
— 跳過前面的若干項數據SkipLast
— 跳事後面的若干項數據Take
— 只保留前面的若干項數據TakeLast
— 只保留後面的若干項數據Debounce
— 只有在空閒了一段時間後才發射數據,通俗的說,就是若是一段時間沒有操做,就執行一次操做Distinct
— 去重,過濾掉重複數據項ElementAt
— 取值,取特定位置的數據項First
— 首項,只發射知足條件的第一條數據IgnoreElements
— 忽略全部的數據,只保留/終止通知(onError或onCompleted)Last
— 末項,只發射最後一條數據Sample
— 取樣,按期發射最新的數據,等因而數據抽樣,有的實現裏叫ThrottleFirst組合操做符用於將多個Observable組合成一個單一的Observable
And/Then/When
— 經過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發射的數據集CombineLatest
— 當兩個Observables中的任何一個發射了一個數據時,經過一個指定的函數組合每一個Observable發射的最新數據(一共兩個數據),而後發射這個函數的結果Join
— 不管什麼時候,若是一個Observable發射了一個數據項,只要在另外一個Observable發射的數據項定義的時間窗口內,就將兩個Observable發射的數據合併發射Merge
— 將兩個Observable發射的數據組合併成一個StartWith
— 在發射原來的Observable的數據序列以前,先發射一個指定的數據序列或數據項Switch
— 將一個發射Observable序列的Observable轉換爲這樣一個Observable:它逐個發射那些Observable最近發射的數據Zip
— 打包,使用一個指定的函數將多個Observable發射的數據組合在一塊兒,而後將這個函數的結果做爲單項數據發射這些操做符用於從錯誤通知中恢復
一組用於處理Observable的操做符
Delay
— 延遲一段時間發射結果數據Do
— 註冊一個動做佔用一些Observable的生命週期事件,至關於Mock某個操做Materialize/Dematerialize
— 將發射的數據和通知都當作數據發射,或者反過來ObserveOn
— 指定觀察者觀察Observable的調度程序(工做線程)SubscribeOn
— 指定Observable應該在哪一個調度程序上執行Serialize
— 強制Observable按次序發射數據而且功能是有效的Subscribe
— 收到Observable發射的數據和通知後執行的操做TimeInterval
— 將一個Observable轉換爲發射兩個數據之間所耗費時間的ObservableTimeout
— 添加超時機制,若是過了指定的一段時間沒有發射數據,就發射一個錯誤通知Timestamp
— 給Observable發射的每一個數據項添加一個時間戳Using
— 建立一個只在Observable的生命週期內存在的一次性資源這些操做符可用於單個或多個數據項,也可用於Observable
All
— 判斷Observable發射的全部的數據項是否都知足某個條件Amb
— 給定多個Observable,只讓第一個發射數據的Observable發射所有數據Contains
— 判斷Observable是否會發射一個指定的數據項DefaultIfEmpty
— 發射來自原始Observable的數據,若是原始Observable沒有發射數據,就發射一個默認數據SequenceEqual
— 判斷兩個Observable是否按相同的數據序列SkipUntil
— 丟棄原始Observable發射的數據,直到第二個Observable發射了一個數據,而後發射原始Observable的剩餘數據SkipWhile
— 丟棄原始Observable發射的數據,直到一個特定的條件爲假,而後發射原始Observable剩餘的數據TakeUntil
— 發射來自原始Observable的數據,直到第二個Observable發射了一個數據或一個通知TakeWhile
— 發射原始Observable的數據,直到一個特定的條件爲真,而後跳過剩餘的數據這些操做符可用於整個數據序列
Average
— 計算Observable發射的數據序列的平均值,而後發射這個結果Concat
— 不交錯的鏈接多個Observable的數據Count
— 計算Observable發射的數據個數,而後發射這個結果Max
— 計算併發射數據序列的最大值Min
— 計算併發射數據序列的最小值Reduce
— 按順序對數據序列的每個應用某個函數,而後返回這個值Sum
— 計算併發射數據序列的和一些有精確可控的訂閱行爲的特殊Observable
Connect
— 指示一個可鏈接的Observable開始發射數據給訂閱者
舉例
ConnectableObservable<String> connectableObservable = Observable.just("a", "c", "d").publish();
connectableObservable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
LogUtil.i(s);
}
});
LogUtil.i("subscribe end.....");
Observable.timer(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
LogUtil.i("connect method called after 3 seconds.");
connectableObservable.connect();
}
});複製代碼
03-20 15:54:19.328 27493-27493/me.sunbird.react_native_demo I/x_log:RxJavaActivity.testConnectableObservable(L:586): subscribe end.....
03-20 15:54:22.378 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$34.accept(L:591): connect method called after 3 seconds.
03-20 15:54:22.419 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): a
03-20 15:54:22.419 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): c
03-20 15:54:22.420 27493-27573/me.sunbird.react_native_demo I/x_log:RxJavaActivity$33.accept(L:582): d複製代碼
Publish
— 將一個普通的Observable轉換爲可鏈接的
RefCount
— 使一個可鏈接的Observable表現得像一個普通的ObservableReplay
— 確保全部的觀察者收到一樣的數據序列,即便他們在Observable開始發射數據以後才訂閱幾種主要的需求
本質上RxJava就是一個作異步開發的框架,能使咱們極其靈活的進行線程切換。
咱們可使用ObserveOn和SubscribeOn操做符,可讓Observable在一個特定的調度器上執行,ObserveOn指示一個Observable在一個特定的調度器上調用觀察者的onNext, onError和onCompleted方法,SubscribeOn更進一步,它指示Observable將所有的處理過程(包括髮射數據和通知)放在特定的調度器上執行。
subscribeOn 和 observeOn 兩個操做符是極其容易混淆的,能夠看下這篇博客來完全分清楚這兩個操做符SubscribeOn 和 ObserveOn
調度器類型 | 效果 |
---|---|
Schedulers.computation( ) | 用於計算任務,如事件循環或和回調處理,不要用於IO操做(IO操做請使用Schedulers.io());默認線程數等於處理器的數量 |
Schedulers.from(executor) | 使用指定的Executor做爲調度器 |
Schedulers.immediate( ) | 在當前線程當即開始執行任務 |
Schedulers.io( ) | 用於IO密集型任務,如異步阻塞IO操做,這個調度器的線程池會根據須要增加;對於普通的計算任務,請使用Schedulers.computation();Schedulers.io( )默認是一個CachedThreadScheduler,很像一個有線程緩存的新線程調度器 |
Schedulers.newThread( ) | 爲每一個任務建立一個新線程 |
Schedulers.trampoline( ) | 當其它排隊的任務完成後,在當前線程排隊開始執行 |
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
LogUtil.w("subscribe method is running in thread:" + Thread.currentThread().getName());
observableEmitter.onNext("a");
observableEmitter.onComplete();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtil.w("first map is running in thread:" + Thread.currentThread().getName());
return s;
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
LogUtil.w("second map is running in thread:" + Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
LogUtil.w("third map is running in thread:" + Thread.currentThread().getName());
return s;
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtil.w("fourth map is running in thread:" + Thread.currentThread().getName());
return s;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtil.w("consumer accept method is running in thread:" + Thread.currentThread().getName());
}
});複製代碼
運行後咱們能夠獲得以下結果:
03-20 11:44:23.716 8687-8723/? W/x_log:RxJavaActivity$27.subscribe(L:528): subscribe method is running in thread:RxCachedThreadScheduler-1
03-20 11:44:23.717 8687-8723/? W/x_log:RxJavaActivity$28.apply(L:535): first map is running in thread:RxCachedThreadScheduler-1
03-20 11:44:23.721 8687-8724/? W/x_log:RxJavaActivity$29.apply(L:543): second map is running in thread:RxNewThreadScheduler-1
03-20 11:44:23.726 8687-8725/? W/x_log:RxJavaActivity$30.apply(L:551): third map is running in thread:RxCachedThreadScheduler-2
03-20 11:44:23.729 8687-8726/? W/x_log:RxJavaActivity$31.apply(L:559): fourth map is running in thread:RxComputationThreadPool-1
03-20 11:44:23.836 8687-8687/? W/x_log:RxJavaActivity$32.accept(L:567): consumer accept method is running in thread:main複製代碼
Observable.just("1", "2", "2", "3", "4", "5")//建立Observable
.map(Integer::parseInt)//對每一項執行Integer.parseInt方法
.filter(s -> s > 1)//過濾出全部值 >1 的對象
.distinct() //去重,這裏也能夠傳遞一個方法,來定義兩個對象是否equals的策略,很是靈活
.take(3)//取到前3個
.reduce((sum, item) -> sum + item) //累加
.subscribe(System.out::println);//9 打印出最終累加的結果。複製代碼
這裏不做詳解,具體的介紹能夠看扔物線的這篇文章,對RxJava的入門者有很大的啓發。其中也講到了RxJava和Retrofit如何結合來實現更簡潔的代碼
RxBus並非一個庫,而是一種模式,是使用了RxJava的思想來達到EventBus的數據傳遞效果。這篇文章把RxBus講的比較詳細。
This project is deprecated in favor of RxJava and RxAndroid. These projects permit the same event-driven programming model as Otto, but they’re more capable and offer better control of threading.
爲了支持 RxJava 和 RxAndroid,咱們已經廢棄了這個項目。這兩個項目提供了和 Otto 同樣的基於事件驅動的編程模型,並且他們更強大,並提供更好的線程控制。
If you’re looking for guidance on migrating from Otto to Rx, this post is a good start.
若是你正在尋找從 Otto 遷移到 Rx 的教程,閱讀這篇文章將會是一個很好的開始。
@GET("/token")
public Observable<String> getToken();
@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);
...
getToken()
.flatMap(new Func1<String, Observable<User>>() {
@Override
public Observable<User> call(String token) {
return getUser(token, userId);
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onNext(User user) {
userView.setUser(user);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable error) {
// Error handling
...
}
});複製代碼
@GET("/date1")
public Observable<String> getDate1();
@GET("/data2")
public Observable<String> getData2() ... progressDialog.show() Observable.merge(getData1(), getData2()) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String s) {
// 這裏進行結果處理
}
@Override
public void onError(Throwable throwable) {
//error handling
progressDialog.dismiss();
}
@Override
public void onComplete() {
progressDialog.dismiss();
}
});複製代碼
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
log.d ("completed");
}
@Override
public void onError(Throwable e) {
log.e("error");
}
@Override
public void onNext(Object o) {
log.d("button clicked");
}
});複製代碼
用簡單的話講就是當N個結點發生的時間太靠近(即發生的時間差小於設定的值T),debounce就會自動過濾掉前N-1個結點。
好比在作百度地址聯想的時候,可使用debounce減小頻繁的網絡請求。避免每輸入(刪除)一個字就作一次聯想
RxTextView.textChangeEvents(inputEditText)
.debounce(400, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onCompleted() {
log.d("onComplete");
}
@Override
public void onError(Throwable e) {
log.d("Error");
}
@Override
public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
}
});複製代碼
rx-preferences -使SharedPreferences支持RxJava
RxAndroid -RxJava的Android拓展
RxLifecycle -幫助使用了RxJava的安卓應用控制生命週期
RxBinding -安卓UI控件的RxJava綁定API
Android-ReactiveLocation -Google Play Service API wrapped in RxJava
storio -支持RxJava的數據庫
retrofit -支持RxJava的網絡請求庫
sqlbrite -支持RxJava的sqlite數據庫
RxPermissions -RxJava實現的Android運行時權限控制
reark -RxJava architecture library for Android
frodo -Android Library for Logging RxJava Observables and Subscribers.
通用的數據流,強大的操做符,靈活的線程調度,簡單完善的異常處理機制,函數式編程等等特性,奠基了RxJava的強大地位。
Android 系統中處處都是異步,好比網絡請求,文件讀寫,數據庫查詢,系統服務或者第三方SDK服務等,這些異步請求很是耗時,須要在非UI線程中執行,而對UI的修改又必需要在主線程中執行。若是再包含多個異步執行嵌套的話,就會讓咱們的代碼顯得凌亂。經過RxJava提供的強大而通用的異步處理機制,可使咱們的代碼邏輯更清晰,便於後期的維護。而且如今RxJava的生態愈來愈大,我的認爲,之後全部的涉及異步操做的系統服務,第三方庫,第三方服務SDK都會以Observable或類Observable的方式提供給咱們調用,而不是像如今這樣,讓咱們傳遞一個又一個的listener。