原文連接: medium.com/@benlesh/on… 本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做! 若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】javascript
RxJS 中的 Subjects 常常被誤解。由於它們容許你命令式地向 Observable 流中推送值,當人們不太清楚如何將某個東西變成 Observable 時,他們傾向於濫用 Subjects 。此模式看起來有點像這樣...java
// 當人們發現不太清楚如何作時,一般會首先使用 Subjects
// (不要這樣作)
const subject = new Subject();
button.addEventListener('click', () => subject.next('click'));
subject.subscribe(x => console.log(x));
複製代碼
雖然這對於 RxJS 新手 (對於這個階段來講太正常不過了) 頗有幫助,但這不是以 「Rx 的方式」在處理問題。理想的是在 Observable 中包裝事件註冊,既能夠監聽事件,又能夠取消事件監聽。看起來像這樣:git
// 這樣好些了,但請使用 Observable.fromEvent(button, 'click') 來替代
const clicks = new Observable(observer => {
const handler = (e) => observer.next(e);
button.addEventListener('click', handler);
return () => button.removeEventListener('click', handler);
});
複製代碼
爲何展現這個跟 Subjects 沒半點關係的示例?好吧,一點是它展現了爲何不老是須要使用 Subject,另一點這有個隱藏的 subject... (某種程度上能夠說是 subject )。這裏要注意的一點是,Observable 是經過 addEventListener 來包裝按鈕的處理函數的註冊,而 addEventListener 自己就是一個 subject 。…至少根據 「Gang Of Four」 的觀察者模式來講是這樣的。github
你可能知道,RxJS 主要是關於 Observables 和 Observers 的,但它也是 Subjects 相關的。然而在 Gof 的 設計模式中是找不到 observables 的,Subjects 和 Observers 是 觀察者模式中的根本。設計模式
模式自己很簡單。Observers 是具備通知方法的類,Subject 也是類,它具備向內部觀察者列表添加或刪除觀察者的方法和通知觀察者列表的方法。promise
RxJS 中的 Subjects 並無太差區別。當使用 observer 對 Rx Subject 調用 subscribe
時,Subject 會將該 observer 添加到內部的觀察者列表中。一樣的,若是使用一到三個函數來調用 subscribe
,Subject 會將它們包裝成一個 observer,而後添加到觀察者列表中。當調用 Subject 的 next(value)
時,它會遍歷觀察者列表並將 value
傳遞給 next
方法。對於 error
和 complete
也是一樣的。要想從 subject 的觀察者列表中移除 observer,只需簡單調用 subscription 的 unsubscribe
方法便可,subscription 是將 observer 添加到觀察者列表中時返回的。app
const subject = new Subject();
// 將 observer1 添加到觀察者列表
const sub1 = subject.subscribe(observer1);
// 將 observer2 添加到觀察者列表
const sub2 = subject.subscribe(observer2);
// 使用 "hi there" 來通知列表中的全部觀察者
subject.next('hi there');
// 將 observer1 從觀察者列表中移除
sub1.unsubscribe();
複製代碼
實際上,RxJS 中的 Subjects 不一樣於 GoF 觀察者模式中的 Subjects,但它們的 API 是 Observable 的鴨子類型。實際上,在 RxJS 中 Subjects 更甚,它們繼承自 Observable 。優勢是全部 Subjects 都具備與 Observable 相同的操做符和方法。異步
大概 Subject 和 Observable 之間一個很重要的區別就是 Subject 是有狀態的,它維護觀察者列表。另外一方面,Observable 真的只是一個函數,它創建了觀察自己。函數
雖然 Subjects 是 Observables,但 Subjects 還實現了 Observer 接口。也就是說,它們擁有 next
、error
和 complete
方法。這些方法用來通知 subject 內部觀察者列表中的 observers 。這意味着 subject 能夠用做訂閱任何 observable 的 observer 。oop
// 爲了使兩個觀察者 observer1 和 observer2 「共享」 tick$,
// 咱們能夠經過 Subject 來傳輸全部通知,像這樣
const tick$ = Observable.interval(1000);
const subject = new Subject();
subject.subscribe(observer1);
subject.subscribe(observer2);
tick$.subscribe(subject);
複製代碼
上面的示例是將 observable tick$
「多播」 給兩個觀察者: observer1
和 observer2
。這其實就是 RxJS 中大多數多播操做符內部所作的事情。例如 publish
、publishReplay
、multicast
、share
,等等。真的,這纔是 RxJS 中 Subjects 的主要用法。
在 RxJS 中,Subjects 不能重用。也就是說,當一個 Subject 完成或報錯時,便不可再使用了。若是你嘗試在已關閉的 Subject (調用過 complete
或 error
方法)上調用 next
,它會默認忽略通知。若是想 Subject 在完成後調用 next
時進行顯示地報錯,你能夠在 subject 實例上直接調用 unsubscribe
。
// Subject 之死
const subject = new Subject();
subject.subscribe(x => console.log(x));
subject.next(1); // 1
subject.next(2); // 2
subject.complete();
subject.next(3); // 悄悄地忽略
subject.unsubscribe();
subject.next(4); // Unhandled ObjectUnsubscribedError
複製代碼
可是在 RxJS 的當前版本中,會帶來了一些使人困惑的痛點。由於 Rx observables 不會「捕獲」錯誤,咱們會遭遇一些奇怪的行爲。我曾經嘲笑過 Promises 實現了錯誤「捕獲」,但在多播場景中它或許是正確的。個人意思是當我說 Rx observable 不「捕獲」錯誤時,是表示當錯誤滲透到觀察者鏈末端並的末端時,若是錯誤未被處理,它會被從新拋出。
// 演示缺乏錯誤處理時會進行從新拋出
const badObservable = Observable.throw(new Error('haha'));
try {
badObservable.subscribe({
next: x => console.log(x),
error: null,
complete: () => console.log('done')
});
} catch (err) {
console.error(err); // 輸出自定義錯誤: "haha"
}
複製代碼
如今咱們來想一想,當你循環觀察者列表並通知它們時會發生什麼(正如 subject 所作的)。
for (let observer of observers) {
observer.next('notify'); // 若是在這裏調用 throw 會發生什麼?
}
// 提示: 會報錯會打破循環
// 注意: 好吧,這不只僅是個提示
複製代碼
假設一些操做符是同步處理的(map
、filter
、scan
等等),若是其中一個或任何其它同步操做符報錯了,你會在多播(使用 Subject 來循環觀察者列表並通知它們)的下游獲得一些詭異的行爲:
// 會發生奇怪的行爲
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
複製代碼
在上面的示例中,大多數用戶會指望 A 和 C 能繼續通知。輸出 B 的 observable 死了是能夠理解的,它報錯了,但其餘流和源流也死了使人想當困惑。任意的第三方均可以殺掉共享的 observable 流以及未知數量的兄弟流,不該該是這樣的。這是一個脆弱的抽象,咱們須要在 RxJS 接下來的版本中修復它。
臨時解決上述場景中的問題很簡單,感謝調度器( schedulers )。你能夠在多播後使用 observeOn
,這樣就能夠解決此問題,由於錯誤再也不是同步拋出的。
const source$ = Observable.interval(1000)
.share()
.observeOn(Rx.Scheduler.asap); // 點睛之筆
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... 等等
複製代碼
還有另外一種臨時解決方案,若是你能夠管理它的話,它的性能會更好一些,方法是隻需爲全部的 subscriptions 添加錯誤處理方法。
const source$ = Observable.interval(1000)
.share()
.observeOn(Rx.Scheduler.asap); // 點睛之筆
const mapped$ = source$.map(x => {
if (x === 1) {
throw new Error('oops');
}
return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(
x => console.log('B', x),
err => console.log('Error handled: ' + err.message)
);
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// "Error handled: oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... 等等
複製代碼
TC39 Observable 提議 新的化身,不包括 CancelToken
業務,提議自己就是一整篇文章,它可能會在沒有錯誤處理方法的狀況下經過「捕獲」錯誤來解決這個問題。也就是說,它再也不會到達觀察者鏈末端並從新拋出錯誤。在 RxJS 將來的版本中,我想咱們也會作一樣的事情,由於這纔是正確的。固然,這個問題是公開討論的,但我我的認爲這不會有太多阻力。