用JavaScript本身實現惰性數據流、數據流操做符

最近看到SICP 3.5: Stream,裏面介紹了惰性求值,以及如何經過組合與抽象來操做惰性數據流。讓我感覺最深的幾點:html

  • 基於流的編程方式可以將數據處理的代碼組織成很是模塊化的方式,可以經過組合與抽象實現及其複雜的行爲,同時最大程度地控制項目代碼的複雜度
  • 在JavaScript的世界中,ES6的iterable和generator的背後的思想其實就是惰性數據流。對比如今人們對這個思想的詮釋(迭代器模式)和40年前的詮釋,感受很是奇妙,加深了我對iterable和generator的理解。
  • 對比惰性數據流和rxjs事件流,感受就像是來到了鏡中世界,幾乎能一一對應,卻又徹底不一樣。

接下來,讓咱們用JavaScript來本身實現惰性求值,在這個過程當中我會解釋上面的幾條領悟。python

從急切求值到惰性求值

假設原始的數據是[1, 1000]的整數:git

Array.from(Array(1000)).map((x,i)=>i+1)

假設咱們如今想找出其中的偶數:github

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0)

假設咱們如今想找出其中的3的倍數:編程

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0)
    .filter(x=>x%3===0)

假設咱們只須要找出第10個這樣的數:數組

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0)
    .filter(x=>x%3===0)
    [9]

這是在js中很是常見的數組管道式處理,借鑑了函數式的思想,將多個處理步驟串聯組合,讓代碼很是清晰簡潔。
可是你應該發現了一個很嚴重的問題:有大量的計算被浪費。咱們只須要第9個偶數,可是前面的處理鏈路實際上會把整個大數組都處理一遍。在現實編程中,數據源更大、數據處理步驟更多,計算浪費的問題會更嚴重。雖然管道式處理可以大大簡化代碼,可是經常有計算浪費的通病模塊化

爲了避免作多餘計算,咱們每每只能拋棄管道式處理,對於每一個數據,先讓它走完整個處理流程,而後循環,將下一個數據推入一樣的處理流程:函數

function compute() {
  let count = 0;
  for (let i = 1; i <= 1000; i++) {
    if (i % 2 == 0 && i % 3 == 0) {
      count++;
      if (count === 10) return i;
    }
  }
}

這樣確實可以避免計算浪費,可是失去了管道式處理的模塊化和簡潔性。在循環的方案中:循環代碼、處理代碼、判斷結束的代碼糅雜在一塊兒了。而在前面的管道式處理中,每種處理都是界線清晰的,很容易作到模塊化。
假設如今要實現一個UI界面,讓用戶經過鼠標拖拽的方式(而不是編寫代碼)來編排數據處理流程,你必定會選擇管道模型,而不是迭代模型。由於管道模型很是容易作到模塊化,你只要將不一樣的處理功能都實現成模塊,用戶只須要給這些模塊排個順序就行了。而迭代模型,你須要讓用戶可以建立迭代,而後在迭代裏面加入處理邏輯、跳出邏輯……工具

那麼,有沒有方式可以保持管道模型的簡潔性,同時可以避免計算浪費呢?有,那就是惰性數據流。惰性數據流是一種特殊的數據序列(與之相對應的,數組是一種常見的數據序列),它只有在真正須要的時候纔會計算出下一項的值。下面是一個例子,getRangeIterator能夠返回一個惰性數據流,它的數據就是依次返回[min,max]的整數。測試

function getRangeIterator(min, max) {
  return () => {
    const done = min > max;
    if (done) return { done: true };
    return {
      done: false,
      val: min,
      next: getRangeIterator(min + 1, max)
    };
  };
}

若是你不適應這種函數式的編程,能夠經過顯式維護一個狀態變量來記錄數據流狀態:

function getRangeIterator2(min, max) {
  let current = min;
  return function next() {
    const done = current > max;
    if (done) return { done: true };
    return {
      done: false,
      val: current++
    };
  };
}

