歡迎來個人博客閱讀:《Node.js源碼解析-Writable實現》node
對於一個 stream 模塊來講,最基本的就是讀和寫。讀由 Readable 負責,寫則是由 Writable 負責。Readable 的實現上篇文章已經介紹過了,這篇介紹 Writable 的實現git
在開始以前,先看看 Writable 的構造函數github
// lib/_stream_readable.js function Writable(options) { // 只能在 Writable 或 Duplex 的上下文中執行 if (!(realHasInstance.call(Writable, this)) && !(this instanceof Stream.Duplex)) { return new Writable(options); } // Wrirable 的狀態集 this._writableState = new WritableState(options, this); // legacy this.writable = true; if (options) { if (typeof options.write === 'function') this._write = options.write; if (typeof options.writev === 'function') this._writev = options.writev; if (typeof options.destroy === 'function') this._destroy = options.destroy; if (typeof options.final === 'function') this._final = options.final; } Stream.call(this); } function WritableState(options, stream) { options = options || {}; // object 模式標識 this.objectMode = !!options.objectMode; if (stream instanceof Stream.Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode; var hwm = options.highWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; // highWaterMark 高水位標識 // 此時,write() 返回 false // 默認 16k this.highWaterMark = Math.floor(this.highWaterMark); this.finalCalled = false; // drain 事件標識 this.needDrain = false; // 剛調用 end() 時的狀態標識 this.ending = false; // end() 調用完成後的狀態標識 this.ended = false; this.finished = false; this.destroyed = false; var noDecode = options.decodeStrings === false; // 數據寫入前,是否應該將 string 解析爲 buffer this.decodeStrings = !noDecode; this.defaultEncoding = options.defaultEncoding || 'utf8'; // 緩存中的數據長度 // 不是真正的 buffer 長度,而是正在等待寫入的數據長度 this.length = 0; // writing 標識 this.writing = false; // cork 標識 this.corked = 0; // 標識是否有異步回調 this.sync = true; // 標識正在寫入緩存中的內容 this.bufferProcessing = false; // _write() 和 _writev() 函數的回調 this.onwrite = onwrite.bind(undefined, stream); // 調用 write(chunk, cb) 時的回調函數 this.writecb = null; // 需寫入的單個 chunk 塊長度 this.writelen = 0; // Writable 的緩衝池實現也是一個鏈表,其每一個節點的結構以下: // { // chunk, // encoding, // isBuf, // callback, // next // } // 緩衝池頭節點 this.bufferedRequest = null; // 緩衝池尾節點 this.lastBufferedRequest = null; // 緩衝池的大小 this.bufferedRequestCount = 0; // 還須要執行的 callback 數量,必須在 finish 事件發生以前降爲 0 this.pendingcb = 0; this.prefinished = false; this.errorEmitted = false; // cork 的回調函數,最多隻能有兩個函數對象 var corkReq = { next: null, entry: null, finish: undefined }; corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this); this.corkedRequestsFree = corkReq; }
Writable 與 Readable 相似,也使用一個對象 ( WritableState ) 對狀態和屬性進行集中管理數組
在 Writable 的構造函數參數中,options.write
函數是必須的,options.writev
則是用於批量寫入數據,能夠選擇實現緩存
Writable 的緩衝池也是由鏈表實現,但與 Readable 不一樣的是,Writable 的緩衝池實現更簡單,其節點結構以下:異步
{ chunk, // 數據塊,多是 object / string / buffer encoding, // 數據塊編碼 isBuf, // buffer 標識 callback, // write(chunk, cb) 的回調函數 next // 下一個寫入任務 }
除此以外,WritableState 還使用 bufferedRequest、lastBufferedRequest、bufferedRequestCount 屬性分別記錄緩衝池的頭、尾節點和節點數量函數
在 Duplex 的源碼中有這麼一段註釋this
// a duplex stream is just a stream that is both readable and writable. // Since JS doesn't have multiple prototypal inheritance, this class // prototypally inherits from Readable, and then parasitically from // Writable.
意思是: Duplex 流既是 Readable 流又是 Writable 流,可是因爲 JS 中的繼承是基於原型的,沒有多繼承。因此 Duplex 是繼承自 Readable,寄生自 Writable編碼
寄生自 Writable 體如今兩個方面:prototype
duplex instanceof Writable
爲 true
duplex 具備 Writable 的屬性和方法
// lib/_stream_writable.js var realHasInstance; if (typeof Symbol === 'function' && Symbol.hasInstance) { realHasInstance = Function.prototype[Symbol.hasInstance]; Object.defineProperty(Writable, Symbol.hasInstance, { value: function(object) { if (realHasInstance.call(this, object)) return true; return object && object._writableState instanceof WritableState; } }); } else { realHasInstance = function(object) { return object instanceof this; }; } function Writable(options) { if (!(realHasInstance.call(Writable, this)) && !(this instanceof Stream.Duplex)) { return new Writable(options); } // ... }
能夠看出,經過修改 Writable 的 Symbol.hasInstance
使得 duplex/writable instanceof Writable
爲 true
。Writable 的構造函數也只能在 writable 或 duplex 的上下文中調用,使 duplex 具備 Writable 的屬性
// lib/_stream_duplex.js util.inherits(Duplex, Readable); var keys = Object.keys(Writable.prototype); // 獲取 Writable 的全部方法 for (var v = 0; v < keys.length; v++) { var method = keys[v]; if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method]; } function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); Readable.call(this, options); Writable.call(this, options); // ... }
遍歷 Writable 原型上的方法,並添加到 Duplex 的原型上,使 duplex 具備 Writable 的方法
Writable 的寫過程比 Readable 的讀過程簡單得多,不用考慮異步操做,直接寫入便可
// lib/_stream_writable.js Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; // 判斷是不是 buffer var isBuf = Stream._isUint8Array(chunk) && !state.objectMode; // ... if (state.ended) // Writable 結束後繼續寫入數據會報錯 // 觸發 error 事件 writeAfterEnd(this, cb); else if (isBuf || validChunk(this, state, chunk, cb)) { // 對 chunk 進行校驗 // 不能爲 null // undefined 和非字符串只在 objectMode 下接受 state.pendingcb++; ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb); } return ret; };
write()
函數對傳入數據進行初步處理與校驗後交由 writeOrBuffer()
函數繼續處理
// lib/_stream_writable.js function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) { if (!isBuf) { // 非 buffer 的狀況有: string 或 object // 爲 object,表明是 objectMode,直接返回便可 // 爲 string,則需解碼成 buffer var newChunk = decodeChunk(state, chunk, encoding); if (chunk !== newChunk) { isBuf = true; encoding = 'buffer'; chunk = newChunk; } } var len = state.objectMode ? 1 : chunk.length; state.length += len; var ret = state.length < state.highWaterMark; if (!ret) state.needDrain = true; if (state.writing || state.corked) { // 正在寫或處於 cork 狀態 // 將數據塊添加到緩衝池鏈表爲尾部 var last = state.lastBufferedRequest; state.lastBufferedRequest = { chunk, encoding, isBuf, callback: cb, next: null }; if (last) { last.next = state.lastBufferedRequest; } else { state.bufferedRequest = state.lastBufferedRequest; } state.bufferedRequestCount += 1; } else { // 寫入數據 doWrite(stream, state, false, len, chunk, encoding, cb); } return ret; }
writeOrBuffer()
函數對 chunk 進行處理後,根據 Writable 自身狀態決定應什麼時候寫入數據
若是正在寫入或處於 cork 狀態,就將數據存儲到緩衝池鏈表尾部,等待之後處理。不然,直接調用 doWrite()
寫入數據
當緩存達到 highWaterMark 時,writeOrBuffer()
返回 false,表示不該該再寫入數據
// lib/_stream_writable.js function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; // 寫入的數據塊長度 state.writecb = cb; // 寫入操做的回調函數 state.writing = true; state.sync = true; // 同步狀態標識 if (writev) // 一次寫入多個數據塊 stream._writev(chunk, state.onwrite); else // 一次寫入單個數據塊 stream._write(chunk, encoding, state.onwrite); state.sync = false; }
在 doWrite()
函數中,根據 writev 參數決定該執行 _write()
仍是 _writev()
_write()
函數用於寫入單個數據塊,_writev()
函數用於寫入多個數據塊
_write()
/ _writev()
中的回調函數不是傳入的 cb 而是 state.onwrite
,其定義以下:
this.onwrite = onwrite.bind(undefined, stream);
可知,寫入完成後,執行 onwrite(stream, err)
// lib/_stream_writable.js function onwrite(stream, er) { var state = stream._writableState; var sync = state.sync; var cb = state.writecb; // 更新 state onwriteStateUpdate(state); if (er) // 發生錯誤 onwriteError(stream, state, sync, er, cb); else { var finished = needFinish(state); if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { // 清空緩衝池 // 有 _writev() 函數時,執行 _writev() 一次寫入多個數據塊 // 沒有,則循環執行 _write() 寫入單個數據塊 clearBuffer(stream, state); } if (sync) { // 表明寫入操做是同步的,須要在 process.nextTick() 中執行 callback process.nextTick(afterWrite, stream, state, finished, cb); } else { // 表明寫入操做是異步的,直接執行 callback 便可 afterWrite(stream, state, finished, cb); } } }
當寫入過程當中有錯誤發生時,會執行 onwriteError()
,繼而調用 cb(err)
並觸發 error 事件
若是寫入過程正確執行,則先查看還有多少數據塊正在等待寫入,有多個,就執行 clearBuffer()
清空緩存,而後執行 afterWrite()
// lib/_stream_writable.js function afterWrite(stream, state, finished, cb) { if (!finished) onwriteDrain(stream, state); state.pendingcb--; // 執行回調函數 cb(); // 檢查是否應該結束 Writable finishMaybe(stream, state); }
當有大量小數據塊須要寫入時,若是一個個寫入,會致使效率低下。Writable 提供了 cork()
和 uncork()
兩個方法用於大量小數據塊寫入的狀況
先將寫操做柱塞住,等到緩存達到必定量後,再解除柱塞,而後一次性將存儲的數據塊寫入,這個操做須要 _writev()
支持
// lib/_stream_writable.js Writable.prototype.cork = function() { var state = this._writableState; // 增長柱塞的次數 state.corked++; };
因爲 cork()
函數可能會被屢次調用,因此 state.corked
須要記錄 cork()
調用的次數,是個 number
// lib/_stream_writable.js Writable.prototype.uncork = function() { var state = this._writableState; if (state.corked) { // 減小柱塞的次數 state.corked--; if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) // 清空緩衝池 clearBuffer(this, state); } };
當 state.corked === 0
時,才能表示柱塞已經所有解除完畢,能夠執行 clearBuffer()
來處理緩存中的數據
// lib/_stream_writable.js function clearBuffer(stream, state) { // 正在清空 buffer 的標識 state.bufferProcessing = true; // 緩存的頭節點 var entry = state.bufferedRequest; if (stream._writev && entry && entry.next) { // _writev() 函數存在,且有一個以上數據塊,就使用 _writev() 寫入數據,效率更高 var l = state.bufferedRequestCount; var buffer = new Array(l); var holder = state.corkedRequestsFree; holder.entry = entry; var count = 0; var allBuffers = true; // 取得全部數據塊 while (entry) { buffer[count] = entry; if (!entry.isBuf) allBuffers = false; entry = entry.next; count += 1; } buffer.allBuffers = allBuffers; // 寫入數據 doWrite(stream, state, true, state.length, buffer, '', holder.finish); state.pendingcb++; state.lastBufferedRequest = null; // 保證最多隻有兩個實例 if (holder.next) { state.corkedRequestsFree = holder.next; holder.next = null; } else { var corkReq = { next: null, entry: null, finish: undefined }; corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state); state.corkedRequestsFree = corkReq; } } else { // 一個個的寫入 while (entry) { var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, cb); entry = entry.next; // 若是寫操做不是同步執行的,就意味着須要等待這次寫入完成,再繼續寫入 if (state.writing) { break; } } if (entry === null) state.lastBufferedRequest = null; } state.bufferedRequestCount = 0; // 修正緩存的頭節點 state.bufferedRequest = entry; state.bufferProcessing = false; }
執行 clearBuffer()
時,根據是否有 _writev()
函數和待寫入數據塊數量,決定使用 _writev()
仍是 _write()
寫入數據
_writev()
: 會先將全部數據塊包裝成數組,而後寫入。寫入完成後,回調 corkReq.finish
_write()
: 只須要將數據塊一個個寫入便可
在使用 _writev()
的狀況下,寫入完成後回調 corkReq.finish
也就是 onCorkedFinish()
函數
// lib/_stream_writable.js function onCorkedFinish(corkReq, state, err) { var entry = corkReq.entry; corkReq.entry = null; // 依次執行回調函數 while (entry) { var cb = entry.callback; state.pendingcb--; cb(err); entry = entry.next; } // 保證最多隻有兩個實例 if (state.corkedRequestsFree) { state.corkedRequestsFree.next = corkReq; } else { state.corkedRequestsFree = corkReq; } }
根據緩衝池鏈表的順序,依次執行寫操做的回調函數
每次調用 stream.write(chunk, cb)
,Writable 都會根據自身狀態,決定是將 chunk 加到緩衝池,仍是直接寫入
當須要寫入大量小數據塊時,推薦先使用 cork()
將寫操做柱塞住,待調用完畢後,再調用 uncork()
解除柱塞,而後一次性寫入全部緩存數據
參考: