歡迎來個人博客閱讀:《Node.js源碼解析-pipe實現》node
從前面兩篇文章,咱們瞭解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,而後寫入 Writable。換句話說,每次傳遞數據時,都須要寫以下的模板代碼git
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
爲了方便使用,Node.js 提供了 pipe()
方法,讓咱們能夠優雅的傳遞數據github
readable.pipe(writable)
如今,就讓咱們來看看它是如何實現的吧函數
首先須要先調用 Readable 的 pipe()
方法this
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 記錄 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保證 error 事件觸發時,onerror 首先被執行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 觸發 Writable 的 pipe 事件 dest.emit('pipe', src); // 將 Readable 改成 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
執行 pipe()
函數時,首先將 Writable 記錄到 state.pipes
中,而後綁定相關事件,最後若是 Readable 不是 flow 模式,就調用 resume()
將 Readable 改成 flow 模式prototype
Readable 從數據源獲取到數據後,觸發 data 事件,執行 ondata()
debug
ondata()
相關代碼:code
// lib/_stream_readable.js // 防止在 dest.write(chunk) 內調用 src.push(chunk) 形成 awaitDrain 重複增長,awaitDrain 不能清零,Readable 卡住的狀況 // 詳情見 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 內調用 src.unpipe(dest),致使 awaitDrain 不能清零,Readable 卡住的狀況 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 進入 pause 模式 src.pause(); } }
在 ondata(chunk)
函數內,經過 dest.write(chunk)
將數據寫入 Writable事件
此時,在 _write()
內部可能會調用 src.push(chunk)
或使其 unpipe,這會致使 awaitDrain 屢次增長,不能清零,Readable 卡住ip
當不能再向 Writable 寫入數據時,Readable 會進入 pause 模式,直到全部的 drain 事件觸發
觸發 drain 事件,執行 ondrain()
// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) { return function() { var state = src._readableState; debug('pipeOnDrain', state.awaitDrain); if (state.awaitDrain) state.awaitDrain--; // awaitDrain === 0,且有 data 監聽器 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } }; }
每一個 drain 事件觸發時,都會減小 awaitDrain,直到 awaitDrain 爲 0。此時,調用 flow(src)
,使 Readable 進入 flow 模式
到這裏,整個數據傳遞循環已經創建,數據會順着循環源源不斷的流入 Writable,直到全部數據寫入完成
無論寫入過程當中是否出現錯誤,最後都會執行 unpipe()
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也沒有 if (state.pipesCount === 0) return this; // 只有一個 if (state.pipesCount === 1) { if (dest && dest !== state.pipes) return this; // 沒有指定就 unpipe 全部 if (!dest) dest = state.pipes; state.pipes = null; state.pipesCount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeInfo); return this; } // 沒有指定就 unpipe 全部 if (!dest) { var dests = state.pipes; var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeInfo); return this; } // 找到指定 Writable,並 unpipe var index = state.pipes.indexOf(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this; };
Readable.prototype.unpipe()
函數會根據 state.pipes
屬性和 dest 參數,選擇執行策略。最後會觸發 dest 的 unpipe 事件
unpipe 事件觸發後,調用 onunpipe()
,清理相關數據
// lib/_stream_readable.js function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; // 清理相關數據 cleanup(); } } }
在整個 pipe 的過程當中,Readable 是主動方 ( 負責整個 pipe 過程:包括數據傳遞、unpipe 與異常處理 ),Writable 是被動方 ( 只須要觸發 drain 事件 )
總結一下 pipe 的過程:
首先執行 readbable.pipe(writable)
,將 readable 與 writable 對接上
當 readable 中有數據時,readable.emit('data')
,將數據寫入 writable
若是 writable.write(chunk)
返回 false,則進入 pause 模式,等待 drain 事件觸發
drain 事件所有觸發後,再次進入 flow 模式,寫入數據
無論數據寫入完成或發生中斷,最後都會調用 unpipe()
unpipe()
調用 Readable.prototype.unpipe()
,觸發 dest 的 unpipe 事件,清理相關數據
參考: