本次試圖淺析探索Nodejs的Stream模塊中對於Readable類的一部分實現(可寫流也差很少)。其中會以可讀流兩種模式中的paused mode即暫停模式的表現形式來解讀源碼上的實現,爲何不分析flowing mode天然是由於這個模式是咱們經常使用的其原理相比暫停模式下相對簡單(實際上是由於筆者老是喜歡關注一些邊邊角角的東西,不按套路出牌=。=),同時核心方法都是同樣的,一通百通嘛,有興趣的童鞋能夠本身看下完整源碼。html
歡迎關注個人博客,不按期更新中——前端
首先先明確爲何Nodejs要實現一個stream,這就要清楚關於生產者消費者問題的概念。node
生產者消費者問題(英語:Producer-consumer problem),也稱有限緩衝問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了共享固定大小緩衝區的兩個線程——即所謂的「生產者」和「消費者」——在實際運行時會發生的問題。生產者的主要做用是生成必定量的數據放到緩衝區中,而後重複此過程。與此同時,消費者也在緩衝區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入數據,消費者也不會在緩衝區中空時消耗數據。
簡單來講就是內存問題。與前端不一樣,後端對於內存仍是至關敏感的,好比讀取文件這種操做,若是文件很小就算了,但若是這個文件一個g呢?難道全讀出來?這確定是不可取的。經過流的形式讀一部分寫一部分慢慢處理纔是一個可取的方式。PS:有關爲何使用stream歡迎你們百(谷)度(歌)一下。git
如今咱們將本身實現一個可讀流,以此來方便觀察以後數據的流動過程:github
const Readable = require('stream').Readable; // 實現一個可讀流 class SubReadable extends Readable { constructor(dataSource, options) { super(options); this.dataSource = dataSource; } // 文檔提出必須經過_read方法調用push來實現對底層數據的讀取 _read() { console.log('閾值規定大小:', arguments['0'] + ' bytes') const data = this.dataSource.makeData() let result = this.push(data) if(data) console.log('添加數據大小:', data.toString().length + ' bytes') console.log('已緩存數據大小: ', subReadable._readableState.length + ' bytes') console.log('超過閾值限制或數據推送完畢:', !result) console.log('====================================') } } // 模擬資源池 const dataSource = { data: new Array(1000000).fill('1'), // 每次讀取時推送必定量數據 makeData() { if (!dataSource.data.length) return null; return dataSource.data.splice(dataSource.data.length - 5000).reduce((a,b) => a + '' + b) } //每次向緩存推5000字節數據 }; const subReadable = new SubReadable(dataSource);
至此subReadable即是咱們實現的自定義可讀流。後端
先來看下總體的流程:
可讀流會經過_read()
方式從資源讀取數據到緩存池,同時設置了一個閾值highWaterMark
,標記數據到緩存池大小的一個上限,這個閾值是會浮動的,最小值也是默認值爲16384。當消費者監聽了readable
事件以後,就能夠顯式調用read()
方法來讀取數據。api
經過註冊readable事件以此來觸發暫停模式:緩存
subReadable.on('readable', () => { console.log('緩存剩餘數據大小: ', subReadable._readableState.length + ' byte') console.log('------------------------------------') })
能夠發現當註冊readable
事件後可對流會從底層資源推送數據到緩存直到達到超過閾值或者底層數據所有加載完。多線程
調用read(n); n = 1000;函數
首先修改資源池大小data: new Array(10000).fill('1')
(方便打印數據),執行read(1000)每次讀取1000字節資源讀取資源:
subReadable.on('readable', () => { let chunk = subReadable.read(1000) if(chunk) console.log(`讀取 ${chunk.length} bytes數據`); console.log('緩存剩餘數據大小: ', subReadable._readableState.length + ' byte') console.log('------------------------------------') })
結果執行了兩次讀取數據,同時若是每次讀取的字節少於緩存中的數據,則可讀流不會再從資源加載新的數據。
無參調用read()
subReadable.on('readable', () => { let chunk = subReadable.read() if(chunk) console.log(`讀取 ${chunk.length} bytes數據`); console.log('緩存剩餘數據大小: ', subReadable._readableState.length + ' byte') console.log('------------------------------------') })
直接調用read()
後,會逐步讀取徹底部資源,至於每次讀取多少下文會統一探討。
以上咱們依次嘗試了在實現可讀流後觸發暫停模式會發生的事情,接下來做者將會對如下幾個可能有疑問的點進行探究:
_read()
方法並在其中調用push()
read()
與傳入固定數據的區別_read()
方法並在其中調用push()
Readable.prototype._read = function(n) { this.emit('error', new errors.Error('ERR_STREAM_READ_NOT_IMPLEMENTED')); }; //只是定義接口 Readable.prototype.read = function(n) { ... var doRead = state.needReadable; if (doRead) { this._read(state.highWaterMark); } }
當咱們調用subReadable.read()便會執行到上面的代碼,能夠發現,源碼中
對於_read()
只是定義了一個接口,裏面並無具體實現,若是咱們不本身定義那麼就會報錯。同時read()
中會執行它經過它調用push()
來從資源中讀取數據,而且傳入highWaterMark
,這個值你能夠用也能夠不用由於_read()
是咱們本身實現的。
Readable.prototype.push = function(chunk, encoding) { ... return readableAddChunk(this, chunk, encoding, false, skipChunkCheck); };
從代碼中能夠看出,將底層資源推送到緩存中的核心操做是經過push,經過語義化也能夠看出push方法中最後會進行添加新數據的操做。因爲以後方法中嵌套不少,不一一展現,直接來看最後調用的方法:
// readableAddChunk最後會調用addChunk function addChunk(stream, state, chunk, addToFront) { ... state.buffer.push(chunk); //數據推送到buffer中 if (state.needReadable)//判斷此屬性值來看是否觸發readable事件 emitReadable(stream); maybeReadMore(stream, state);//可能會推送更多數據到緩存 }
咱們能夠看出,方法調用的最後確實執行了資源數據推送到緩存的操做。與此同時在會判斷needReadable屬性值來看是否觸發readable回調事件。而這也爲以後咱們來分析爲何註冊了readable事件以後會執行一次回調埋下了伏筆。最後調用maybeReadMore()則是蓄滿緩存池的方法。
先來看下源碼裏是如何綁定的事件:
Readable.prototype.on = function(ev, fn) { if (ev === 'data') { ... } else if (ev === 'readable') { const state = this._readableState; state.needReadable = true;//設定屬性爲true,觸發readable回調 ... process.nextTick(nReadingNextTick, this); } }; function nReadingNextTick(self) { self.read(0); } //以後執行read(0) => _read() => push() => addChunk() // => maybeReadMore()
maybeReadMore()中當緩存池存儲大小小於閾值時則會一直調用read(0)不讀取數據,可是會一直push底層資源到緩存:
function maybeReadMore_(stream, state) { ... if (state.length < state.highWaterMark) { stream.read(0); } }
上文提到過,綁定事件後會開始推送數據至緩存池,最後會執行到addChunk()方法,內部經過needReadable屬性來判斷是否觸發readable事件。當你第一次綁定事件時會執行state.needReadable = true;,從而在最後推送數據後會執行觸發readable的操做。
read()
與傳入特定數值的區別區別在執行read()方法的時候,會將參數n傳入到下面這個函數中由它來計算如今應該應該讀取多少數據:
function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; if (state.objectMode) return 1; if (n !== n) { // Only flow one buffer at a time if (state.flowing && state.length) return state.buffer.head.data.length; else return state.length; } // If we're asking for more than the current hwm, then raise the hwm. if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; // Don't have enough if (!state.ended) { //傳輸沒有結束都是false state.needReadable = true; return 0; } return state.length; }
當直接調用read(),n參數則爲NaN,當處於流動模式的時候n則爲buffer頭數據的長度,不然是整個緩存的數據長度。若爲read(n)傳入數字,大於當前的hwm時能夠發現會從新計算一個hwm,與此同時若是已緩存的數據小於請求的數據量,那麼將設置state.needReadable = true;
並返回0;
第一次試圖梳理源碼的思路,一路寫下來發現有不少想說可是又不知道怎麼連貫的理清楚=。= 既然代碼細節也有些說不清,不過最後仍是進行一個核心思路的提煉:
源碼的邊界狀況比較多。做者若是哪裏說錯了請指正=。=
PS:源碼地址
慣例po做者的博客,不定時更新中——有問題歡迎在issues下交流。