node之stream(上)——readable

題外話

該文章整合了多篇網絡文章(整合之處已設置超連接,可點擊直接瞭解原文),目的僅僅是爲了和大夥分享,更加通俗易懂的瞭解流的各個流程的初始。本人也是node的初學菜鳥,有描述錯誤或誤人子弟的地方多請大神們多多指出。html

readable

咱們先來安利一些思路,方便理清楚邏輯:)。

讀緩衝區(readable buffer):這裏的讀是個形容詞,是指可讀流臨時存放data(只能是字符串或者Buffer,不能是數字)的緩衝區。(讀緩衝區就像一個水電站同樣,感受這樣描述比較好理解flowing、paused模式)node

flowing模式:即流動模式,就像打開水電站的水閘同樣,上游的水和下游完徹底全連通直到上游來源的數據耗盡。json

paused模式:即暫停模式,就像水電站的水閘在你指定的時候(使用stream.read())纔會打開。不過,當你使用read()打開水閘的時候是一個超天然現象---水電站裏的水瞬間被抽乾,上游的水還沒來得及填充水電站。而後自動關閉水閘,等待你的下一次「惠顧「read()。網絡

_read:上游水源經過_read裏中的push、unshift方法流入水電站中。函數

事件 查看原文

readable:在數據塊能夠從流中讀取的時候發出。它對應的處理器沒有參數,能夠在處理器裏調用read([size])方法讀取數據。ui

data:有數據可讀時發出。它對應的處理器有一個參數,表明數據。若是你只想快快地讀取一個流的數據,給data關聯一個處理器是最方便的辦法。處理器的參數是Buffer對象,若是你調用了Readable的setEncoding(encoding)方法,處理器的參數就是String對象。this

end:當數據被讀完時發出。對應的處理器沒有參數。編碼

close:當底層的資源,如文件,已關閉時發出。不是全部的Readable流都會發出這個事件。對應的處理器沒有參數。spa

error:當在接收數據中出現錯誤時發出。對應的處理器參數是Error的實例,它的message屬性描述了錯誤緣由,stack屬性保存了發生錯誤時的堆棧信息。.net

函數 查看原文

read([size]):若是你給read方法傳遞了一個大小做爲參數,那它會返回指定數量的數據,若是數據不足,就會返回null。若是你不給read方法傳參,它會返回內部緩衝區裏的全部數據,若是沒有數據,會返回null,此時有可能說明遇到了文件末尾。read返回的數據多是Buffer對象,也多是String對象。

setEncoding(encoding):給流設置一個編碼格式,用於解碼讀到的數據。調用此方法後,read([size])方法返回String對象。

pause():暫停可讀流,再也不發出data事件

resume():恢復可讀流,繼續發出data事件

pipe(destination,[options]):把這個可讀流的輸出傳遞給destination指定的Writable流,兩個流組成一個管道。options是一個JS對象,這個對象有一個布爾類型的end屬性,默認值爲true,當end爲true時,Readable結束時自動結束Writable。注意,咱們能夠把一個Readable與若干Writable連在一塊兒,組成多個管道,每個Writable都能獲得一樣的數據。這個方法返回destination,若是destination自己又是Readable流,就能夠級聯調用pipe(好比咱們在使用gzip壓縮、解壓縮時就會這樣,立刻會講到)。

unpipe([destination]):端口與指定destination的管道。不傳遞destination時,斷開與這個可讀流連在一塊兒的全部管道。

流動模式和暫停模式切換 查看原文

流從默認的暫停模式切換到流動模式能夠使用如下幾種方式:

經過添加 data 事件監聽器來啓動數據監聽

調用 resume() 方法啓動數據流

調用 pipe() 方法將數據轉接到另外一個 可寫流

從流動模式切換爲暫停模式又兩種方法:

在流沒有 pipe() 時,調用 pause() 方法能夠將流暫停

pipe() 時,須要移除全部 data 事件的監聽,再調用 unpipe() 方法

觸發準備數據(_read)的方法

data listener

readable listener

read()——若是當前緩衝區爲空,或者緩衝區並未超出咱們設定的最大值,那麼就能夠繼續準備數據;若是此時正在準備數據(_read())或者已經結束讀取(push(null)),那麼就放棄準備數據。

