簡介初級核心概念建立Observableof()from()fromEvent()fromPromise()interval()ajax()操做符(Operators)mapfiltershare其餘錯誤處理重試失敗的可觀察對象問題記錄fromPromise no longer exported in master (v6)參考文獻javascript
RxJS
是ReactiveX
編程理念的JavaScript
版本。ReactiveX
來自微軟,它是一種針對異步數據流的編程。簡單來講,它將一切數據,包括HTTP請求,DOM事件或者普通數據等包裝成流的形式,而後用強大豐富的操做符對流進行處理,使你能以同步編程的方式處理異步數據,並組合不一樣的操做符來輕鬆優雅的實現你所須要的功能。 html
RxJS 提供了一種對 Observable
類型的實現,直到 Observable
成爲了 JavaScript 語言的一部分而且瀏覽器支持它以前,它都是必要的。這個庫還提供了一些工具函數,用於建立和使用可觀察對象。這些工具函數可用於:java
Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。web
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:ajax
RxJS 中提供了不少操做符,用於建立 Observable 對象,經常使用的操做符以下: 編程
import { Component, OnInit } from '@angular/core';
import { Observable,of } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用of來建立可觀察對象</h2>
<div>
<button (click)="getData()">Click here</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
getData() {
// Create simple observable that emits three values
const myObservable = of(1, 2, 3);
// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// Execute with the observer object
myObservable.subscribe(myObserver);
}
}
複製代碼
啓動該項目,打開頁面並點擊按鈕,出現這樣的結果:json
import { Component, OnInit } from '@angular/core';
import { Observable,from } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用from函數建立可觀察對象</h2>
<div>
<button (click)="fromData()">根據數組建立Observable</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
fromData() {
let persons = [
{ name: 'Dave', age: 34, salary: 2000 },
{ name: 'Nick', age: 37, salary: 32000 },
{ name: 'Howie', age: 40, salary: 26000 },
{ name: 'Brian', age: 40, salary: 30000 },
{ name: 'Kevin', age: 47, salary: 24000 },
];
const myObservable = from(persons);
myObservable.subscribe(person => console.log(person));
}
}
複製代碼
頁面測試結果爲:segmentfault
import { Component, OnInit } from '@angular/core';
import { Observable,fromEvent } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用fromEvent函數建立可觀察對象</h2>
<div>
<p id="content">Hello Hresh</p>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
this.fromEvent();
}
fromEvent() {
const el = document.getElementById('content');
const mouseMoves = fromEvent(el, 'click');
const subscription = mouseMoves.subscribe(() => {
el.style.color = 'red';
});
}
}
複製代碼
運行項目,點擊文本,文本將會變爲紅色。api
關於 fromEvent 在實際生產中有個典型的應用:輸入提示(type-ahead)建議。數組
可觀察對象能夠簡化輸入提示建議的實現方式。典型的輸入提示要完成一系列獨立的任務:
徹底用 JavaScript 的傳統寫法實現這個功能可能須要大量的工做。使用可觀察對象,你可使用這樣一個 RxJS 操做符的簡單序列:
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators';
const searchBox = document.getElementById('search-box');
const typeahead = fromEvent(searchBox, 'input').pipe(
map((e: KeyboardEvent) => (e.target as HTMLInputElement).value),
filter(text => text.length > 2), //判斷輸入內容長度是否大於2
debounceTime(500), //等待,直到用戶中止輸入(這個例子中是中止 1/2 秒)
distinctUntilChanged(), // 等待搜索文本發生變化。
switchMap(() => ajax('/api/endpoint')) //將搜索請求發送到服務。
);
typeahead.subscribe(data => {
// Handle the data from the API
});
複製代碼
import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用from函數建立可觀察對象</h2>
<div>
<button (click)="fromPromiseData()">根據Promise建立Observable</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
fromPromiseData() {
const myObservable = fromPromise(new Promise((resolve, reject) => {
setTimeout(() => {
// tslint:disable-next-line:prefer-const
let username = 'hresh----Promise';
resolve(username);
}, 2000);
}));
myObservable.subscribe({
next(data) { console.log(data); },
error(err) { console.error('Error' + err); },
complete() { console.log('completed'); }
});
}
}
複製代碼
在 Rxjs6 以後,from 能夠用來代替 fromPromise,因此上述代碼改成 from()也是能夠的。頁面測試結果爲:
這裏擴展一下,官方文檔有個案例是經過 fetch 方法返回 Promise 對象,這裏我作了一些修改:
import { Component, OnInit } from '@angular/core';
import { from, Observable } from 'rxjs';
import { fromPromise } from 'rxjs/internal/observable/fromPromise';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用from函數建立可觀察對象</h2>
<div>
<button (click)="fromPromiseData()">根據Promise建立Observable</button>
<br>
<button (click)="fromData2()">根據Promise獲得的數組建立Observable</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
fromPromiseData() {
const myObservable = from(fetch('http://a.itying.com/api/productlist'));
myObservable.subscribe({
next(data) { console.log(data); },
error(err) { console.error('Error' + err); },
complete() { console.log('completed'); }
});
}
fromData2() {
//fetch()返回的Promise,獲取Promise對象中的內容,即數組
let arrayData = [];
fetch('http://a.itying.com/api/productlist').then(response => response.json()).then((data => {
arrayData = data.result;
}));
let myObservable = null;
setTimeout(() => {
myObservable = from(arrayData);
myObservable.subscribe(data => console.log(data));
}, 2000);
}
}
複製代碼
頁面測試結果爲:
import { Component, OnInit } from '@angular/core';
import { interval, Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用interval函數建立可觀察對象</h2>
<div>
<button (click)="interval2()">根據interval建立Observable</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
interval2() {
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
const oData = secondsCounter.subscribe(n =>
console.log(`It's been ${n} seconds since subscribing!`));
setTimeout(() => {
console.log('取消計數操做');
oData.unsubscribe(); /*5s後取消數據顯示*/
}, 5000);
}
}
複製代碼
interval 操做符支持一個數值類型的參數,用於表示定時的間隔。上面代碼表示每隔 1s,會輸出一個遞增的值,初始值從 0 開始。頁面測試結果爲:
import { Component, OnInit } from '@angular/core';
import {ajax} from 'rxjs/ajax';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用ajax函數建立可觀察對象</h2>
<div>
<button (click)="ajax2()">根據ajax建立Observable</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
ajax2(){
const apiData = ajax('http://a.itying.com/api/productlist');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
}
}
複製代碼
頁面測試結果:
上述咱們講解的建立 Observable 的方法其實就是 Rxjs 的操做符,操做符是基於可觀察對象構建的一些對集合進行復雜操做的函數。RxJS 還定義了一些操做符,好比 map()
、filter()
、concat()
和 flatMap()
。
操做符接受一些配置項,而後返回一個以來源可觀察對象爲參數的函數。當執行這個返回的函數時,這個操做符會觀察來源可觀察對象中發出的值,轉換它們,並返回由轉換後的值組成的新的可觀察對象。
import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
// Logs
// 1
// 4
// 9
複製代碼
能夠看到 map 接受一個 function 做爲參數, 經過該 function 能夠把每一個元素作平方運算進行轉換.。
你可使用管道來把這些操做符連接起來。管道讓你能夠把多個由操做符返回的函數組合成一個。pipe()
函數以你要組合的這些函數做爲參數,而且返回一個新的函數,當執行這個新函數時,就會順序執行那些被組合進去的函數。
應用於某個可觀察對象上的一組操做符就像一個處理流程 —— 也就是說,對你感興趣的這些值進行處理的一組操做步驟。這個處理流程自己不會作任何事。你須要調用 subscribe()
來經過處理流程得出並生成一個結果。
import { filter, map } from 'rxjs/operators';
const nums = of(1, 2, 3, 4, 5);
// Create a function that accepts an Observable.
const squareOddVals = pipe(
filter((n: number) => n % 2 !== 0),
map(n => n * n)
);
// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);
// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));
// Logs
// 1
// 9
// 25
複製代碼
pipe()
函數也同時是 RxJS 的 Observable
上的一個方法,因此你能夠用下列簡寫形式來達到一樣的效果:
import { filter, map } from 'rxjs/operators';
const squareOdd = of(1, 2, 3, 4, 5)
.pipe(
filter(n => n % 2 !== 0),
map(n => n * n)
);
// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
複製代碼
在講解 share 操做符前,咱們須要瞭解一下冷熱兩種模式下的 Observables。官方定義以下:
Cold Observables 在被訂閱後運行,也就是說,observables 序列僅在 subscribe 函數被調用後纔會推送數據。與 Hot Observables 不一樣之處在於,Hot Observables 在被訂閱以前就已經開始產生數據,例如
mouse move
事件。
從原理來講是這樣的: Cold內部會建立一個新的數據生產者, 而Hot則會一直使用外部的數據生產者.
舉個例子:
Cold: 就至關於我在B站看英雄聯盟賽事重播,能夠從頭看起。
Hot: 就至關於看英雄聯盟賽事直播, 若是來晚了, 那麼前面就看不到了。
share() 操做符容許多個訂閱者共享同一個 Observable. 也就是把 Cold 變成 Hot。看以下示例:
import { Component, OnInit } from '@angular/core';
import { interval } from 'rxjs';
import { share, take } from 'rxjs/operators';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>share操做符</h2>
<div>
<button (click)="shareData()">Click</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
shareData() {
const numbers = interval(1000).pipe(
take(5),
share()
);
function subscribeToNumbers(name) {
numbers.subscribe(
x => console.log(`${name}: ${x}`)
);
}
subscribeToNumbers('Dave');
const anotherSubscription = () => subscribeToNumbers('Nick');
setTimeout(anotherSubscription, 2500);
}
}
複製代碼
頁面測試結果爲:
這裏 interval 是每隔1秒產生一個數據, take(5)表示取5個數, 也就是1,2,3,4,5,而後 share()就把這個 observable 從 cold 變成了 hot 的。後邊 Dave 進行了訂閱,2.5秒之後, Nick 進行了訂閱。
RxJS 提供了不少操做符,不過只有少數是經常使用的。 以下圖所示:
關於這些操做符的使用能夠參看 RxJS 系列之三 - Operators 詳解 和 RxJS API 文檔。
除了能夠在訂閱時提供 error()
處理器外,RxJS 還提供了 catchError
操做符,它容許你在管道中處理已知錯誤。
假設你有一個可觀察對象,它發起 API 請求,而後對服務器返回的響應進行映射。若是服務器返回了錯誤或值不存在,就會生成一個錯誤。若是你捕獲這個錯誤並提供了一個默認值,流就會繼續處理這些值,而不會報錯。
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('http://a.itying.com/api/productlist').pipe(
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
複製代碼
頁面測試結果:
catchError
提供了一種簡單的方式進行恢復,而 retry
操做符讓你能夠嘗試失敗的請求。
能夠在 catchError
以前使用 retry
操做符。它會訂閱到原始的來源可觀察對象,它能夠從新運行致使結果出錯的動做序列。若是其中包含 HTTP 請求,它就會從新發起那個 HTTP 請求。
下列代碼把前面的例子改爲了在捕獲錯誤以前重發請求:
import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
retry(3), // Retry up to 3 times before failing
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});
複製代碼
注意:不要重試登陸認證請求,這些請求只應該由用戶操做觸發。咱們確定不會但願自動重複發送登陸請求致使用戶的帳號被鎖定。
RxJS version:
6.0.0-alpha.3
Code to reproduce:
import { fromPromise } from 'rxjs';
複製代碼
Expected behavior:
should work just as it did with v5.5 (but different location)
解決辦法:統一使用 from 來代替 fromPromise。