哇哦, 咱們又多了一天時間,因此讓咱們來學點新東西好讓這一天過得很棒吧 🙂。前端
各位好, 但願你如今已經作的很好了。 這是咱們關於 RxJava2 Android 系列文章的第八話 [ 第一話,第二話,第三話,第四話,第五話,第六話,第七話,第八話 ] 。在這一篇文章中將討論 Rx 中的 Subjects(主題)。java
研究動機 : 本文研究動機和系列文章 第一話 中分享給你們的相同。react
引言 : 當我開始與 Rx 的這段旅程時, Subjects 就是我最困惑的一個部分。在大多數我開始去讀任一博客的時候,我老是獲得這樣一個定義: 「 Subjects 就像一個 Observable 和 Observer 同時存在同樣。」 由於我不是一個聰明的人,因此這一點一直讓我很困惑,所以在用 Rx 作了不少練習以後,有一天我獲得了關於 Subjects 的概念,我驚訝於這個概念的強大,因此在這篇文章中我將和你一塊兒討論這個概念以及這個概念有多強大,或許在一些地方我不正確的使用了這個概念,可是此次讓你學到這個概念,在本文最後,你將會和 Subjects 成爲很好的朋友。🙂android
若是你和我同樣,認爲 Subjects 就像是 Observer 和 Observable 的組合,那麼請儘可能忘掉這個概念。如今我將要修改一下 Observable 和 Observer 的概念. 對於 Observable 我會建議你閱讀 Rx Observable 和 開發者 ( 我 ) 之間的對話 [ Android RxJava2 ] (這什麼鬼系列 )第五話 而且 Observer 我會建議你閱讀 繼續 Rx Observable 和 開發者 ( 我 ) 之間的對話 (Observable 求婚 Observer) [ Android RxJava2 ](這什麼鬼系列)第七話 。而後你就能夠很輕易的理解本篇文章,如今我會在下面和你分享一下 Obsevable 和 Observer 的一些 API。ios
這是 Observable 的代碼,如圖所示代碼總行數爲 3000 多行。 正如咱們所知,Observable 一般使用其不一樣的方法將數據轉換爲流,下面我給出一個簡單的例子。git
public static void main(String[] args) {
List<String> list = Arrays.asList("Hafiz", "Waleed", "Hussain");
Observable<String> stringObservable = Observable.fromIterable(list);
}
複製代碼
接下來咱們須要 Observer 從 Observable 中獲得數據。如今我將第一次向你展現 Obsever 的一些 API。github
就像咱們看到的 Observer 很是簡單,只有 4 個方法,那如今是時候在示例中使用一下這個 Observer 了。後端
/**
* Created by waleed on 09/07/2017.
*/
public class Subjects {
public static void main(String[] args) {
List<String> list = Arrays.asList("Hafiz", "Waleed", "Hussain");
Observable<String> stringObservable = Observable.fromIterable(list);
Observer<String> stringObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
stringObservable.subscribe(stringObserver);
}
}
複製代碼
它的輸出很簡單. 如今咱們成功修訂了 Observable 和 Observer API’s , 當作訂閱時,Observable 基本是調用咱們的 Observer API’s。 任什麼時候候 Observable 想要提供數據,老是要調用 Observaer 的 onNext ( data ) 方法。 任什麼時候候發生錯誤 Observable 會調用 Observer 的 onError(e) 方法。
任什麼時候候流操做完成 Observable 會調用 Observer 的 onComplete() 方法. 這是這兩個 API 之間的一個簡單關係.bash
如今我將要開始咱們今天的主題,若是再次對 Observable 和 Observer 有任何疑惑,請嘗試閱讀我上文中提到的文章,或者在評論中提問。 我認爲 Rx 中關於 Subjects 的定義放到最後討論,如今我將向你解釋一個更簡單的例子,它將使咱們能夠更直接的掌握 Rx 中 Subjects 的概念。ide
Observable<String> stringObservable = Observable.create(observableEmitter -> {
observableEmitter.onNext("Event");
});
複製代碼
這是能夠發射一個字符串的 Observable。
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
複製代碼
這是一個將會訂閱 Observable 的消費者。
while (true) {
Thread.sleep(1000);
stringObservable.subscribe(consumer);
}
複製代碼
這段代碼會在每一秒後產生一個事件。 爲了方便閱讀我把完整的代碼代碼貼出。
public class Subjects {
public static void main(String[] args) throws InterruptedException {
Observable<String> stringObservable = Observable.create(observableEmitter -> {
observableEmitter.onNext("Event");
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
while (true) {
Thread.sleep(1000);
stringObservable.subscribe(consumer);
}
}
}
複製代碼
Output: Event Event Event Event
這是一個簡單的例子,我認爲沒有必要過多的解釋,如今有趣的部分是,我會用不一樣的技術來寫出會有同樣輸出的新的例子。 在深刻以前,嘗試閱讀下面的代碼。
class ObservableObserver extends Observable<String> implements Observer<String>.
複製代碼
這很簡單,我建立了一個名爲 ObservableObserver 的新類, 它繼承自 Observable 而且實現了 Observer 接口。 因此這意味這它能夠做爲 Observable 增強版 和 Observer. 我不認爲這會有任何疑問,因此咱們已經知道 Observable 老是會生成流,因此這個類也有這個能力,由於它繼承自 Observable。而後咱們可知 Observer 能夠經過 訂閱 Observable 來觀察 Observable 中的任何流,那麼咱們的新類也能夠完成這些工做,由於它實現了 Observer 接口,BOOM。 很簡單。 如今我要給你看所有代碼,代碼只是爲了解釋這個概念並不意味着它是一個 成熟 的代碼。
class ObservableObserver extends Observable<String> implements Observer<String> {
private Observer<? super String> observer;
@Override
protected void subscribeActual(Observer<? super String> observer) { // Observable abstract method
this.observer = observer;
}
@Override
public void onSubscribe(Disposable disposable) { //Observer API
if (observer != null) {
observer.onSubscribe(disposable);
}
}
@Override
public void onNext(String s) {//Observer API
if (observer != null) {
observer.onNext(s);
}
}
@Override
public void onError(Throwable throwable) {//Observer API
if (observer != null) {
observer.onError(throwable);
}
}
@Override
public void onComplete() {//Observer API
if (observer != null) {
observer.onComplete();
}
}
public Observable<String> getObservable() {
return this;
}
}
複製代碼
又一個很簡單的類,咱們已經使用過上面的全部方法了,只是在這裏有一個區別,就是咱們在同一個類中使用了 Observable 和 Observer 的相關方法。
public static void main(String[] args) throws InterruptedException {
ObservableObserver observableObserver = new ObservableObserver();
observableObserver.getObservable().subscribe(System.out::println);
while (true) {
Thread.sleep(1000);
observableObserver.onNext("Event");
}
}
複製代碼
Output: Event Event Event
在上面的代碼中有兩行很重要,我將要給你們解釋一下: **observableObserver.getObservable(): **這裏,我從 ObservableObserver 類獲取 Observable 並訂閱 Observer . **observableObserver.onNext(「Event」): **這裏,當事件發生時調用 Observer API 方法. 由於做爲一個自我閉環的類,因此我可以從這個既是 Observabel 又是 Observer 的類中得到好處。如今有一個驚喜,你已經掌握了 Subjects 的概念,若是你不信的話來看下面圖中的代碼:
這是 RxJava2 Subject 類的代碼,如今你能夠明白爲何人們會說 Subjiects 既是 Observable 又是 Observer,由於它使用了兩個的 API 方法。 如今的 RxJava 中可使用不一樣類型的 Subjects, 這是咱們下面要討論的內容。
在 RxJava 中你能夠獲取到 4 種類型的 Subjiects。 1. Publish Subject 2. Behaviour Subject 3. Replay Subject 4. Async Subject
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = PublishSubject.create();
// Subject<String> subject = BehaviorSubject.create();
// Subject<String> subject = ReplaySubject.create();
// Subject<String> subject = AsyncSubject.create(); I will explain in the end
subject.subscribe(System.out::println);
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event "+ (++eventCounter));
}
}
複製代碼
Output: Event 1 Event 2 Event 3 Event 4 Event 5 Event 6 Event 7 Event 8 Event 9 Event 10
通常來講若是你運行上面的代碼,你將會看到輸出中除了 AsyncSubject 的其餘 Subjects 輸出都是相同的,如今是時候來區別一下這些 Subjects 的類型了。 **1. Publish Subject: **在該類型 Subject 中,咱們能夠獲取實時的數據,例如個人一個 Publish Subject 是獲取傳感器數據,那麼如今我訂閱了該 Subject, 我將之獲取最新的值,示例以下:
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = PublishSubject.create();
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10)
subject.subscribe(System.out::println);
}
}
複製代碼
Output: Event 11 Event 12 Event 13 Event 14 Event 15 Event 16
因此,在這裏 publish subject 發佈數據是從 0 開始,而在訂閱的時候已經發布到了 10,正如你所見,輸出的數據爲 Event 11。
**2. Behaviour Subject: **在這種類型的 Subjects 中,咱們將獲取這個 Subject 最後發佈出的值和新的將要發出的值,爲了簡單起見,請閱讀下面的代碼。
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = BehaviorSubject.create();
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10)
subject.subscribe(System.out::println);
}
}
複製代碼
Output: Event 10 Event 11 Event 12 Event 13 Event 14 Event 15
正如輸出中你所看到的那樣,我也得到了 「 Event 10」 這個值,而且這個值在我訂閱以前就已經發布了。這意味着若是我想要訂閱以前的最後一個值的話,我可使用這個類型的 Subject。
**3. Replay Subject: **在這個類型的 Subject 中,當我訂閱時能夠沒有顧及的得到全部發布的數據值,簡單起見仍是直接上代碼吧。
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = ReplaySubject.create();
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10)
subject.subscribe(System.out::println);
}
}
複製代碼
Output: Event 1 Event 2 Event 3 Event 4 Event 5 Event 6 Event 7 Event 8 Event 9 Event 10 Event 11 Event 12
如今我再次在 event 10 的時候訂閱,可是我能夠得到全部的歷史數據,因此這很簡單嘛。
**4. Async Subject: **在這個類型的 Subject 中,咱們將得到最後發佈的數據值,這個數據值是 Subject 在完成和終止前發射的,爲了簡單起見,依舊是直接上代碼吧。
public static void main(String[] args) throws InterruptedException {
Subject<String> subject = AsyncSubject.create();
subject.subscribe(System.out::println);
int eventCounter = 0;
while (true) {
Thread.sleep(100);
subject.onNext("Event " + (++eventCounter));
if (eventCounter == 10) {
subject.onComplete();
break;
}
}
}
複製代碼
Output: Event 10 Process finished with exit code 0
在這裏,你能夠看到在值爲 10 的時候以完成標識結束了 Subject 而且在程序完成後和程序退出以前,我獲得了輸出的 Event 10 ,因此這意味着它的意思是任什麼時候候我想要經過 Subject 得到最後一次發佈的的數據值可使用 Async Subject。
再次重複一下: Publish Subject: 我不關心以前的發佈歷史,我只關心新的或者最新的值。 Behaviour Subject: 我關心該 Subject 發佈的最後一個值和新值。 Replay Subject: 我關心全部發布了新值的歷史數據。 Async Subject: 我只關心在完成或終止以前由主題發出的最後一個值。
總結: 你好呀朋友,但願你對這個知識點已經很清晰了,另外盡你最大的努力去動手實踐這些概念,如今,我想要和各位說再見了,還有祝你們有個愉快的週末。 🙂
掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 Android、iOS、前端、後端、區塊鏈、產品、設計、人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃、官方微博、知乎專欄。