[譯] RxJS: 操做符狀態管理

原文連接:RxJS: Managing Operator State
原文做者:Nicholas Jamieson;發表於2019年2月12日
譯者:yk;如需轉載,請註明出處,謝謝合做!git

攝影:Victoire Joncheray,來自 Unsplashgithub

在 RxJS 5.5 引入了管道操做符(pipeable operators)以後,編寫用戶級(userland)操做符變得更爲簡單了。typescript

管道操做符屬於高階函數(higher-order function):即返回值爲函數的函數。所返回的函數接受一個 observable(可觀察對象)做爲參數,並返回一個 observable。因此,要建立一個操做符,你沒必要劃分 OperatorSubscriber,只要寫一個函數就好了。shell

這聽起來很簡單。api

然而在某些狀況下你須要格外當心,尤爲是當你的操做符在存儲內部狀態時更應如此。app

舉個例子

讓咱們來看這樣一個例子:一個將接收到的數據及其索引顯示到終端的 debug 操做符。函數

咱們的操做符須要維護一些內部狀態:索引——每收到一次 next 通知時就會遞增。有個很戇的辦法是直接將狀態存儲在操做符的內部,就像這樣:ui

import { MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";

export function debug<T>(): MonoTypeOperatorFunction<T> {
  let index = -1;
  // 讓咱們假設不存在 map 操做符,因而咱們只能用 tap 來維護內部存儲中的索引
  // 該操做符的目的是爲了代表:運行結果取決於狀態存儲的位置
  return tap(t => console.log(`[${++index}]: ${t}`));
}
複製代碼

該辦法會存在許多問題,並致使一些意料以外的行爲和難以定位的 bug。spa

存在的問題

第一個問題是:咱們的操做符不具備引用透明(referentially transparent)性。當一個函數的返回值能夠替代該函數而不影響程序運行,那麼咱們稱這個函數是引用透明的。debug

讓咱們來看看當這個操做符的返回值與多個 observables 進行組合時會發生什麼:

import { range } from "rxjs";
import { debug } from "./debug";

const op = debug();
console.log("first use:");
range(1, 2).pipe(op).subscribe();
console.log("second use:");
range(1, 2).pipe(op).subscribe();
複製代碼

運行結果爲:

first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2
複製代碼

好吧,我知道這很使人意外。在第二個 observable 中,索引並無從 0 開始計數。

第二個問題是:只有在首次訂閱該操做符返回的 observable 時,其行爲纔會是合理的。

如今,讓咱們屢次訂閱由 debug 操做符組成的 observable,看看會發生什麼:

import { range } from "rxjs";
import { debug } from "./debug";

const source = range(1, 2).pipe(debug());
console.log("first use:");
source.subscribe();
console.log("second use:");
source.subscribe();
複製代碼

運行結果爲:

first use:
[0] 1
[1] 2
second use:
[2] 1
[3] 2
複製代碼

仍是一樣使人意外的結果:在第二次訂閱中,索引依舊沒有從 0 開始計數。

因此該如何解決這些問題呢?

解決方案

這兩個問題均可以經過基於每一個訂閱的狀態存儲(storing the state on a per-subscription basis)來解決。如下是幾種實現方法:

第一種方法是使用 Observable 的構造函數來建立操做符返回值(observable)。若是將 index 變量放入傳給 constructor 的函數中,那麼每次訂閱的狀態都會被獨立存儲。寫法以下:

import { MonoTypeOperatorFunction, Observable } from "rxjs";
import { tap } from "rxjs/operators";

export function debug<T>(): MonoTypeOperatorFunction<T> {
  return source => new Observable<T>(subscriber => {
    let index = -1;
    return source.pipe(
      tap(t => console.log(`[${++index}]: ${t}`))
    ).subscribe(subscriber);
  });
}
複製代碼

第二種方法,也是我比較喜歡的,就是使用 defer 來實現基於每一個訂閱的狀態存儲。若是將 index 變量放入傳給 defer 的工廠函數中,它就能夠按每一個訂閱獨立存儲狀態。寫法以下:

譯者注:defer() 的參數爲一個返回值爲 observable 的工廠函數 observableFactory,詳見文檔

import { defer, MonoTypeOperatorFunction } from "rxjs";
import { tap } from "rxjs/operators";

export function debug<T>(): MonoTypeOperatorFunction<T> {
  return source => defer(() => {
    let index = -1;
    return source.pipe(
      tap(t => console.log(`[${++index}]: ${t}`))
    );
  });
}
複製代碼

還有個較爲複雜的方法,就是使用 scan 操做符。scan 會維護每一個訂閱的狀態,該狀態由 seed 參數進行初始化,而後經過 accumulator(累加器)函數計算並返回結果。在本例中,index 能夠像這樣存儲在 scan 中:

譯者注:accumulatorseedscan() 的兩個參數,詳見文檔

import { MonoTypeOperatorFunction } from "rxjs";
import { map, scan } from "rxjs/operators";

export function debug<T>(): MonoTypeOperatorFunction<T> {
  return source => source.pipe(
    scan<T, [T, number]>(([, index], t) => [t, index + 1], [undefined!, -1]), map(([t, index]) => (console.log(`[${index}]: ${t}`), t)) ); } 複製代碼

若是用以上任意一種方法來代替一開始那個很戇的辦法,輸出將會是下面這樣:

first use:
[0] 1
[1] 2
second use:
[0] 1
[1] 2
複製代碼

如你所願:一切都在乎料之中。

相關文章
相關標籤/搜索