第一次接觸rxjs也是由於angular2應用,內置了rxjs的依賴,瞭解以後發現它的強大,是一個能夠代替promise的框架,可是隻處理promise的東西有點拿尚方寶劍砍蚊子的意思。javascript
若是咱們的應用是徹底rxjs的應用,會顯得代碼比較清晰,代碼寫的爽。css
angular團隊和微軟合做,採用的typescript和rxjs,互相宣傳。。html
rxjs是一個比較簡單的庫,它只有Observable,Observer,subscription,subject,Operators,Scheduler6個對象概念。比較相似於觀察者模式,若是再瞭解一些函數式編程和node的stream就更好了。java
中文文檔node
observable是一個可觀察對象,也相似觀察者模式中的可觀察對象,後面的Subscription就至關於觀察者模式中的訂閱者。github
給一個例子:ajax
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); });
建立了一個Obervable對象,這裏用到了create操做符。typescript
create操做符:建立一個新的 Observable ,當觀察者( Observer )訂閱該 Observable 時,它會執行指定的函數。編程
如上例子中的observer,給一個典型的observer例子:
var observer={ next:x=>console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification') }
有點相似promise的返回,每來一個「流」就會執行一個next,出錯會執行一個observer的error,完成後或者調用complete便再也不監聽observable,執行complete函數。這些函數的集合也就是observer。
要使用觀察者,須要訂閱可觀察對象:
observable.subscribe(observer)
訂閱是一個表示一次性資源的對象,一般是一個可觀察對象的執行。
它有一個重要的方法:unsubscribe,顧名思義。。。
好比observable的例子:
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); var observer={ next:x=>console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification') }; observable.subscribe(observer); //返回 Observer got a next value: 1 Observer got a next value: 2 Observer got a next value: 3 Observer got a next value: 4 //after 1s return Observer got a complete notification
若是在最後調用subscription.unsubscribe();
那麼4就不會執行,complete也不會執行,就會取消掉這個觀察。
Subject是容許值被多播到多個觀察者的一種特殊的Observable。然而純粹的可觀察對象是單播的(每個訂閱的觀察者擁有單獨的可觀察對象的執行)。
subject是Observable對象,而且自帶next,error,complete函數,因此咱們不用在定義observer:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(1); subject.next(2); //返回 observerA: 1 observerB: 1 observerA: 2 observerB: 2
因爲subject自帶next等等的函數,因此它也是個observer,也能夠這樣用:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // You can subscribe providing a Subject
rx由於operators強大,咱們能夠流式的處理主要由於有operators在。
操做符是可觀察對象上定義的方法,例如.map(...),.filter(...),.merge(...),等等。他們相似fp,返回新的observable而subscription對象也會繼承。
好比
Rx.Observable.interval(500).filter(x => x%2==1).subscribe( res => console.log(res) ); // 一秒輸出一個數,返回單數。
這裏的filter就是操做符,咱們經過操做符來完成一系列的神奇操做。
什麼是調度者?調度者控制着什麼時候啓動一個訂閱和什麼時候通知被髮送。
名稱 | 類型 | 屬性 | 描述 |
---|---|---|---|
queue | Scheduler | 在當前事件幀中調度隊列(trampoline 調度器)。迭代操做符使用此調度器。 | |
asap | Scheduler | 微任務隊列上的調度, 使用盡量快的轉化機制, 或者是 Node.js 的 process.nextTick(),或者是 Web Worker 的消息通道,或者 setTimeout , 或者其餘。異步轉化使用此調度器. | |
async | Scheduler | 使用 setInterval 調度工做。基於時間的操做符使用此調度器。 | |
animationFrame | Scheduler | 使用 requestAnimationFrame 調度工做。與平臺的重繪同步使用此調度器。 |
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }) .observeOn(Rx.Scheduler.async); console.log('just before subscribe'); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); console.log('just after subscribe'); //返回 just before subscribe just after subscribe got value 1 got value 2 got value 3 done
這是由於observeOn(Rx.Scheduler.async)在Observable.create和最終的Observer之間引入了一個代理Observer。
var proxyObserver = { next: (val) => { Rx.Scheduler.async.schedule( (x) => finalObserver.next(x), 0 /* delay */, val /* will be the x for the function above */ ); }, // ... }
<input id="text"></input> <script> var text = document.querySelector('#text'); text.addEventListener('keyup', (e) =>{ var searchText = e.target.value; // 發送輸入內容到後臺 $.ajax({ url: `xx.com/${searchText}`, success: data => { // 拿到後臺返回數據,並展現搜索結果 render(data); } }); }); </script>
以前實現一個搜索效果,其實須要這樣的代碼,應用到函數節流還須要寫爲
clearTimeout(timer); // 定時器,在 250 毫秒後觸發 timer = setTimeout(() => { console.log('發起請求..'); },250)
還要考慮一種狀況,若是咱們搜索了a,而後立刻改成了b,會返回a的結果,這樣咱們就須要判斷一下:
clearTimeout(timer) timer = setTimeout(() => { // 聲明一個當前所搜的狀態變量 currentSearch = '書'; var searchText = e.target.value; $.ajax({ url: `xx.com/${searchText}`, success: data => { // 判斷後臺返回的標誌與咱們存的當前搜索變量是否一致 if (data.search === currentSearch) { // 渲染展現 render(data); } else { // .. } } });
這種代碼其實就很雜亂了。
若是用rxjs,咱們的代碼能簡單而且清楚不少:
var text = document.querySelector('#text'); var inputStream = Rx.Observable.fromEvent(text, 'keyup') .debounceTime(250) .pluck('target', 'value') .switchMap(url => Http.get(url)) .subscribe(data => render(data));
rxjs版的promise.all
const getPostOne$ = Rx.Observable.timer(1000).mapTo({id: 1}); const getPostTwo$ = Rx.Observable.timer(2000).mapTo({id: 2}); Rx.Observable.forkJoin(getPostOne$, getPostTwo$).subscribe(res => console.log(res)) //返回 [ { id: 1 }, { id: 2 } ]
能夠保存上一個值
Rx.Observable .fromEvent(document, 'scroll') .map(e => window.pageYOffset) .pairwise() .subscribe(pair => console.log(pair)); // pair[1] - pair[0]
合併兩個流的值,並只發出最新的值
const clicks$ = Rx.Observable.fromEvent(document, 'click'); const innerObservable$ = Rx.Observable.interval(1000); clicks$.switchMap(event => innerObservable$) .subscribe(val => console.log(val));
每次點擊觸發才發送interval值,而且點擊以後interval從新發送,取消掉以前的值。若是是mergeMap,則不取消以前的值。
返回promise
let source = Rx.Observable .of(42) .toPromise(); source.then((value) => console.log('Value: %s', value)); // => Value: 42
將 Promise 轉化爲 Observable。
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/')); result.subscribe(x => console.log(x), e => console.error(e));
有了和promise相互轉化的api,就很方便的用rx,ng2中內置rx,用着不爽就職意改爲promise來寫。
public takeUntil(notifier: Observable): Observable
發出源 Observable 發出的值,直到notifier:Observable 發出值。
rx.Observable.interval(1000).takeUntil(rx.Observable.fromEvent(document,'click'))
觸發interval,而後每次點擊中止觸發。
因此它還有一個用法就是創建一個stop流,來避免手動調用unsubscribe。
const data$ = this.getData(); const cancelBtn = this.element.querySelector('.cancel-button'); const rangeSelector = this.element.querySelector('.rangeSelector'); const cancel$ = Observable.fromEvent(cancelBtn, 'click'); const range$ = Observable.fromEvent(rangeSelector, 'change').map(e => e.target.value); const stop$ = Observable.merge(cancel$, range$.filter(x => x > 500)) this.subscription = data$.takeUntil(stop$).subscribe(data => this.updateData(data));
BehaviorSubject繼承自Observable類,它儲存着要發射給消費者的最新的值。
不管什麼時候一個新的觀察者訂閱它,都會當即接受到這個來自BehaviorSubject的"當前值"。
好比
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(3); //返回 observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
每次next就傳一個值,在observer裏面寫函數處理。
咱們有一個material table的例子來看。
代碼看文最後
咱們作的是一個table中的filter功能,相似find item by name。
通常的思路就是獲取這個input的值,函數節流,在咱們的table數據中filter這個name,而後給原來綁定的data賦值。
對於rx的寫法就很清楚了。
Observable.fromEvent(this.filter.nativeElement, 'keyup') .debounceTime(150) .distinctUntilChanged() .subscribe(() => { if (!this.dataSource) { return; } this.dataSource.filter = this.filter.nativeElement.value; });
咱們獲取輸入的值,節流,去重,賦值給this.dataSource,this.dataSource實際上是ExampleDataSource的實例。
ExampleDatabase類是生成數據的類,能夠忽略,ExampleDataSource是咱們作處理的一個類,material暴露了一個connect方法,返回的observable直接綁定table的data。
主要的處理在ExampleDataSource裏:
export class ExampleDataSource extends DataSource<any> { _filterChange = new BehaviorSubject(''); get filter(): string { return this._filterChange.value; } set filter(filter: string) { this._filterChange.next(filter); } constructor(private _exampleDatabase: ExampleDatabase) { super(); } /** Connect function called by the table to retrieve one stream containing the data to render. */ connect(): Observable<UserData[]> { const displayDataChanges = [ this._exampleDatabase.dataChange, this._filterChange, ]; return Observable.merge(...displayDataChanges).map(() => { return this._exampleDatabase.data.slice().filter((item: UserData) => { let searchStr = (item.name + item.color).toLowerCase(); return searchStr.indexOf(this.filter.toLowerCase()) != -1; }); }); }
咱們設置了filter這個屬性的get和set,每次咱們按下按鍵,給this.dataSource.filter賦值的時候,實際上,咱們調用了BehaviorSubject的next方法,
發了一個事件。咱們還須要merge一下_exampleDatabase.dataChange事件,爲了當table數據改變的時候,咱們能作出相應的處理。
而後就用map操做符,filter一下咱們的data數據。給table數據綁定material已經幫咱們作了。
附文:
import {Component, ElementRef, ViewChild} from '@angular/core'; import {DataSource} from '@angular/cdk'; import {BehaviorSubject} from 'rxjs/BehaviorSubject'; import {Observable} from 'rxjs/Observable'; import 'rxjs/add/operator/startWith'; import 'rxjs/add/observable/merge'; import 'rxjs/add/operator/map'; import 'rxjs/add/operator/debounceTime'; import 'rxjs/add/operator/distinctUntilChanged'; import 'rxjs/add/observable/fromEvent'; @Component({ selector: 'table-filtering-example', styleUrls: ['table-filtering-example.css'], templateUrl: 'table-filtering-example.html', }) export class TableFilteringExample { displayedColumns = ['userId', 'userName', 'progress', 'color']; exampleDatabase = new ExampleDatabase(); dataSource: ExampleDataSource | null; @ViewChild('filter') filter: ElementRef; ngOnInit() { this.dataSource = new ExampleDataSource(this.exampleDatabase); Observable.fromEvent(this.filter.nativeElement, 'keyup') .debounceTime(150) .distinctUntilChanged() .subscribe(() => { if (!this.dataSource) { return; } this.dataSource.filter = this.filter.nativeElement.value; }); } } /** Constants used to fill up our data base. */ const COLORS = ['maroon', 'red', 'orange', 'yellow', 'olive', 'green', 'purple', 'fuchsia', 'lime', 'teal', 'aqua', 'blue', 'navy', 'black', 'gray']; const NAMES = ['Maia', 'Asher', 'Olivia', 'Atticus', 'Amelia', 'Jack', 'Charlotte', 'Theodore', 'Isla', 'Oliver', 'Isabella', 'Jasper', 'Cora', 'Levi', 'Violet', 'Arthur', 'Mia', 'Thomas', 'Elizabeth']; export interface UserData { id: string; name: string; progress: string; color: string; } /** An example database that the data source uses to retrieve data for the table. */ export class ExampleDatabase { /** Stream that emits whenever the data has been modified. */ dataChange: BehaviorSubject<UserData[]> = new BehaviorSubject<UserData[]>([]); get data(): UserData[] { return this.dataChange.value; } constructor() { // Fill up the database with 100 users. for (let i = 0; i < 100; i++) { this.addUser(); } } /** Adds a new user to the database. */ addUser() { const copiedData = this.data.slice(); copiedData.push(this.createNewUser()); this.dataChange.next(copiedData); } /** Builds and returns a new User. */ private createNewUser() { const name = NAMES[Math.round(Math.random() * (NAMES.length - 1))] + ' ' + NAMES[Math.round(Math.random() * (NAMES.length - 1))].charAt(0) + '.'; return { id: (this.data.length + 1).toString(), name: name, progress: Math.round(Math.random() * 100).toString(), color: COLORS[Math.round(Math.random() * (COLORS.length - 1))] }; } } /** * Data source to provide what data should be rendered in the table. Note that the data source * can retrieve its data in any way. In this case, the data source is provided a reference * to a common data base, ExampleDatabase. It is not the data source's responsibility to manage * the underlying data. Instead, it only needs to take the data and send the table exactly what * should be rendered. */ export class ExampleDataSource extends DataSource<any> { _filterChange = new BehaviorSubject(''); get filter(): string { return this._filterChange.value; } set filter(filter: string) { this._filterChange.next(filter); } constructor(private _exampleDatabase: ExampleDatabase) { super(); } /** Connect function called by the table to retrieve one stream containing the data to render. */ connect(): Observable<UserData[]> { const displayDataChanges = [ this._exampleDatabase.dataChange, this._filterChange, ]; return Observable.merge(...displayDataChanges).map(() => { return this._exampleDatabase.data.slice().filter((item: UserData) => { let searchStr = (item.name + item.color).toLowerCase(); return searchStr.indexOf(this.filter.toLowerCase()) != -1; }); }); } disconnect() {} }