歡迎來個人博客閱讀:《Node.js源碼解析-Readable實現》node
想要了解 Readable 的實現,最好的方法是順着 Readable 的 Birth-Death 走一遍git
在瞭解 Readable 的 Birth-Death 以前,先看看 Readable 的構造函數github
// lib/_stream_readable.js function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); // Readable 流的狀態集 this._readableState = new ReadableState(options, this); // legacy this.readable = true; if (options) { if (typeof options.read === 'function') // 真實數據來源,Readable.prototyoe._read() 函數會拋出異常,所以必須有options.read this._read = options.read; if (typeof options.destroy === 'function') this._destroy = options.destroy; } Stream.call(this); } function ReadableState(options, stream) { options = options || {}; // object 模式標識 this.objectMode = !!options.objectMode; if (stream instanceof Stream.Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode; var hwm = options.highWaterMark; var defaultHwm = this.objectMode ? 16 : 16 * 1024; this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm; // highWaterMark 高水位標識 // 內部緩存高於 highWaterMark 時,會中止調用 _read() 獲取數據 // 默認 16k this.highWaterMark = Math.floor(this.highWaterMark); // Readable 流內部緩衝池,是一個 buffer 鏈表 // 之因此不用數組實現,是由於鏈表增刪頭尾元素更快 this.buffer = new BufferList(); // 緩存大小 this.length = 0; // pipe 的流 this.pipes = null; this.pipesCount = 0; // flow 模式標識 this.flowing = null; // Readable 狀態標識,爲 true 表示數據源已讀取完畢 // 此時 Readable 中可能還有數據,不能再向緩衝池中 push() 數據 this.ended = false; // Readable 狀態標識,爲 true 表示 end 事件已觸發 // 此時 Readable 中數據已讀取完畢,不能再向緩衝池中 push() 或 unshift() 數據 this.endEmitted = false; // Readable 狀態標識,爲 true 表示正在調用 _read() 讀取數據 this.reading = false; this.sync = true; // 標識須要觸發 readable 事件 this.needReadable = false; // 標識已觸發 readable 事件 this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; this.destroyed = false; this.defaultEncoding = options.defaultEncoding || 'utf8'; this.awaitDrain = 0; this.readingMore = false; // 解碼器 this.decoder = null; this.encoding = null; if (options.encoding) { if (!StringDecoder) StringDecoder = require('string_decoder').StringDecoder; this.decoder = new StringDecoder(options.encoding); this.encoding = options.encoding; } }
在 Readable 的構造函數中,可經過 options 傳入參數,其中 options.read
函數是必需的數組
readable._readableState
中保存了 Readable 的各類狀態與屬性緩存
在這裏將 Readable 的 Birth-Death 分爲五個狀態:less
表中爲
this._readableSate
的屬性異步
start: 初始狀態,Readable 剛剛被建立,還未調用 readable.read()
函數
|length|reading|ended|endEmitted|
|--|--|--|--|
|0|false|false|false|ui
reading: 表明正在從數據源中讀取數據,此時緩存大小 this._readableSate.length
小於 highWaterMark
,應讀取數據使緩存達到 highWaterMark
this
|length|reading|ended|endEmitted|
|--|--|--|--|
|< highWaterMark|true|false|false|
read: Readable 從數據源讀取數據後的相對穩定狀態
|length|reading|ended|endEmitted|
|--|--|--|--|
|>= highWaterMark|false|false|false|
ended: 數據已經所有讀取完成( push(null)
),此時 push(chunk)
會報 stream.push() after EOF
錯誤
|length|reading|ended|endEmitted|
|--|--|--|--|
|>= 0|false|true|false|
endEmitted: end 事件觸發完成,此時 unshift(chunk)
會報 stream.unshift() after end event
錯誤
|length|reading|ended|endEmitted|
|--|--|--|--|
|0|false|true|true|
它們之間的關係以下:
1 4 5 start ==> reading ==> ended ==> endEmitted || /\ 2 \/ || 3 read
start 狀態變爲 reading 狀態,發生在第一次調用 read()
時
// lib/_stream_readable.js Readable.prototype.read = function(n) { debug('read', n); n = parseInt(n, 10); var state = this._readableState; var nOrig = n; if (n !== 0) state.emittedReadable = false; // 調用 read(0)時,若是緩存大於 highWaterMark 則直接觸發 readable 事件 if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { debug('read: emitReadable', state.length, state.ended); if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; } // 計算可讀數據量 // n = NaN ==> 讀取所有 // n <= state.length ==> 讀取 n // n > state.length ==> 讀取 0,並使 Readable 從數據源讀取數據 // // n > state.highWaterMark ==> 從新計算 highWaterMark,大小是大於 n 的最小 2^x n = howMuchToRead(n, state); // 當 Readable 已經讀完時,調用 endReadable() ,結束 Readable if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; } // 判斷是否應該從數據源讀取數據 // BEGIN var doRead = state.needReadable; debug('need readable', doRead); if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; debug('length less than watermark', doRead); } // END if (state.ended || state.reading) { // 對於 ended 或 reading 狀態的 Readable 是不須要讀取數據的 doRead = false; debug('reading or ended', doRead); } else if (doRead) { // 讀取數據 debug('do read'); state.reading = true; state.sync = true; if (state.length === 0) state.needReadable = true; // 從數據源讀取數據,多是異步,也多是同步 this._read(state.highWaterMark); state.sync = false; // 由於 _read() 函數多是異步的,也多是同步的 // 在同步狀況下,須要從新確承認讀長度 if (!state.reading) n = howMuchToRead(nOrig, state); } // 獲取數據 var ret; if (n > 0) ret = fromList(n, state); // 從緩衝池中讀取數據 else ret = null; if (ret === null) { state.needReadable = true; n = 0; } else { state.length -= n; } // ... if (ret !== null) this.emit('data', ret); return ret; }; // 必須實現的方法 Readable.prototype._read = function(n) { this.emit('error', new Error('_read() is not implemented')); }; // 計算可讀長度 function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; if (state.objectMode) return 1; if (n !== n) { // NaN if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } if (n > state.highWaterMark) // 當須要數據大於 highWaterMark 時,調整 highWaterMark 大小到大於 n 的最小 2^x state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; // 緩衝池中數據不夠 if (!state.ended) { state.needReadable = true; return 0; } return state.length; }
調用 read()
後,若是緩衝池中數據不夠或讀取後低於 highWaterMark,則調用 _read()
來讀取更多的數據,不然直接返回讀取的數據
當指望數據量大於 highWaterMark 時,從新計算 highWaterMark,大小是大於指望數據量的最小 2^x
調用 _read()
後,會異步或同步地將調用 push(chunk)
,將數據放入緩衝池,並使 Readable 從 reading 狀態變爲 read 狀態
// lib/_stream_readable.js Readable.prototype.push = function(chunk, encoding) { var state = this._readableState; var skipChunkCheck; if (!state.objectMode) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; // 若是指定編碼與 Readable 編碼不一樣,則將 chunk 使用指定編碼解碼爲 Buffer if (encoding !== state.encoding) { chunk = Buffer.from(chunk, encoding); encoding = ''; } // string 不須要檢查 skipChunkCheck = true; } } else { // object mode 的 Readable 也不須要檢查 skipChunkCheck = true; } return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); }; function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { var state = stream._readableState; if (chunk === null) { // 結束信號 state.reading = false; onEofChunk(stream, state); } else { var er; if (!skipChunkCheck) // 檢查 chunk 格式 er = chunkInvalid(state, chunk); if (er) { stream.emit('error', er); } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && Object.getPrototypeOf(chunk) !== Buffer.prototype && !state.objectMode) { chunk = Stream._uint8ArrayToBuffer(chunk); } if (addToFront) { // unshift() 的 hook if (state.endEmitted) stream.emit('error', new Error('stream.unshift() after end event')); else addChunk(stream, state, chunk, true); // 將數據添加到緩衝池中 } else if (state.ended) { stream.emit('error', new Error('stream.push() after EOF')); } else { state.reading = false; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false); // 將數據添加到緩衝池中 else maybeReadMore(stream, state); // 會在 addChunk() 函數內部調用 } else { addChunk(stream, state, chunk, false); // 將數據添加到緩衝池中 } } } else if (!addToFront) { state.reading = false; } } return needMoreData(state); // return !state.ended && 數據源還有數據 // (state.needReadable || 須要更多數據 // state.length < state.highWaterMark || 緩存小於 highWaterMark // state.length === 0) } function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync) { // 對於 flow 模式的 Readable,直接觸發 data 事件,並繼續讀取數據就行 stream.emit('data', chunk); stream.read(0); } else { state.length += state.objectMode ? 1 : chunk.length; if (addToFront) state.buffer.unshift(chunk); else state.buffer.push(chunk); if (state.needReadable) emitReadable(stream); } // 在容許的狀況下,讀取數據直到 highWaterMark maybeReadMore(stream, state); }
調用 push(chunk)
時,會將 chunk 放入緩衝池內,並改變 Readable 的狀態。若是 Readable 處於 ended 狀態,會報 stream.push() after EOF
錯誤
若是緩存小於 highWaterMark,返回 true,意味着須要寫入更多的數據
從 read 到 reading 狀態,意味着須要讀取更多的數據,即緩存小於 highWaterMark
緩存與 highWaterMark 的關係能夠根據 push(chunk)
的返回值來判斷,可是須要使用者手動處理。所以,爲了方便使用,addChunk()
函數會自動調用 maybeReadMore()
來異步讀取數據。這樣,即便單次 _read()
沒法達到 highWaterMark,也能夠經過屢次異步讀取,使數據流動起來
// lib/_stream_readable.js function maybeReadMore(stream, state) { if (!state.readingMore) { state.readingMore = true; process.nextTick(maybeReadMore_, stream, state); } } function maybeReadMore_(stream, state) { var len = state.length; while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) { debug('maybeReadMore read 0'); stream.read(0); if (len === state.length) // 取不到數據就放棄 break; else len = state.length; } state.readingMore = false; }
在 maybeReadMore()
函數內,經過異步讀取數據,直到 highWaterMark
那麼爲何是異步讀取數據呢?
由於,在 _read()
函數內,可能不止一次調用 push(chunk)
若是是同步,push(chunk)
後,由於沒有達到 highWaterMark,會繼續調用 read(0)
,發生第二次 _read()
。第二次 _read()
也可能致使第三次 _read()
,直到 highWaterMark
待整個調用完畢後,緩衝池內會有 highWaterMark * n( _read()
內調用 push(chunk)
次數 )的數據,而這與 highWaterMark 的設計是不符的
若是是異步,則能夠等 _read()
執行完畢後,在 process.nextTick()
內再次調用 _read()
讀取數據,不會發生上面的問題
當數據源讀取完畢時,須要調用 push(null)
來通知 Rreadable 數據源已經讀取完畢。push(null)
函數內部會調用 onEofChunk()
// lib/_stream_readable.js function onEofChunk(stream, state) { if (state.ended) return; if (state.decoder) { var chunk = state.decoder.end(); if (chunk && chunk.length) { state.buffer.push(chunk); state.length += state.objectMode ? 1 : chunk.length; } } state.ended = true; // 觸發 readable 事件,通知監聽者來處理剩餘數據 emitReadable(stream); }
onEofChunk()
函數將 readable 標記爲 ended 狀態後,禁止再向緩衝池內 push 數據。此時,緩衝池內可能還有數據
ended 狀態的 Readable 內可能還有數據。所以,當數據所有被讀取後,須要調用 endReadable()
來結束 Readable
// lib/_stream_readable.js function endReadable(stream) { var state = stream._readableState; // state.length 必定是 0 if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream'); if (!state.endEmitted) { state.ended = true; process.nextTick(endReadableNT, state, stream); } } function endReadableNT(state, stream) { // 防止中間調用 unshift(chunk),向緩衝池中放入數據 if (!state.endEmitted && state.length === 0) { state.endEmitted = true; stream.readable = false; stream.emit('end'); } }
調用 endReadable()
時,緩衝池必定爲空。整個調用完成後,觸發 end 事件,Readable 將不能再讀取或寫入( push()
/ unshift()
)數據
到這裏,已經走完了 Readable 的整個 Birth-Death 過程
整個過程就以下面這個圖:
1 4 5 start ==> reading ==> ended ==> endEmitted || /\ 2 \/ || 3 read 1. read() 2. push(chunk) 3. maybeReadMore() ==> read(0) 4. push(null) 5. endReadable()
根據這個圖還有代碼,在腦殼裏面,把 Readable 的模型運行一遍,就能瞭解它的實現了
參考: