Subject能夠當作是一個橋樑或者代理,在某些ReactiveX實現中(如RxJava),它同時充當了Observer和Observable的角色。由於它是一個Observer,它能夠訂閱一個或多個Observable;又由於它是一個Observable,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。html
因爲一個Subject訂閱一個Observable,它能夠觸發這個Observable開始發射數據(若是那個Observable是"冷"的--就是說,它等待有訂閱纔開始發射數據)。所以有這樣的效果,Subject能夠把原來那個"冷"的Observable變成"熱"的。java
針對不一樣的場景一共有四種類型的Subject。他們並非在全部的實現中所有都存在,並且一些實現使用其它的命名約定(例如,在RxScala中Subject被稱做PublishSubject)。react
一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。(若是原始Observable沒有發射任何值,AsyncObject也不發射任何值)它會把這最後一個值發射給任何後續的觀察者。 緩存
然而,若是原始的Observable由於發生了錯誤而終止,AsyncSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。 async
當觀察者訂閱BehaviorSubject時,它開始發射原始Observable最近發射的數據(若是此時尚未收到任何數據,它會發射一個默認值),而後繼續發射其它任何來自原始Observable的數據。 spa
然而,若是原始的Observable由於發生了一個錯誤而終止,BehaviorSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。 線程
PublishSubject只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者。須要注意的是,PublishSubject可能會一建立完成就馬上開始發射數據(除非你能夠阻止它發生),所以這裏有一個風險:在Subject被建立後到有觀察者訂閱它以前這個時間段內,一個或多個數據可能會丟失。若是要確保來自原始Observable的全部數據都被分發,你須要這樣作:或者使用Create建立那個Observable以便手動給它引入"冷"Observable的行爲(當全部觀察者都已經訂閱時纔開始發射數據),或者改用ReplaySubject。 代理
若是原始的Observable由於發生了一個錯誤而終止,PublishSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。 code
ReplaySubject會發射全部來自原始Observable的數據給觀察者,不管它們是什麼時候訂閱的。也有其它版本的ReplaySubject,在重放緩存增加到必定大小的時候或過了一段時間後會丟棄舊的數據(原始Observable發射的)。server
若是你把ReplaySubject看成一個觀察者使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
假設你有一個Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你能夠調用它的asObservable方法,這個方法返回一個Observable。具體使用方法能夠參考Javadoc文檔。
若是你把 Subject
看成一個 Subscriber
使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
要避免此類問題,你能夠將 Subject
轉換爲一個 SerializedSubject
,相似於這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );