在面向對象的架構中,開發者致力於建立一組解耦的實體。這樣的話,實體就能夠在不用妨礙整個系統的狀況下能夠被測試、複用和維護。設計這種系統就帶來一個棘手的負面影響:維護相關對象之間的統一。java
在Smalltalk MVC架構中,建立模式的第一個例子就是用來解決這個問題的。用戶界面框架提供一種途徑使UI元素與包含數據的實體對象相分離,而且同時,它提供一種靈活的方法來保持它們之間的同步。編程
在這本暢銷的四人組編寫的《設計模式——可複用面向對象軟件的基礎》一書中,觀察者模式是最有名的設計模式之一。它是一種行爲模式並提供一種以一對多的依賴來綁定對象的方法:即當一個對象發生變化時,依賴它的全部對象都會被通知而且會自動更新。設計模式
在本章中,咱們將會對觀察者模式有一個概述,它是如何實現的以及如何用RxJava來擴展,Observable是什麼,以及Observables如何與Iterables相關聯。數組
觀察者模式很適合下面這些場景中的任何一個:緩存
在RxJava的世界裏,咱們有四種角色:架構
Observables和Subjects是兩個「生產」實體,Observers和Subscribers是兩個「消費」實體。併發
當咱們異步執行一些複雜的事情,Java提供了傳統的類,例如Thread、Future、FutureTask、CompletableFuture來處理這些問題。當複雜度提高,這些方案就會變得麻煩和難以維護。最糟糕的是,它們都不支持鏈式調用。app
RxJava Observables被設計用來解決這些問題。它們靈活,且易於使用,也能夠鏈式調用,而且能夠做用於單個結果程序上,更有甚者,也能夠做用於序列上。不管什麼時候你想發射單個標量值,或者一連串值,甚至是無窮個數值流,你均可以使用Observable。框架
Observable的生命週期包含了三種可能的易於與Iterable生命週期事件相比較的事件,下表展現瞭如何將Observable async/push 與 Iterable sync/pull相關聯起來。異步
Event | Iterable(pull) | Observable(push) |
---|---|---|
檢索數據 | T next() |
onNext(T) |
發現錯誤 | throws Exception |
onError(Throwable) |
完成 | !hasNext() |
onCompleted() |
使用Iterable時,消費者從生產者那裏以同步的方式獲得值,在這些值獲得以前線程處於阻塞狀態。相反,使用Observable時,生產者以異步的方式把值推給觀察者,不管什麼時候,這些值都是可用的。這種方法之因此更靈活是由於即使值是同步或異步方式到達,消費者在這兩種場景均可以根據本身的須要來處理。
爲了更好地複用Iterable接口,RxJava Observable類擴展了GOF觀察者模式的語義。引入了兩個新的接口:
從發射物的角度來看,有兩種不一樣的Observables:熱的和冷的。一個"熱"的Observable典型的只要一建立完就開始發射數據,所以全部後續訂閱它的觀察者可能從序列中間的某個位置開始接受數據(有一些數據錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它纔開始發射數據,所以這個觀察者能夠確保會收到整個數據序列。
在接下來的小節中將討論Observables提供的兩種建立Observable的方法。
create()方法使開發者有能力從頭開始建立一個Observable。它須要一個OnSubscribe對象,這個對象繼承Action1,當觀察者訂閱咱們的Observable時,它做爲一個參數傳入並執行call()函數。
Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {
}
});
複製代碼
Observable經過使用subscriber變量並根據條件調用它的方法來和觀察者通訊。讓咱們看一個「現實世界」的例子:
Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
複製代碼
例子故意寫的簡單,是由於即使是你第一次見到RxJava的操做,我想讓你明白接下來要發生什麼。
咱們建立一個新的Observable<Integer>
,它執行了5個元素的for循環,一個接一個的發射他們,最後完成。
另外一方面,咱們訂閱了Observable,返回一個Subscription 。一旦咱們訂閱了,咱們就開始接受整數,並一個接一個的打印出它們。咱們並不知道要接受多少整數。事實上,咱們也無需知道是由於咱們爲每種場景都提供對應的處理操做:
在上一個例子中,咱們建立了一個整數序列並一個一個的發射它們。假如咱們已經有一個列表呢?咱們是否是能夠不用for循環而也能夠一個接一個的發射它們呢?
在下面的例子代碼中,咱們從一個已有的列表中建立一個Observable序列:
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});
複製代碼
輸出的結果和上面的例子絕對是同樣的。
from()
建立符能夠從一個列表/數組來建立Observable,並一個接一個的從列表/數組中發射出來每個對象,或者也能夠從Java Future
類來建立Observable,併發射Future對象的.get()
方法返回的結果值。傳入Future
做爲參數時,咱們能夠指定一個超時的值。Observable將等待來自Future
的結果;若是在超時以前仍然沒有結果返回,Observable將會觸發onError()
方法通知觀察者有錯誤發生了。
若是咱們已經有了一個傳統的Java函數,咱們想把它轉變爲一個Observable又改怎麼辦呢?咱們能夠用create()
方法,正如咱們先前看到的,或者咱們也能夠像下面那樣使用以此來省去許多模板代碼:
Observable<String> observableString = Observable.just(helloWorld());
Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
複製代碼
helloWorld()
方法比較簡單,像這樣:
private String helloWorld(){
return "Hello World";
}
複製代碼
無論怎樣,它能夠是咱們想要的任何函數。在剛纔的例子中,咱們一旦建立了Observable,just()
執行函數,當咱們訂閱Observable時,它就會發射出返回的值。
just()
方法能夠傳入一到九個參數,它們會按照傳入的參數的順序來發射它們。just()
方法也能夠接受列表或數組,就像from()
方法,可是它不會迭代列表發射每一個值,它將會發射整個列表。一般,當咱們想發射一組已經定義好的值時會用到它。可是若是咱們的函數不是時變性的,咱們能夠用just來建立一個更有組織性和可測性的代碼庫。
最後注意just()
建立符,它發射出值後,Observable正常結束,在上面那個例子中,咱們會在控制檯打印出兩條信息:「Hello World」和「Observable completed」。
當咱們須要一個Observable毫無理由的再也不發射數據正常結束時,咱們可使用empty()
。咱們可使用never()
建立一個不發射數據而且也永遠不會結束的Observable。咱們也可使用throw()
建立一個不發射數據而且以錯誤結束的Observable。
subject
是一個神奇的對象,它能夠是一個Observable同時也能夠是一個Observer:它做爲鏈接這兩個世界的一座橋樑。一個Subject能夠訂閱一個Observable,就像一個觀察者,而且它能夠發射新的數據,或者傳遞它接受到的數據,就像一個Observable。很明顯,做爲一個Observable,觀察者們或者其它Subject均可以訂閱它。
一旦Subject訂閱了Observable,它將會觸發Observable開始發射。若是原始的Observable是「冷」的,這將會對訂閱一個「熱」的Observable變量產生影響。
RxJava提供四種不一樣的Subject:
Publish是Subject的一個基礎子類。讓咱們看看用PublishSubject實現傳統的Observable Hello World
:
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
複製代碼
在剛纔的例子中,咱們建立了一個PublishSubject
,用create()
方法發射一個String
值,而後咱們訂閱了PublishSubject。此時,沒有數據要發送,所以咱們的觀察者只能等待,沒有阻塞線程,也沒有消耗資源。就在這隨時準備從subject接收值,若是subject沒有發射值那麼咱們的觀察者就會一直在等待。再次聲明的是,無需擔憂:觀察者知道在每一個場景中該作什麼,咱們不用擔憂何時是由於它是響應式的:系統會響應。咱們並不關心它何時響應。咱們只關心它響應時該作什麼。
最後一行代碼展現了手動發射字符串「Hello World」,它觸發了觀察者的onNext()
方法,讓咱們在控制檯打印出「Hello World」信息。
讓咱們看一個更復雜的例子。話說咱們有一個private
聲明的Observable,外部不能訪問。Observable在它生命週期內發射值,咱們不用關心這些值,咱們只關心他們的結束。
首先,咱們建立一個新的PublishSubject來響應它的onNext()
方法,而且外部也能夠訪問它。
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable Completed");
}
});
複製代碼
而後,咱們建立「私有」的Observable,只有subject才能夠訪問的到。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
subject.onNext(true);
}
}).subscribe();
複製代碼
Observable.create()
方法包含了咱們熟悉的for循環,發射數字。doOnCompleted()
方法指定當Observable結束時要作什麼事情:在subject上發射true。最後,咱們訂閱了Observable。很明顯,空的subscribe()
調用僅僅是爲了開啓Observable,而不用管已發出的任何值,也不用管完成事件或者錯誤事件。爲了這個例子咱們須要它像這樣。
在這個例子中,咱們建立了一個能夠鏈接Observables而且同時可被觀測的實體。當咱們想爲公共資源建立獨立、抽象或更易觀測的點時,這是極其有用的。
簡單的說,BehaviorSubject會首先向他的訂閱者發送截至訂閱前最新的一個數據對象(或初始值),而後正常發送訂閱後的數據流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
複製代碼
在這個短例子中,咱們建立了一個能發射整形(Integer)的BehaviorSubject。因爲每當Observes訂閱它時就會發射最新的數據,因此它須要一個初始值。
ReplaySubject會緩存它所訂閱的全部數據,向任意一個訂閱它的觀察者重發:
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
複製代碼
當Observable完成時AsyncSubject只會發佈最後一個數據給已經訂閱的每個觀察者。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
複製代碼
本章中,咱們瞭解到了什麼是觀察者模式,爲何Observables在今天的編程場景中如此重要,以及如何建立Observables和subjects。
下一章中,咱們將建立第一個基於RxJava的Android應用程序,學習如何檢索數據來填充listview,以及探索如何建立一個基於RxJava的響應式UI。