通常說到RxJS,都會講他是基於流的響應式的結合觀察者和迭代器模式的一種庫。因此下面會從這幾個關鍵詞來說。javascript
var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();
iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}
iterator.next(); // => { value: undefined, done: true}複製代碼
var observable = Rx.Observable
// 經過create方法建立一個Observable
// 回調函數會接受observer參數,也就是觀察者角色
.create(function(observer) {
observer.next('hi');
observer.next('world');
setTimeout(() => {
observer.next('這一段是異步操做');
}, 30)
})
// 訂閱這個 observable
// 只有在訂閱以後,纔會在流Observable變化的時候,調用observer提供的方法,並通知他
// 訂閱以後也能夠取消訂閱,調用unsubscribe()便可
console.log('start')
var subscription = observable.subscribe(function(value) {
console.log(value);
})
console.log('end')
setTimeOut(()=> {
subscription.unsubscribe()
}, 5000)
// 程序會依次輸出
'start'
"hi"
'world'
'end'
'這一段是異步操做'複製代碼
Rx.Observable.of(2)
.map(v => v * 2)
.subscribe(v => console.log('output:' + v));
// output:4複製代碼
var Observer = {
next(value) { /* 處理值*/ },
error(error) { /* 處理異常 */ },
complete() { /* 處理已完成態 */ }
};
next(): 接收Observable發出的值 (必傳)
error(): 不一樣於迭代器裏面用try catch,Observer用error方法接收錯誤 (可選)
complete(): 當沒有新的數據發出的時候,觸發操做 (可選)複製代碼
// 建立一個Observable,一秒鐘輸出一個數字,只取三個就結束
var source = Rx.Observable.interval(1000).take(3);
// 定義兩個observer對象
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
// 建立一個subject —— 特殊的Observable
var subject = new Rx.Subject()
// observerA訂閱Subject
subject.subscribe(observerA)
// Subject又以observer的身份訂閱Observable
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// 輸出:
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
A、B兩個observer互不影響,是獨立的複製代碼
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 原本是同步的,變成了異步
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('after subscribe');
// "before subscribe"
// "after subscribe"
// 1
// 2
// 3
// "complete"複製代碼