原文連接: medium.com/@benlesh/rx…
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】javascript
不時地會有人問我關於如何與 RxJS 配合使用 async 函數或 promises,還有更糟的,我被告之「事實」的真相是 async-await 和 Observables 並不能「在一塊兒使用」。RxJS 從一開始就具有與 Promises 的高度互操做性。但願這篇文章能對此有所啓發。java
例如,若是使用 switchMap,你能夠返回 Promise 來替代,就像返回 Observable 那樣。如下這些都是有效的:git
// Observable: 每1秒發出自增數值乘以100,共發出10次
const source$ = Observable.interval(1000)
.take(10)
.map(x => x * 100);
/** * 返回 promise,它等待 `ms` 毫秒併發出 "done" */
function promiseDelay(ms) {
return new Promise(resolve => {
setTimeout(() => resolve('done'), ms);
});
}
// 在 switchMap 中使用 promiseDelay
source$.switchMap(x => promiseDelay(x)) // 正常運行
.subscribe(x => console.log(x));
source$.switchMap(promiseDelay) // 更簡潔了
.subscribe(x => console.log(x));
// 或者使用 takeUntil
source$.takeUntil(doAsyncThing('hi')) // 徹底能夠運行
.subscribe(x => console.log(x))
// 或者相似這樣的奇怪組合
Observable.of(promiseDelay(100), promiseDelay(10000)).mergeAll()
.subscribe(x => console.log(x))複製代碼
若是你能夠訪問建立 promise 的函數,你可使用 Observable.defer()
來包裝它,以使 Observable 能夠在報錯時進行重試。github
function getErroringPromise() {
console.log('getErroringPromise called');
return Promise.reject(new Error('sad'));
}
Observable.defer(getErroringPromise)
.retry(3)
.subscribe(x => console.log);
// 輸出 "getErroringPromise called" 4次 (開始1次 + 3次重試), 而後報錯複製代碼
事實證實, defer 是個很是強大的小工具。你可使用它,基本上是直接使用 async 函數,它會建立一個發出返回值及完成的 Observable 。編程
Observable.defer(async function() {
const a = await promiseDelay(1000).then(() => 1);
const b = a + await promiseDelay(1000).then(() => 2);
return a + b + await promiseDelay(1000).then(() => 3);
})
.subscribe(x => console.log(x)) // 輸出 7複製代碼
這是 RxJS 中較少使用的功能,它來自 TC39 Observable 提議。訂閱 Observable 可不止一種方式! subscribe
是訂閱 Observable 的傳統方式,它返回用來取消數據流的 Subscription
對象。而 forEach
以一種不可取消的方式來訂閱 Observable ,它接收一個函數用於每一個值,並返回 Promise,該 Promise 體現了 Observable 的完成和錯誤路徑。promise
const click$ = Observable.fromEvent(button, 'click');
/** * 等待10次按鈕點擊,而後使用 fetch 將第10次點擊的時間戳發送給端點 */
async function doWork() {
await click$.take(10)
.forEach((_, i) => console.log(`click ${i + 1}`));
return await fetch(
'notify/tenclicks',
{ method: 'POST', body: Date.now() }
);
}複製代碼
toPromise
函數其實是有些巧妙的,由於它並非真正的「操做符」,而是以一種 RxJS 特定的方式來訂閱 Observable 並將其包裝成一個 Promise 。一旦 Observable 完成,Promise 便會 resolve Observable 最後發出的值。這意味着若是 Observable 發出值 「hi」 而後等待10秒才完成,那麼返回的 Promise 會等待10秒才 resolve 「hi」 。若是 Observable 一直不完成,那麼 Promise 便永遠不會 resolve 。併發
注意: 使用 toPromise() 是一種反模式,除非當你正在處理預期爲 Promise 的 API, 好比 async-awaitasync
const source$ = Observable.interval(1000).take(3); // 0, 1, 2
// 等待3秒,而後輸出 "2"
// 由於 Observable 須要3秒才能完成,而 interval 發出從0開始自增的數字
async function test() {
console.log(await source$.toPromise());
}複製代碼
不能否認地,若是你的目標是響應式編程,那麼大多數時間裏你可能想要使用 Observable ,可是 RxJS 嘗試去儘量地知足大衆需求,畢竟當下 Promises 仍是很受歡迎的。此外,在 async 函數中使用 RxJS Observables 和 forEach,爲管理併發性和在 async-await 中「只能正常運行」的任務開啓了大量有趣的可能性。函數
想學習更多 RxJS 知識, 我能夠親自教學或選擇在線學習,盡在rxworkshop.com!工具