下面的文章都會使用函數式的風格,不會使用帶反作用的賦值。你能夠嘗試實現過程式風格的版本,對比哪一種編程風格更加簡潔。
爲了測試咱們的惰性數據流,咱們寫一個工具函數來打印數據流中的全部數據:

function iteratorAll(it, cb) {
  const { done, val, next } = it(); // 計算下一個數據
  if (done) return;
  cb(val);
  iteratorAll(next, cb);
}

iteratorAll(getRangeIterator(20, 30), console.log);
// 會依次打印出20~30中的全部數字

使用惰性數據流來進行管道式的數據處理,完整代碼:

// 搭建數據流處理管道
const it = takeItUntil(
  // 第三道處理(中止邏輯)
  filterIt(
    // 第二道處理
    filterIt(
      // 第一道處理
      getRangeIterator(1, 1000), // 數據源
      x => x % 2 === 0
    ),
    x => x % 3 === 0
  ),
  (x, idx) => idx >= 10
);
// 打印數據流的最後一項
console.log(takeLast(it)); // 60

// 生成順序數字流的方法
function getRangeIterator(min, max) {
  return () => {
    const done = min > max;
    if (done) return { done: true };
    return {
      done: false,
      val: min,
      next: getRangeIterator(min + 1, max)
    };
  };
}
// 流處理方法,輸入是流,輸出也是流
function filterIt(it, cb) {
  return () => {
    const { done, val, next } = it();
    if (done)
      return {
        done: true
      };
    if (cb(val))
      return {
        done: false,
        val,
        next: filterIt(next, cb)
      };
    return filterIt(next, cb)();
  };
}

function takeItUntil(it, cb, currentIdx = 0) {
  return () => {
    const { done, val, next } = it();
    if (done || cb(val, currentIdx)) return { done: true };
    return {
      done: false,
      val,
      next: takeItUntil(next, cb, currentIdx + 1)
    };
  };
}

// 這個不是流處理,而是消費流的方法,由於它輸出的是數字而不是流
function takeLast(it) {
  const { done, val, next } = it();
  if (done) throw new Error("can't takeLast from empty stream");
  return helper(next, val);

  function helper(it, pre) {
    const { done, val, next } = it();
    if (done) return pre;
    return helper(next, val);
  }
}

下半部分定義的那些方法,所有都是能夠複用的流處理工具。
上半部分定義的流處理管道,實際是結構很是一致的,一層套一層,從內到外的處理管道。

鏈式調用

咱們還能夠優化一下流處理管道的組合方式,把嵌套的組合方式改爲鏈式調用的組合方式,更符合人的閱讀習慣:

// 搭建數據流處理管道
const it = chain(getRangeIterator(1, 1000)) // 數據源
  .pipe(filter(x => x % 2 === 0)) // 第一道處理
  .pipe(filter(x => x % 3 === 0)) // 第二道處理
  .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道處理
  .unWrap();

// 打印數據流的最後一項
console.log(takeLast(it)); // 60

// 生成順序數字流的方法
function getRangeIterator(min, max) {
  return () => {
    const done = min > max;
    if (done) return { done: true };
    return {
      done: false,
      val: min,
      next: getRangeIterator(min + 1, max)
    };
  };
}
// 流處理方法配置方法
function filter(cb) {
  // 返回流處理函數
  return function _filter(it) {
    // 流處理函數輸入一個流,返回一個流
    return () => {
      const { done, val, next } = it();
      if (done)
        return {
          done: true
        };
      if (cb(val))
        return {
          done: false,
          val,
          next: _filter(next)
        };
      // val被filter掉,從流中拿出下一個值
      return _filter(next)();
    };
  };
}

function takeUntil(cb) {
  return it => _takeUntil(it, 0);
  function _takeUntil(it, currentIdx) {
    return () => {
      const { done, val, next } = it();
      if (done || cb(val, currentIdx)) return { done: true };
      return {
        done: false,
        val,
        next: _takeUntil(next, currentIdx + 1)
      };
    };
  }
}

