TLDR;
這篇文章的風格是在致敬 Jim 老師;致敬,致敬,懂嗎,不是抄襲,程序員的事怎麼能叫抄襲。
固然我對 Node.js 的 stream 也是現學現賣,有使用不當的地方,敬請指出。
原文連接 歡迎 star。javascript
寫這篇文章的初衷是年前看 SICP 的時候,第二章介紹構造數據抽象的時候有提到 Lisp 對序列的處理採用相似『信號流』的方式。因此很天然的就想到了 Node.js 中的 pipe 方式,因而就一直想用 pipe 的方式嘗試一下。java
同 Jim 老師的這篇 文章 中描述的同樣, 我也是懶癌發做,從年尾拖到今年年初,而後在年初又看到了 Jim 老師 的博客,深受啓發,終於下定決心要開始碼了...... 而後,嗯,又拖到昨天。促使我下定決心要寫的主要緣由是昨天部門的年會!反正年會跟我這種死肥宅也沒多大關係,在你們 happy 的時候構思了下代碼實現,回家用了一夜的時候補上了代碼。node
Jim 老師在他的文章裏面也說了,JS 的那些數組操做 (map
/ reduce
/filter
) 啥的,每次調用的時候都會進行一次完整的遍歷。試想一下若是有一個第一個數是1,長度是 1億 的遞增爲 1 的數組,須要把全部的數組都乘 3,再排除其中的奇數,若是用 (map
/filter
) 的方法,只要也須要循環 一億五千萬次;那麼若是有其餘辦法能只循環一億次,是否是節省了大量的內存資源和循環消耗的時間。git
廢話很少說,直接上代碼吧。程序員
在編寫代碼時,咱們應該有一些方法將程序像鏈接水管同樣鏈接起來 -- 當咱們須要獲取一些數據時,能夠去經過"擰"其餘的部分來達到目的。這也應該是IO應有的方式。 -- Doug McIlroy. October 11, 1964github
關於 node 的 stream 能夠看看這篇 文章。數組
下面是代碼部分,整個代碼我是在邊學 pipe 邊用一夜的時間倉促寫就的,懶癌發做,也不想再重構了,各位相公講究看吧,求別噴代碼。app
const stream = require('stream') const last = Symbol() // 在 selfArray 中接收一個真正的數組 // 返回一個可讀流 // 若是再作的精細點,能夠作成可讀可寫流,這樣就能經過控制流的大小,來控制內存的大小,別幾億條數據直接撐爆內存了 // 不過對後面 reduce 的處理就比較麻煩 function selfArray(a) { const rs = new stream.Readable({ objectMode: true }) a.forEach((v, index) => { rs.push(v) }) rs.push(last) rs.push(null) return rs }
上面的 selfArray 在流的最後面 push 了一個 Symbol 對象來標誌整個流的輸入結束,留待爲以後 reduce 的使用。性能
Map
/Filter
/Reduce
的實現function forEach(callback) { const ws = new stream.Writable({ objectMode: true }) let index = 0 ws._write = function (chunk, enc, next) { if (chunk !== last) { callback(chunk, index++) next() } } return ws } function filter(callback) { const trans = new stream.Transform({ readableObjectMode: true, writableObjectMode: true }) let index = 0 trans._transform = function (chunk, enc, next) { if (chunk === last) { next(null, last) } else { let condition = callback(chunk, index++) if (condition) { this.push(chunk) } next() } } return trans } function map(callback) { const trans = new stream.Transform({ readableObjectMode: true, writableObjectMode: true }) let index = 0 trans._transform = function (chunk, enc, next) { if (chunk === last) { next(null, last) } else { next(null, callback(chunk, index++)) } } return trans } function reduce(callback, initial) { const trans = new stream.Transform({ readableObjectMode: true, writableObjectMode: true }) let index = 0, current = initial, prev = initial trans._transform = function (chunk, enc, next) { if (chunk === last) { if (index > 1) { prev = callback(prev, current, index - 1) } this.push(prev) this.push(last) return next(null, last) } if (initial === void 0 && index === 0) { prev = chunk } if (index > 0) { prev = callback(prev, current, index - 1) } current = chunk index++ next() } return trans }
上面的代碼在 reduce 的實現稍微麻煩了一些,reduce 對沒有初始值,原始數組爲空的條件下有各類不一樣的處理狀況,翻看了下 MDN 的解釋又本身實現了下。測試
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4]) .pipe(map(v => v * 3)) .pipe(filter(v => v % 2)) .pipe(reduce((p, c) => p + c, 0)) .pipe(forEach(v => { console.log('pipe 計算最後的結果是:', v) }))
爲了好看我故意把各類括號都刪掉了。嗯,看起來還挺完美,咱們來測試下
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4]) .pipe(map(v => { console.log('map:', v) return v * 3 })) .pipe(filter(v => { console.log('filter:', v) return v % 2 })) .pipe(reduce((p, c) => { console.log('reduce:', p, c) return p + c }, 0)) .pipe(forEach(v => { console.log('pipe 計算最後的結果是:', v) })) 加上 log 以後能夠看到結算結果是: map: 9 filter: 27 map: 2 filter: 6 map: 6 filter: 18 map: 3 filter: 9 reduce: 0 27 map: 5 filter: 15 reduce: 27 9 map: 6 filter: 18 map: 7 filter: 21 reduce: 36 15 map: 1 filter: 3 reduce: 51 21 map: 4 filter: 12 map: 4 filter: 12 reduce: 72 3 pipe 計算最後的結果是: 75
從上面的 log 能夠看到, 第一個數 9 先執行了 map
,而後在 3 以後就直接進入了 filter
,此時第 2 個數 2 也開始被 map
處理,而後被 filter
處理,可是因爲 3 以後是偶數不會被 reduce
接收, reduce
會一直等到第二個奇數,也就是 3 進入以後纔會被處理... 嗯,直到最終的計算結果是 75, 被 forEach
消耗。
雖然我沒有像 Jim 老師同樣進行性能測試,可是猜想也知道 pipe 的方式在數量比較小的時候確定要弱於正常方式,pipe 的好處在於數據量比較大的時候,可使用比較小的內存,儘快的處理數組中前置的數據。