今早看民工叔的文章的時候, 發現對Rxjs所知甚少, 因而去官方看了下教程, 整理出一些東西, 寫成此文。
Rxjs聽說會在2017年流行起來, 由於其處理異步邏輯,數據流, 事件很是擅長。 可是其學習曲線相比Promise, EventEmitter陡峭了很多。 並且民工叔也說:"因爲RxJS的抽象程度很高,因此,能夠用很簡短代碼表達很複雜的含義,這對開發人員的要求也會比較高,須要有比較強的概括能力。" 本文就Rx.js的幾個核心概念作出闡述。 儘量以通俗易懂的方式解釋這些概念。要是本文有誤或不完善的地方,歡迎指出。html
先上代碼:前端
let foo = Rx.Observable.create(observer => { console.log('Hello'); observer.next(42); }); foo.subscribe(x => console.log(x)); foo.subscribe(y => console.log(y));
輸出react
"Hello" 42 "Hello" 42
這裏能夠把foo想象成一個函數,這意味着你每次調用foo都會致使傳入Rx.Observable.create裏的回調函數從新執行一次, 調用的方式爲foo.subscribe(callback), 至關於foo()。 接收函數返回值的方式也從var a = foo()改成經過傳入回調函數的方式獲取。第三行的observer.next表示返回一個值, 你能夠調用屢次,每次調用observer.next後, 會先將next裏的值返回給foo.subcribe裏的回調函數, 執行完後再返回。observer.complete, observer.error來控制流程。 具體看代碼:git
var observable = Rx.Observable.create(observer => { try { observer.next(1); console.log('hello'); observer.next(2); observer.next(3); observer.complete(); observer.next(4); } catch (err) { observer.error(err); } }); let = subcription = observable.subscribe(value => { console.log(value) })
運行結果:github
1 hello 2 3
如上的第一個回調函數裏的結構是推薦的結構。 當observable的執行出現異常的時候,經過observer.error將錯誤返回, 然而observable.subscribe的回調函數沒法接收到.由於observer.complete已經調用, 所以observer.next(4)的返回是無效的. Observable不是能夠返回多個值的Promise 雖然得到Promise的值的方式也是經過then函數這種相似的方式, 可是new Promise(callback)裏的callback回調永遠只會執行一次!由於Promise的狀態是不可逆的。數組
可使用其餘方式建立Observable, 看代碼:promise
var clicks = Rx.Observable.fromEvent(document, 'click'); clicks.subscribe(x => console.log(x));
當用戶對document產生一個click行爲的時候, 就會打印事件對象到控制檯上。app
先看代碼:dom
let foo = Rx.Observable.create(observer => { console.log('Hello'); observer.next(42); }); let observer = x => console.log(x); foo.subscribe(observer);
代碼中的第二個變量就是observer. 沒錯, observer就是當Observable"返回"值的時候接受那個值的函數!第一行中的observer其實就是經過foo.subscribe傳入的callback. 只不過稍加封裝了。 怎麼封裝的? 看代碼:異步
let foo = Rx.Observable.create(observer => { try { console.log('Hello'); observer.next(42); observer.complete(); observer.next(10); } catch(e) { observer.error(e) } }); let observer = { next(value) { console.log(value) }, complete() { console.log('completed'), error(err) { console.error(err) } } foo.subscribe(observer);
你看到observer被定義成了一個對象, 其實這纔是完整的observer. 傳入一個callback到observable.subcribe至關於傳入了{ next: callback }
。
Subscription是什麼, 先上代碼:
var observable = Rx.Observable.interval(1000); var subscription = observable.subscribe(x => console.log(x)); setTimeout(() => { subscription.unsubscribe(); }, 3100)
運行結果:
0 1 2
Rx.Observable.interval能夠返回一個可以發射(返回)0, 1, 2, 3..., n數字的Observable, 返回的時間間隔這裏是1000ms。 第二行中的變量就是subscription。 subscription有一個unsubscribe方法, 這個方法可讓subscription訂閱的observable發射的數據被observer忽略掉.通俗點說就是取消訂閱。
unsubscribe存在一個陷阱。 先看代碼:
var foo = Rx.Observable.create((observer) => { var i = 0 setInterval(() => { observer.next(i++) console.log('hello') }, 1000) }) const subcription = foo.subscribe((i) => console.log(i)) subcription.unsubscribe()
運行結果:
hello hello hello ...... hello
unsubscribe只會讓observer忽略掉observable發射的數據,可是setInterval依然會繼續執行。 這看起來彷佛是一個愚蠢的設計。 因此不建議這樣寫。
Subject是一種可以發射數據給多個observer的Observable, 這讓Subject看起來就好像是EventEmitter。 先上代碼:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(1); subject.next(2);
運行結果:
observerA: 1 observerB: 1 observerA: 2 observerB: 2
與Observable不一樣的是, Subject發射數據給多個observer。 其次, 定義subject的時候並無傳入callback, 這是由於subject自帶next, complete, error等方法。從而能夠發射數據給observer。 這和EventEmitter很相似。observer並不知道他subscribe的是Obervable仍是Subject。 對observer來講是透明的。 並且Subject還有各類派生, 好比說:
BehaviorSubject 可以保留最近的數據,使得當有subscribe的時候,立馬發射出去。看代碼:
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(3);
運行結果:
observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
ReplaySubject 可以保留最近的一些數據, 使得當有subscribe的時候,將這些數據發射出去。看代碼:
var subject = new Rx.ReplaySubject(3); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5);
輸出結果:
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
第一行的聲明表示ReplaySubject最大可以記錄的數據的數量是3。
AsyncSubject 只會發射結束前的一個數據。 看代碼:
var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5); subject.complete();
輸出結果:
observerA: 5 observerB: 5
既然subject有next, error, complete三種方法, 那subject就能夠做爲observer! 看代碼:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject);
輸出結果:
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
也就是說, observable.subscribe能夠傳入一個subject來訂閱其消息。 這就好像是Rxjs中的一顆語法糖, Rxjs有專門的實現。
Multicasted Observables 是一種藉助Subject來將數據發射給多個observer的Observable。 看代碼:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); multicasted.connect();
Rx.Observable.from可以逐一發射數組中的元素, 在multicasted.connect()調用以前的任何subscribe都不會致使source發射數據。multicasted.connect()至關於以前的observable.subscribe(subject)。所以不能將multicasted.connect()寫在subscribe的前面。由於這會致使在執行multicasted.connect()的時候source發射數據, 可是subject又沒保存數據, 致使兩個subscribe沒法接收到任何數據。
最好是第一個subscribe的時候可以獲得當前已有的數據, 最後一個unsubscribe的時候就中止Observable的執行, 至關於Observable發射的數據都被忽略。
refCount就是可以返回這樣的Observable的方法
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; console.log('observerA subscribed'); subscription1 = refCounted.subscribe({ next: (v) => console.log('observerA: ' + v) }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); }, 2000);
輸出結果:
observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
Observable上有不少方法, 好比說map, filter, merge等等。 他們基於調用它們的observable,返回一個全新的observable。 並且他們都是純方法。 operators分爲兩種, instance operators 和 static operators。 instance operators是存在於observable實例上的方法, 也就是實例方法; static operators是存在於Observable這個類型上的方法, 也就是靜態方法。Rxjs擁有不少強大的operators。
本身實現一個operators:
function multiplyByTen(input) { var output = Rx.Observable.create(function subscribe(observer) { input.subscribe({ next: (v) => observer.next(10 * v), error: (err) => observer.error(err), complete: () => observer.complete() }); }); return output; } var input = Rx.Observable.from([1, 2, 3, 4]); var output = multiplyByTen(input); output.subscribe(x => console.log(x));
輸出結果:
10 20 30 40
import React from 'react'; import ReactDOM from 'react-dom'; import Rx from 'rx'; class Main extends React.Component { constructor (props) { super(props); this.state = {count: 0}; } // Click events are now observables! No more proactive approach. componentDidMount () { const plusBtn = document.getElementById('plus'); const minusBtn = document.getElementById('minus'); const plus$ = Rx.Observable.fromEvent(plusBtn, 'click').map(e => 1); const minus$ = Rx.Observable.fromEvent(minusBtn, 'click').map(e => -1); Rx.Observable.merge(plus$, minus$).scan((acc, n) => acc + n) .subscribe(value => this.setState({count: value})); } render () { return ( <div> <button id="plus">+</button> <button id="minus">-</button> <div>count: {this.state.count}</div> </div> ); } } ReactDOM.render(<Main/>, document.getElementById('app'));
merge用於合併兩個observable產生一個新的observable。 scan相似於Array中的reduce。 這個例子實現了點擊plus的時候+1, 點擊minus的時候-1。
假如沒有被複雜的異步,事件, 數據序列困擾, 若是promise已經足夠的話, 就不必適用Rx.js。