RxJava
是一款基於 Java VM
實現的響應式編程擴展庫 - 基於觀察者模式的異步和事件處理框架。RxJava
官方目前同時維護了兩個版本,分別是 1.x
和 2.x
,區別是它們使用不一樣的 group id
和 namespaces
。java
版本 | group id | namespaces |
---|---|---|
v1.x | io.reactivex | io.reactivex |
v2.x | io.reactivex.rxjava2 | rx |
本系列的文章將針對 RxJava 1.x
進行介紹,先給出 Github
的地址:react
經過 Gradle 引入相關依賴:android
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
複製代碼
一個精準的解釋以下:RxJava
是一個運行於 Java VM
,由可觀測序列組成的,異步、基於事件的函數庫。git
換句話說,『一樣是作異步,爲何人們用它,而不用現成的 AsyncTask
/ Handler
/ XXX
/ ... ?』github
一個詞:簡潔。編程
異步操做很關鍵的一點是程序的簡潔性,由於在調度過程比較複雜的狀況下,異步代碼常常會既難寫也難被讀懂。 Android
創造的 AsyncTask 和Handler
,其實都是爲了讓異步代碼更加簡潔。RxJava 的優點也是簡潔,但它的簡潔的不同凡響之處在於,隨着程序邏輯變得愈來愈複雜,它依然可以保持簡潔。後端
在 Android
開發中,假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView
,它的做用是顯示多張圖片,並能使用 addImage(Bitmap) 方法來任意增長顯示的圖片。如今須要程序將一個給出的目錄數組 File[] folders
中每一個目錄下的 png
圖片都加載出來並顯示在 imageCollectorView
中。數組
注意: 因爲讀取圖片的過程較爲耗時,須要放在後臺執行,而圖片的顯示則必須在 UI 線程執行。緩存
經常使用的實現方式有多種,這裏給出其中一種:多線程
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
複製代碼
而若是使用 RxJava
,實現方式是這樣的:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
複製代碼
能夠發現,使用 RxJava 方式代碼量明顯大大增長,所謂簡潔從何而來?
這裏說的簡潔是指的邏輯上的。觀察一下你會發現,RxJava
的這個實現,是一條從上到下的鏈式調用,沒有任何嵌套,這在邏輯的簡潔性上是具備優點的。當需求變得複雜時,這種優點將更加明顯(試想若是還要求只選取前 10 張圖片,常規方式要怎麼辦?若是有更多這樣那樣的要求呢?再試想,在這一大堆需求實現完兩個月以後須要改功能,當你翻回這裏看到本身當初寫下的那一片迷之縮進,你能保證本身將迅速看懂,而不是對着代碼從新捋一遍思路?)。
另外,若是你的 IDE
是 Android Studio
,其實每次打開某個 Java
文件的時候,你會看到被自動 Lambda
化的預覽,這將讓你更加清晰地看到程序邏輯:
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
複製代碼
因此,RxJava
有啥優勢?就好在簡潔,優勢就是把複雜邏輯,經過函數式編程模型穿成一條線。
RxJava
的異步實現,是經過一種擴展的觀察者模式來實現的。
觀察者模式面向的需求是:A
對象(觀察者)對 B
對象(被觀察者)的某種變化高度敏感,須要在 B
變化的一瞬間作出反應。
舉個例子,新聞裏喜聞樂見的警察抓小偷,警察須要在小偷伸手做案的時候實施抓捕。在這個例子裏,警察是觀察者,小偷是被觀察者,警察須要時刻盯着小偷的一舉一動,才能保證不會漏過任何瞬間。
程序的觀察者模式略有不一樣,觀察者不須要時刻盯着被觀察者(例如 A
不須要每過 2ms
就檢查一次 B
的狀態),而是採用註冊( Register
)或者稱爲訂閱(Subscribe
)的方式,告訴被觀察者:我須要你的某種狀態,你要在它變化的時候通知我。
採起這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也可以獲得最高的反饋速度。
Android
開發中一個典型的例子是點擊監聽器 OnClickListener
。對設置 OnClickListener
來講,View
是被觀察者,OnClickListener
是觀察者,兩者經過 setOnClickListener()
方法達成訂閱關係。訂閱以後用戶點擊按鈕的瞬間,Android Framework
就會將點擊事件發送給已註冊的 OnClickListener 。
OnClickListener
的觀察者模式大體以下圖:
如圖所示,經過 setOnClickListener()
方法,Button
持有 OnClickListener
的引用(這一過程沒有在圖上畫出)。當用戶點擊時,Button
自動調用 OnClickListener
的 onClick()
方法。
按照觀察者模式抽象出來的各個概念:
就由專用的觀察者模式轉變成了通用的觀察者模式,以下圖:
RxJava
有四個基本概念:
Observable
和 Observer
經過 subscribe()
方法實現訂閱關係,使得Observable
能夠在須要的時候發出事件來通知 Observer
。
與傳統觀察者模式不一樣,RxJava
的事件回調方法除了普通事件 onNext()
(至關於 onClick()
) 以外,還定義了兩個特殊的事件:onCompleted()
和 onError()
。
RxJava
不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava
規定,當不會再有新的 onNext()
發出時,須要觸發 onCompleted()
方法做爲事件完成標誌。
在事件處理過程當中出異常時,onError()
會被觸發,同時隊列自動終止,不容許再有事件發出。
在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個被調用,而且是事件序列中的最後一個執行。
RxJava
的觀察者模式大體以下圖:
基於以上的概念,RxJava
的基本使用有 3 個步驟:
Observer
即觀察者,它決定事件觸發的時候將有怎樣的行爲。 RxJava
中的 Observer
接口的聲明方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error: " + e.getMessage());
}
};
複製代碼
除了 Observer
接口以外,RxJava
還內置了一個實現了 Observer
的抽象類:Subscriber
。 Subscriber
對 Observer
接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error: " + e.getMessage());
}
};
複製代碼
實質上,在 RxJava
的 subscribe
過程當中,Observer
也老是會先被轉換成一個 Subscriber
再使用。因此若是你只想使用基本功能,選擇 Observer
和 Subscriber
是徹底同樣的。它們的區別對於使用者來講主要有兩點:
這是 Subscriber
增長的方法。它會在 subscribe
剛開始,而事件還未發送以前被調用。能夠用於作一些準備工做,例如數據的清零或重置。這是一個可選方法,默認狀況下它的實現爲空。
須要注意的是,若是對準備工做的線程有要求(例如: 彈出一個顯示進度的對話框,這必須在主線程執行),onStart()
就不適用了。由於它老是在 subscribe
所發生的線程被調用,而不能指定線程。要在指定的線程來作準備工做,可使用 doOnSubscribe()
方法,具體能夠在後面的章節中看到。
這是 Subscriber
所實現的另外一個接口 Subscription
的方法,用於取消訂閱。在這個方法被調用後,Subscriber
將再也不接收事件。通常在這個方法調用前,可使用 isUnsubscribed()
先判斷一下狀態。
unsubscribe()
這個方法很重要,由於在 subscribe()
以後, Observable
會持有 Subscriber 的引用。這個引用若是不能及時被釋放,將有內存泄露的風險。
注意:在再也不使用的時候儘快在合適的地方(例如: onPause()
和 onStop()
等方法中)調用 unsubscribe()
來解除引用關係,以免內存泄露的發生。
Observable
即被觀察者,它決定何時觸發事件以及觸發怎樣的事件。 RxJava
使用 create()
方法來建立一個 Observable
,併爲它定義事件觸發規則。示例以下:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
複製代碼
能夠看到,這裏傳入了一個 OnSubscribe
對象做爲參數。OnSubscribe
會被存儲在返回的 Observable
對象中。
它的做用至關於一個計劃表,當 Observable
被訂閱的時候,OnSubscribe
的 call()
方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber
將會被調用三次 onNext()
和一次 onCompleted()
)。
這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
create()
方法是 RxJava
最基本的建立事件序列的方法。基於這個方法,RxJava
還提供了一些方法用於快捷建立事件隊列,例如 just()
方法:
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted()
複製代碼
將傳入的數組或 Iterable
拆分紅具體對象後,依次發送給觀察者,示例以下:
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted()
複製代碼
建立了 Observable
和 Observer
以後,再用 subscribe()
方法將它們關聯起來,整條鏈子就能夠工做了。代碼很簡單:
observable.subscribe(observer);
// 或者
observable.subscribe(subscriber);
複製代碼
可能會注意到,subscribe() 這個方法有點怪:它看起來是『observable 訂閱了 observer / subscriber』,而不是『observer / subscriber 訂閱了 observable』。這看起來就像『雜誌訂閱了讀者』同樣顛倒了對象關係。
這讓人讀起來有點彆扭,不過若是把 API 設計成 『observer.subscribe(observable) / subscriber.subscribe(observable)』,雖然更加符合思惟邏輯,但對流式 API 的設計就形成影響了,比較起來明顯是得不償失的。
Observable.subscribe(Subscriber)
的內部實現是這樣的(核心代碼):
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
複製代碼
能夠看到subscriber()
作了3件事:
(a). 調用Subscriber.onStart()
這個方法在前面已經介紹過,是一個可選的準備方法。
(b). 調用Observable中的OnSubscribe.call(Subscriber)
事件發送的邏輯開始運行。從這也能夠看出,在RxJava中,Observable並非在建立的時候就當即開始發送事件,而是在它被訂閱的時候,即當subscribe()方法執行的時候。
(c). 返回Subscription
將傳入的Subscriber做爲Subscription返回。這是爲了方便後面的unsubscribe()。
整個過程當中對象間的關係以下圖:
或者能夠看動圖:
除了 subscribe(Observer)
和 subscribe(Subscriber)
,subscribe()
還支持不完整定義的回調,RxJava
會自動根據定義建立出 Subscriber
。形式以下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
複製代碼
簡單解釋一下這段代碼中出現的 Action1
和 Action0
。
Action0
是 RxJava
的一個接口,它只有一個方法 call()
,這個方法是無參無返回值的。因爲 onCompleted()
方法也是無參無返回值的,所以 Action0
能夠被當成一個包裝對象,將 onCompleted()
的內容打包起來將本身做爲一個參數傳入 subscribe()
以實現不完整定義的回調。
Action1
也是一個接口,它一樣只有一個方法 call(T param)
,這個方法也無返回值,但有一個參數。與 Action0
同理,因爲 onNext(T obj)
和 onError(Throwable error)
也是單參數無返回值的,所以 Action1
能夠將 onNext(obj)
和 onError(error)
打包起來傳入 subscribe()
以實現不完整定義的回調。
事實上,雖然
Action0
和Action1
在API
中使用最普遍,但RxJava
提供了多個ActionX
形式的接口 (例如:Action2
,Action3
),它們能夠被用以包裝不一樣的無返回值的方法。
將字符串數組 names 中的全部字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
複製代碼
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
複製代碼
正如上面兩個例子這樣,建立出 Observable
和 Subscriber
,再用 subscribe()
將它們串起來,一次 RxJava
的基本使用就完成了,很是簡單!
然而。
在 RxJava
的默認規則中,事件的發出和消費都是在同一個線程的。也就是說,若是隻用上面的方法,實現出來的只是一個同步的觀察者模式。觀察者模式自己的目的就是『後臺處理,前臺回調』的異步機制,所以異步對於 RxJava
是相當重要的。而要實現異步,則須要用到 RxJava
的另外一個核心的概念 Scheduler
,後續將給出詳細介紹。
歡迎關注技術公衆號: 零壹技術棧
本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。