咱們已經快把全部基本的轉換(Transformation)、過濾(Filter)和合並(Combination)的 operators 講完了。今天要講錯誤處理(Error Handling)的 operators,錯誤處理是非同步行爲中的一大難題,尤爲有多個交錯的非同步行爲時,更容易凸顯錯誤處理的困難。javascript
就讓咱們一塊兒來看看在 RxJS 中能如何處理錯誤吧!java
catch 是很常見的非同步錯誤處理方法,在 RxJS 中也可以直接用 catch 來處理錯誤,在 RxJS 中的 catch 能夠回傳一個 observable 來送出新的值,讓咱們直接來看示例:web
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.of('h'));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
這個示例咱們每隔 500 毫秒會送出一個字串(String),並用字串的方法 toUpperCase()
來把字串的英文字母改爲大寫,過程當中可能未知的緣由送出了一個數值(Number) 2
致使發生例外(數值沒有 toUpperCase 的方法),這時咱們在後面接的 catch 就能抓到錯誤。socket
catch 能夠回傳一個新的 Observable、Promise、Array 或任何 Iterable 的事件,來傳送以後的元素。fetch
以咱們的例子來講最後就會在送出 X
就結束,畫成 Marble Diagram 以下ui
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch(error => Rx.Observable.of('h'))
example: ----a----b----c----d----h|複製代碼
這裏能夠看到,當錯誤發生後就會進到 catch 並從新處理一個新的 observable,咱們能夠利用這個新的 observable 來送出咱們想送的值。spa
也能夠在遇到錯誤後,讓 observable 結束,以下.net
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.empty());
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
回傳一個 empty 的 observable 來直接結束(complete)。
另外 catch 的 callback 能接收第二個參數,這個參數會接收當前的 observalbe,咱們能夠回傳當前的 observable 來作到從新執行,示例以下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.catch((error, obs) => obs);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
這裏能夠看到咱們直接回傳了當前的 obserable(其實就是 example)來從新執行,畫成 Marble Diagram 以下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
catch((error, obs) => obs)
example: ----a----b----c----d--------a----b----c----d--..複製代碼
由於是咱們只是簡單的示範,因此這裏會一直無限循環,實務上一般會用在斷線重連的情境。
另上面的處理方式有一個簡化的寫法,叫作 retry()
。
若是咱們想要一個 observable 發生錯誤時,從新嘗試就能夠用 retry 這個方法,跟咱們前一個講示例的行爲是一致
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
一般這種無限的 retry
會放在即時同步的從新鏈接,讓咱們在連線斷掉後,不斷的嘗試。另外咱們也能夠設定只嘗試幾回,以下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retry(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// d
// a
// b
// c
// d
// Error: TypeError: x.toUpperCase is not a function複製代碼
這裏咱們對 retry 傳入一個數值 1
,可以讓咱們只重複嘗試 1 次後送出錯誤,畫成 Marble Diagram 以下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retry(1)
example: ----a----b----c----d--------a----b----c----d----X|複製代碼
這種處理方式很適合用在 HTTP request 失敗的場景中,咱們能夠設定從新發送幾回後,再秀出錯誤訊息。
RxJS 還提供了另外一種方法 retryWhen
,他能夠把例外發生的元素放到一個 observable 中,讓咱們能夠直接操做這個 observable,並等到這個 observable 操做完後再從新訂閱一次本來的 observable。
這裏咱們直接來看代碼
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(errorObs => errorObs.delay(1000));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
這裏 retryWhen 咱們傳入一個 callback,這個 callback 有一個參數會傳入一個 observable,這個 observable 不是本來的 observable(example) 而是例外事件送出的錯誤所組成的一個 observable,咱們能夠對這個由錯誤所組成的 observable 作操做,等到此次的處理完成後就會從新訂閱咱們本來的 observable。
這個示例咱們是把錯誤的 observable 送出錯誤延遲 1 秒,這會使後面從新訂閱的動做延遲 1 秒才執行,畫成 Marble Diagram 以下
source : ----a----b----c----d----2|
map(x => x.toUpperCase())
----a----b----c----d----X|
retryWhen(errorObs => errorObs.delay(1000))
example: ----a----b----c----d-------------------a----b----c----d----...複製代碼
從上圖能夠看到後續從新訂閱的行爲就被延後了,但實務上咱們不太會用 retryWhen 來作從新訂閱的延遲,一般是直接用 catch 作到這件事。這裏只是爲了示範 retryWhen 的行爲,實務上咱們一般會把 retryWhen 拿來作錯誤通知或是例外收集,以下
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source
.map(x => x.toUpperCase())
.retryWhen(
errorObs => errorObs.map(err => fetch('...')));
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
這裏的 errorObs.map(err => fetch('...'))
能夠把 errorObs 裏的每一個錯誤變成 API 的發送,一般這裏個 API 會像是送訊息到公司的通信頻道(Slack 等等),這樣可讓工程師立刻知道可能哪一個 API 掛了,這樣咱們就能即時地處理。
retryWhen 其實是在背地裏創建一個 Subject 並把錯誤放入,會在對這個 Subject 進行內部的訂閱,由於咱們尚未講到 Subject 的觀念,你們能夠先把它看成 Observable 就行了,另外記得這個 observalbe 預設是無限的,若是咱們把它結束,本來的 observable 也會跟着結束。
咱們有時候可能會想要 retry 一直重複訂閱的效果,但沒有錯誤發生,這時就能夠用 repeat 來作到這件事,示例以下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat(1);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// a
// b
// c
// complete複製代碼
這裏 repeat 的行爲跟 retry 基本一致,只是 retry 只有在例外發生時才觸發,畫成 Marble Diagram 以下
source : ----a----b----c|
repeat(1)
example: ----a----b----c----a----b----c|複製代碼
一樣的咱們能夠不給參數讓他無限循環,以下
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var example = source.repeat();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
這樣咱們就能夠作動不斷重複的行爲,這個能夠在創建輪詢時使用,讓咱們不斷地發 request 來更新畫面。
最後咱們來看一個錯誤處理在實際應用中的小示例
const title = document.getElementById('title');
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x)
.map(x => x.toUpperCase());
// 一般 source 會是創建即時同步的連線,像是 web socket
var example = source.catch(
(error, obs) => Rx.Observable.empty()
.startWith('連線發生錯誤: 5秒後重連')
.concat(obs.delay(5000))
);
example.subscribe({
next: (value) => { title.innerText = value },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});複製代碼
這個示例其實就是模仿在即時同步斷線時,利用 catch 返回一個新的 observable,這個 observable 會先送出錯誤訊息而且把本來的 observable 延遲 5 秒再作合併,雖然這只是一個模仿,但它清楚的展現了 RxJS 在作錯誤處理時的靈活性。
今天咱們講了三個錯誤處理的方法還有一個 repeat operator,這幾個方法都頗有機會在實際上用到,不知道今天你們有沒有收穫呢? 若是有任何問題,歡迎在下方留言給我,謝謝!