工做流程 查看原文

這裏我要比比兩句哈哈。下面的備註來自的原文大神寫的很是的細膩,還包括了源碼的解讀,不過不太適合初入瞭解流的同志們,把整個思路理清楚了以後更配喲~
paused模式:

圖片描述

  1. 1.在paused模式下則讀取所有緩衝區的長度;若讀取的字節數(n)大於設置的緩衝區最大值,則適當擴大緩衝區的大小(默認爲16k,最大爲8m);若讀取的長度大於當前緩衝區的大小,設置needReadable屬性並準備數據等待下一次讀取。

  2. 2.若是當前緩衝區爲空,或者緩衝區並未超出咱們設定的最大值,那麼就能夠繼續準備數據;若是此時正在準備數據(_read())或者已經結束讀取(push(null)),那麼就放棄準備數據。

  3. 3.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類須要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩衝區,而後進行Readable的讀取。能夠理解爲_read函數爲讀取數據前的準備工做(準備數據),針對的是流的實現者而言。

flowing模式:

圖片描述

  1. 1.對於處在flowing模式下的讀取,每次只讀緩衝區中第一個buffer的長度

  2. 2.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類須要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩衝區,而後進行Readable的讀取。能夠理解爲_read函數爲讀取數據前的準備工做(準備數據),針對的是流的實現者而言。

實例

paused模式:
//這是一個將存放多條json字符串的txt文件讀取成json的例子
const stream = require('stream');
const fs = require('fs');
const util = require('util');

function JSONLineReader(source) {
    stream.Readable.call(this);
    this._source = source;
    this._foundLineEnd = false;
    this._buffer = '';

    source.on('readable', function() {//監聽source何時準備好,那麼咱們就能夠用read()或則readable listener去觸發JSONLineReader的_read方法
        this.read();
        // this.on('readable', function(data) {
        //     console.log('readable');
        // });
    }.bind(this))
}

util.inherits(JSONLineReader, stream.Readable);

JSONLineReader.prototype._read = function(size) {
    var chunk;
    var line;
    var lineIndex;
    var result;
    if (this._buffer.length === 0) {
        chunk = this._source.read();
        this._buffer += chunk; //一次就拿完 只是看何時push null
    }
    lineIndex = this._buffer.indexOf('\n');
    if (lineIndex !== -1) {
        line = this._buffer.slice(0, lineIndex);
        if (line) {
            result = JSON.parse(line);
            this._buffer = this._buffer.slice(lineIndex + 1);
            this.emit('object', result);util.inspect(result))
            this.push(util.inspect(result));
        } else {
            this._buffer = this._buffer.slice(1);
        }
    }

}

let input = fs.createReadStream(__dirname + '/json-lines.txt', {
    encoding: 'utf8'
});

var jsonLineReader = new JSONLineReader(input);

jsonLineReader.on('object', function(obj) {
    console.log('pos:', obj);
})

/*json-lines.txt
{"success":false,"code":501}
{"success":true,"code":202}
{"success":false,"code":503}
{"success":true,"code":204}
{"success":false,"code":505}
{"success":true,"code":206}
{"success":false,"code":507}
{"success":true,"code":208}
{"success":false,"code":509}
*/
flowing模式:
let stream = require('stream');
let util = require('util');
util.inherits(flowingReadableDemo, stream.Readable);

function flowingReadableDemo(opt) {
    stream.Readable.call(this, opt);
    this.quotes = ["yessdasdsa", "noasdasdas", "maybe"];
    this._index = 0;
}
flowingReadableDemo.prototype._read = function() {
    if (this._index >= this.quotes.length) {
        this.push(null);
    } else {
        this.push(this.quotes[this._index]);
        this._index += 1;
    }
};
let r = new flowingReadableDemo();
r.on('data', function(data) {
    console.log("Callback read: " + data.toString());
    // flowing狀態下,咱們無需執行read,僅須要設置data事件處理函數或者設定導流目標pipe
});
r.on('end', function(data) {
    console.log("No more answers.");
});
相關文章
相關標籤/搜索