// takeLast不是流處理,而是消費流的方法(相似於前面的iteratorAll),由於它輸出的不是流
function takeLast(it) {
  const { done, val, next } = it();
  if (done) throw new Error("can't takeLast from empty stream");
  return helper(next, val);

  function helper(it, pre) {
    const { done, val, next } = it();
    if (done) return pre;
    return helper(next, val);
  }
}

function chain(it) {
  return {
    pipe: transformer => chain(transformer(it)),
    unWrap: () => it
  };
}

能夠看到,鏈式調用的惰性數據流處理,與文章最前面的數組鏈式調用同樣簡潔。

特性一:計算關係的創建與實際計算工做分離

【計算關係的創建】與【實際計算工做】的時機分離,這個是惰性數據流與普通數組處理的最大不一樣:

// 搭建數據流處理管道
const it = chain(getRangeIterator(1, 1000)) // 數據源
  .pipe(filter(x => x % 2 === 0)) // 第一道處理
  .pipe(filter(x => x % 3 === 0)) // 第二道處理
  .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道處理
  .unWrap();

// 這個時候實際尚未任何數據拉取、處理,只是創建了計算關係

// 消費下游數據的時候,纔會把上游數據抽出來
console.log(takeLast(it));

創建計算關係的代碼不會消費和處理數據。
而普通的數組處理則作不到這一點,創建計算關係的代碼會馬上完成全部工做:

Array.from(Array(1000)).map((x,i)=>i+1)
    .filter(x=>x%2===0) // 前面已經完成全部數據的計算
    .filter(x=>x%3===0) // 前面已經完成全部數據的計算
    [9]                 // 前面已經完成全部數據的計算

若是你仔細觀察,會發現不少大型計算系統都是先定義好計算關係,而後才真正開始計算的。好比TensorFlow

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10)
])

特性二:惰性數據流能夠是無窮多的

普通的(急切的)數據流是提早準備好全部數據,所以你沒法表示和處理無窮大的數據序列(好比斐波那契數列)。

而惰性數據流則是至關於只保存數列每一項的計算方法,在須要的時候才計算出來。這就是找規律和死記硬背的區別~
舉個例子,斐波那契數據流:

// 生成順序斐波那契流的方法
function getFibStream() {
  return helper(0, 1);

  function helper(pre2, pre1) {
    return () => {
      const val = pre2 + pre1;
      return {
        done: false,
        val,
        next: helper(pre1, val)
      };
    };
  }
}

上游數據流有無窮多個,是常常發生的事情,只要下游轉換流、消費者會中止,程序就會終止。舉個例子,打印1000之內的斐波那契數:

// 搭建數據流處理管道
const it = chain(getFibStream())
  .pipe(takeUntil(x => x >= 1000))
  .unWrap();

// 打印小於1000的斐波那契數
iteratorAll(it, console.log);

// 生成順序斐波那契流的方法
function getFibStream() {
  return helper(0, 1);

  function helper(pre2, pre1) {
    return () => {
      const val = pre2 + pre1;
      return {
        done: false,
        val,
        next: helper(pre1, val)
      };
    };
  }
}

function takeUntil(cb) {
  return it => _takeUntil(it, 0);
  function _takeUntil(it, currentIdx) {
    return () => {
      const { done, val, next } = it();
      if (done || cb(val, currentIdx)) return { done: true };
      return {
        done: false,
        val,
        next: _takeUntil(next, currentIdx + 1)
      };
    };
  }
}

function iteratorAll(it, cb) {
  const { done, val, next } = it();
  if (done) return;
  cb(val);
  iteratorAll(next, cb);
}

function chain(it) {
  return {
    pipe: transformer => chain(transformer(it)),
    unWrap: () => it
  };
}

敬請期待

相關文章
相關標籤/搜索