在 Node.js 中用 pipe 處理數組的實現

TLDR;
這篇文章的風格是在致敬 Jim 老師;致敬,致敬,懂嗎,不是抄襲,程序員的事怎麼能叫抄襲。
固然我對 Node.js 的 stream 也是現學現賣,有使用不當的地方,敬請指出。
原文連接 歡迎 star。javascript

寫這篇文章的初衷是年前看 SICP 的時候,第二章介紹構造數據抽象的時候有提到 Lisp 對序列的處理採用相似『信號流』的方式。因此很天然的就想到了 Node.js 中的 pipe 方式,因而就一直想用 pipe 的方式嘗試一下。java

同 Jim 老師的這篇 文章 中描述的同樣, 我也是懶癌發做,從年尾拖到今年年初,而後在年初又看到了 Jim 老師 的博客,深受啓發,終於下定決心要開始碼了...... 而後,嗯,又拖到昨天。促使我下定決心要寫的主要緣由是昨天部門的年會!反正年會跟我這種死肥宅也沒多大關係,在你們 happy 的時候構思了下代碼實現,回家用了一夜的時候補上了代碼。node

Jim 老師在他的文章裏面也說了,JS 的那些數組操做 (map/ reduce/filter) 啥的,每次調用的時候都會進行一次完整的遍歷。試想一下若是有一個第一個數是1,長度是 1億 的遞增爲 1 的數組,須要把全部的數組都乘 3,再排除其中的奇數,若是用 (map/filter) 的方法,只要也須要循環 一億五千萬次;那麼若是有其餘辦法能只循環一億次,是否是節省了大量的內存資源和循環消耗的時間。git

廢話很少說,直接上代碼吧。程序員

pipe

在編寫代碼時,咱們應該有一些方法將程序像鏈接水管同樣鏈接起來 -- 當咱們須要獲取一些數據時,能夠去經過"擰"其餘的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964github

關於 node 的 stream 能夠看看這篇 文章數組

下面是代碼部分,整個代碼我是在邊學 pipe 邊用一夜的時間倉促寫就的,懶癌發做,也不想再重構了,各位相公講究看吧,求別噴代碼。app

入口

const stream = require('stream')

const last = Symbol()

// 在 selfArray 中接收一個真正的數組
// 返回一個可讀流
// 若是再作的精細點,能夠作成可讀可寫流,這樣就能經過控制流的大小,來控制內存的大小,別幾億條數據直接撐爆內存了
// 不過對後面 reduce 的處理就比較麻煩
function selfArray(a) {
  const rs = new stream.Readable({
    objectMode: true
  })

  a.forEach((v, index) => {
    rs.push(v)
  })
  rs.push(last)
  rs.push(null)
  return rs
}

上面的 selfArray 在流的最後面 push 了一個 Symbol 對象來標誌整個流的輸入結束,留待爲以後 reduce 的使用。性能

Map/Filter/Reduce 的實現

function forEach(callback) {
  const ws = new stream.Writable({
    objectMode: true
  })
  let index = 0

  ws._write = function (chunk, enc, next) {
    if (chunk !== last) {
      callback(chunk, index++)
      next()
    }
  }

  return ws
}

function filter(callback) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })

  let index = 0

  trans._transform = function (chunk, enc, next) {
    if (chunk === last) {
      next(null, last)
    } else {
      let condition = callback(chunk, index++)
      if (condition) {
        this.push(chunk)
      }
      next()
    }
  }
  return trans
}

function map(callback) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })
  let index = 0
  trans._transform = function (chunk, enc, next) {
    if (chunk === last) {
      next(null, last)
    } else {
      next(null, callback(chunk, index++))
    }
  }
  return trans
}

function reduce(callback, initial) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })

  let index = 0,
    current = initial,
    prev = initial


  trans._transform = function (chunk, enc, next) {

    if (chunk === last) {
      if (index > 1) {
        prev = callback(prev, current, index - 1)
      }
      this.push(prev)
      this.push(last)
      return next(null, last)
    }

    if (initial === void 0 && index === 0) {
      prev = chunk
    }

    if (index > 0) {
      prev = callback(prev, current, index - 1)
    }

    current = chunk
    index++
    next()
  }

  return trans
}

上面的代碼在 reduce 的實現稍微麻煩了一些,reduce 對沒有初始值,原始數組爲空的條件下有各類不一樣的處理狀況,翻看了下 MDN 的解釋又本身實現了下。測試

使用

selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
  .pipe(map(v => v * 3))
  .pipe(filter(v => v % 2))
  .pipe(reduce((p, c) => p + c, 0))
  .pipe(forEach(v => {
    console.log('pipe 計算最後的結果是:', v)
  }))

爲了好看我故意把各類括號都刪掉了。嗯,看起來還挺完美,咱們來測試下

selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
  .pipe(map(v => {
    console.log('map:', v)
    return v * 3
  }))
  .pipe(filter(v => {
    console.log('filter:', v)
    return v % 2
  }))
  .pipe(reduce((p, c) => {
    console.log('reduce:', p, c)
    return p + c
  }, 0))
  .pipe(forEach(v => {
    console.log('pipe 計算最後的結果是:', v)
  }))
  
  
加上 log 以後能夠看到結算結果是:
  
map: 9
filter: 27
map: 2
filter: 6
map: 6
filter: 18
map: 3
filter: 9
reduce: 0 27
map: 5
filter: 15
reduce: 27 9
map: 6
filter: 18
map: 7
filter: 21
reduce: 36 15
map: 1
filter: 3
reduce: 51 21
map: 4
filter: 12
map: 4
filter: 12
reduce: 72 3
pipe 計算最後的結果是: 75

從上面的 log 能夠看到, 第一個數 9 先執行了 map,而後在 3 以後就直接進入了 filter,此時第 2 個數 2 也開始被 map 處理,而後被 filter 處理,可是因爲 3 以後是偶數不會被 reduce 接收, reduce 會一直等到第二個奇數,也就是 3 進入以後纔會被處理... 嗯,直到最終的計算結果是 75, 被 forEach 消耗。

總結

雖然我沒有像 Jim 老師同樣進行性能測試,可是猜想也知道 pipe 的方式在數量比較小的時候確定要弱於正常方式,pipe 的好處在於數據量比較大的時候,可使用比較小的內存,儘快的處理數組中前置的數據。

相關文章
相關標籤/搜索