原文連接:RxJS: Managing Operator State
原文做者:Nicholas Jamieson;發表於2019年2月12日
譯者:yk;如需轉載,請註明出處,謝謝合做!git
攝影:Victoire Joncheray,來自 Unsplashgithub
在 RxJS 5.5 引入了管道操做符(pipeable operators)以後,編寫用戶級(userland)操做符變得更爲簡單了。typescript
管道操做符屬於高階函數(higher-order function):即返回值爲函數的函數。所返回的函數接受一個 observable(可觀察對象)做爲參數,並返回一個 observable。因此,要建立一個操做符,你沒必要劃分 Operator
和 Subscriber
,只要寫一個函數就好了。shell
這聽起來很簡單。api
然而在某些狀況下你須要格外當心,尤爲是當你的操做符在存儲內部狀態時更應如此。app
讓咱們來看這樣一個例子:一個將接收到的數據及其索引顯示到終端的 debug
操做符。函數
咱們的操做符須要維護一些內部狀態:索引——每收到一次 next
通知時就會遞增。有個很戇的辦法是直接將狀態存儲在操做符的內部,就像這樣:ui
import { MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
let index = -1;
// 讓咱們假設不存在 map 操做符,因而咱們只能用 tap 來維護內部存儲中的索引
// 該操做符的目的是爲了代表:運行結果取決於狀態存儲的位置
return tap(t => console.log(`[${++index}]: ${t}`));
}
複製代碼
該辦法會存在許多問題,並致使一些意料以外的行爲和難以定位的 bug。spa
第一個問題是:咱們的操做符不具備引用透明(referentially transparent)性。當一個函數的返回值能夠替代該函數而不影響程序運行,那麼咱們稱這個函數是引用透明的。debug
讓咱們來看看當這個操做符的返回值與多個 observables 進行組合時會發生什麼:
import { range } from "rxjs";
import { debug } from "./debug";
const op = debug();
console.log("first use:");
range(1, 2).pipe(op).subscribe();
console.log("second use:");
range(1, 2).pipe(op).subscribe();
複製代碼
運行結果爲:
first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2
複製代碼
好吧,我知道這很使人意外。在第二個 observable 中,索引並無從 0 開始計數。
第二個問題是:只有在首次訂閱該操做符返回的 observable 時,其行爲纔會是合理的。
如今,讓咱們屢次訂閱由 debug
操做符組成的 observable,看看會發生什麼:
import { range } from "rxjs";
import { debug } from "./debug";
const source = range(1, 2).pipe(debug());
console.log("first use:");
source.subscribe();
console.log("second use:");
source.subscribe();
複製代碼
運行結果爲:
first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2
複製代碼
仍是一樣使人意外的結果:在第二次訂閱中,索引依舊沒有從 0 開始計數。
因此該如何解決這些問題呢?
這兩個問題均可以經過基於每一個訂閱的狀態存儲(storing the state on a per-subscription basis)來解決。如下是幾種實現方法:
第一種方法是使用 Observable
的構造函數來建立操做符返回值(observable)。若是將 index
變量放入傳給 constructor
的函數中,那麼每次訂閱的狀態都會被獨立存儲。寫法以下:
import { MonoTypeOperatorFunction, Observable } from "rxjs";
import { tap } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
return source => new Observable<T>(subscriber => {
let index = -1;
return source.pipe(
tap(t => console.log(`[${++index}]: ${t}`))
).subscribe(subscriber);
});
}
複製代碼
第二種方法,也是我比較喜歡的,就是使用 defer
來實現基於每一個訂閱的狀態存儲。若是將 index
變量放入傳給 defer
的工廠函數中,它就能夠按每一個訂閱獨立存儲狀態。寫法以下:
譯者注:
defer()
的參數爲一個返回值爲 observable 的工廠函數observableFactory
,詳見文檔。
import { defer, MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
return source => defer(() => {
let index = -1;
return source.pipe(
tap(t => console.log(`[${++index}]: ${t}`))
);
});
}
複製代碼
還有個較爲複雜的方法,就是使用 scan
操做符。scan
會維護每一個訂閱的狀態,該狀態由 seed
參數進行初始化,而後經過 accumulator
(累加器)函數計算並返回結果。在本例中,index
能夠像這樣存儲在 scan
中:
譯者注:
accumulator
和seed
爲scan()
的兩個參數,詳見文檔。
import { MonoTypeOperatorFunction } from "rxjs";
import { map, scan } from "rxjs/operators";
export function debug<T>(): MonoTypeOperatorFunction<T> {
return source => source.pipe(
scan<T, [T, number]>(([, index], t) => [t, index + 1], [undefined!, -1]), map(([t, index]) => (console.log(`[${index}]: ${t}`), t)) ); } 複製代碼
若是用以上任意一種方法來代替一開始那個很戇的辦法,輸出將會是下面這樣:
first use:
[0] 1
[1] 2
second use:
[0] 1
[1] 2
複製代碼
如你所願:一切都在乎料之中。