該文章整合了多篇網絡文章(整合之處已設置超連接,可點擊直接瞭解原文),目的僅僅是爲了和大夥分享,更加通俗易懂的瞭解流的各個流程的初始。本人也是node的初學菜鳥,有描述錯誤或誤人子弟的地方多請大神們多多指出。html
讀緩衝區(readable buffer):這裏的讀是個形容詞,是指可讀流臨時存放data(只能是字符串或者Buffer,不能是數字)的緩衝區。(讀緩衝區就像一個水電站同樣,感受這樣描述比較好理解flowing、paused模式)node
flowing模式:即流動模式,就像打開水電站的水閘同樣,上游的水和下游完徹底全連通直到上游來源的數據耗盡。json
paused模式:即暫停模式,就像水電站的水閘在你指定的時候(使用stream.read())纔會打開。不過,當你使用read()打開水閘的時候是一個超天然現象---水電站裏的水瞬間被抽乾,上游的水還沒來得及填充水電站。而後自動關閉水閘,等待你的下一次「惠顧「read()。網絡
_read:上游水源經過_read裏中的push、unshift方法流入水電站中。函數
readable:在數據塊能夠從流中讀取的時候發出。它對應的處理器沒有參數,能夠在處理器裏調用read([size])方法讀取數據。ui
data:有數據可讀時發出。它對應的處理器有一個參數,表明數據。若是你只想快快地讀取一個流的數據,給data關聯一個處理器是最方便的辦法。處理器的參數是Buffer對象,若是你調用了Readable的setEncoding(encoding)方法,處理器的參數就是String對象。this
end:當數據被讀完時發出。對應的處理器沒有參數。編碼
close:當底層的資源,如文件,已關閉時發出。不是全部的Readable流都會發出這個事件。對應的處理器沒有參數。spa
error:當在接收數據中出現錯誤時發出。對應的處理器參數是Error的實例,它的message屬性描述了錯誤緣由,stack屬性保存了發生錯誤時的堆棧信息。.net
read([size]):若是你給read方法傳遞了一個大小做爲參數,那它會返回指定數量的數據,若是數據不足,就會返回null。若是你不給read方法傳參,它會返回內部緩衝區裏的全部數據,若是沒有數據,會返回null,此時有可能說明遇到了文件末尾。read返回的數據多是Buffer對象,也多是String對象。
setEncoding(encoding):給流設置一個編碼格式,用於解碼讀到的數據。調用此方法後,read([size])方法返回String對象。
pause():暫停可讀流,再也不發出data事件
resume():恢復可讀流,繼續發出data事件
pipe(destination,[options]):把這個可讀流的輸出傳遞給destination指定的Writable流,兩個流組成一個管道。options是一個JS對象,這個對象有一個布爾類型的end屬性,默認值爲true,當end爲true時,Readable結束時自動結束Writable。注意,咱們能夠把一個Readable與若干Writable連在一塊兒,組成多個管道,每個Writable都能獲得一樣的數據。這個方法返回destination,若是destination自己又是Readable流,就能夠級聯調用pipe(好比咱們在使用gzip壓縮、解壓縮時就會這樣,立刻會講到)。
unpipe([destination]):端口與指定destination的管道。不傳遞destination時,斷開與這個可讀流連在一塊兒的全部管道。
經過添加 data 事件監聽器來啓動數據監聽
調用 resume() 方法啓動數據流
調用 pipe() 方法將數據轉接到另外一個 可寫流
在流沒有 pipe() 時,調用 pause() 方法能夠將流暫停
pipe() 時,須要移除全部 data 事件的監聽,再調用 unpipe() 方法
data listener
readable listener
read()——若是當前緩衝區爲空,或者緩衝區並未超出咱們設定的最大值,那麼就能夠繼續準備數據;若是此時正在準備數據(_read())或者已經結束讀取(push(null)),那麼就放棄準備數據。
1.在paused模式下則讀取所有緩衝區的長度;若讀取的字節數(n)大於設置的緩衝區最大值,則適當擴大緩衝區的大小(默認爲16k,最大爲8m);若讀取的長度大於當前緩衝區的大小,設置needReadable屬性並準備數據等待下一次讀取。
2.若是當前緩衝區爲空,或者緩衝區並未超出咱們設定的最大值,那麼就能夠繼續準備數據;若是此時正在準備數據(_read())或者已經結束讀取(push(null)),那麼就放棄準備數據。
3.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類須要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩衝區,而後進行Readable的讀取。能夠理解爲_read函數爲讀取數據前的準備工做(準備數據),針對的是流的實現者而言。
1.對於處在flowing模式下的讀取,每次只讀緩衝區中第一個buffer的長度
2.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類須要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩衝區,而後進行Readable的讀取。能夠理解爲_read函數爲讀取數據前的準備工做(準備數據),針對的是流的實現者而言。
//這是一個將存放多條json字符串的txt文件讀取成json的例子 const stream = require('stream'); const fs = require('fs'); const util = require('util'); function JSONLineReader(source) { stream.Readable.call(this); this._source = source; this._foundLineEnd = false; this._buffer = ''; source.on('readable', function() {//監聽source何時準備好,那麼咱們就能夠用read()或則readable listener去觸發JSONLineReader的_read方法 this.read(); // this.on('readable', function(data) { // console.log('readable'); // }); }.bind(this)) } util.inherits(JSONLineReader, stream.Readable); JSONLineReader.prototype._read = function(size) { var chunk; var line; var lineIndex; var result; if (this._buffer.length === 0) { chunk = this._source.read(); this._buffer += chunk; //一次就拿完 只是看何時push null } lineIndex = this._buffer.indexOf('\n'); if (lineIndex !== -1) { line = this._buffer.slice(0, lineIndex); if (line) { result = JSON.parse(line); this._buffer = this._buffer.slice(lineIndex + 1); this.emit('object', result);util.inspect(result)) this.push(util.inspect(result)); } else { this._buffer = this._buffer.slice(1); } } } let input = fs.createReadStream(__dirname + '/json-lines.txt', { encoding: 'utf8' }); var jsonLineReader = new JSONLineReader(input); jsonLineReader.on('object', function(obj) { console.log('pos:', obj); }) /*json-lines.txt {"success":false,"code":501} {"success":true,"code":202} {"success":false,"code":503} {"success":true,"code":204} {"success":false,"code":505} {"success":true,"code":206} {"success":false,"code":507} {"success":true,"code":208} {"success":false,"code":509} */
let stream = require('stream'); let util = require('util'); util.inherits(flowingReadableDemo, stream.Readable); function flowingReadableDemo(opt) { stream.Readable.call(this, opt); this.quotes = ["yessdasdsa", "noasdasdas", "maybe"]; this._index = 0; } flowingReadableDemo.prototype._read = function() { if (this._index >= this.quotes.length) { this.push(null); } else { this.push(this.quotes[this._index]); this._index += 1; } }; let r = new flowingReadableDemo(); r.on('data', function(data) { console.log("Callback read: " + data.toString()); // flowing狀態下,咱們無需執行read,僅須要設置data事件處理函數或者設定導流目標pipe }); r.on('end', function(data) { console.log("No more answers."); });