NodeJS Stream 三:readable

什麼是可讀流

可讀流是生產數據用來供程序消費的流。咱們常見的數據生產方式有讀取磁盤文件、讀取網絡請求內容等,看一下前面介紹什麼是流用的例子:javascript

const rs = fs.createReadStream(filePath);

rs 就是一個可讀流,其生產數據的方式是讀取磁盤的文件,咱們常見的控制檯 process.stdin 也是一個可讀流:css

process.stdin.pipe(process.stdout);

經過簡單的一句話能夠把控制檯的輸入打印出來,process.stdin 生產數據的方式是讀取用戶在控制檯的輸入。java

回頭再看一下咱們對可讀流的定義:可讀流是生產數據用來供程序消費的流。gulp

自定義可讀流

除了系統提供給咱們的 fs.CreateReadStream 咱們還常用 gulp 或者 vinyl-fs 提供的 src 方法網絡

gulp.src(['*.js', 'dist/**/*.scss'])

若是咱們想本身以某種特定的方式生產數據,交給程序消費,那麼改如何開始呢?dom

簡單兩步便可異步

  1. 繼承 sream 模塊的 Readable
  2. 重寫 **_read** 方法,調用 this.push 將生產的數據放入待讀取隊列

Readable 類已經把可讀流要作的大部分工做完成,咱們只須要繼承它,而後把生產數據的方式寫在 _read 方法裏就能夠實現一個自定義的可讀流。oop

若是咱們想實現一個每 100 毫秒生產一個隨機數的流(沒什麼用處)ui

const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
    constructor(max) {
        super()
    }

    _read() {
        const ctx = this;
        setTimeout(() => {
            const randomNumber = parseInt(Math.random() * 10000);

            // 只能 push 字符串或 Buffer,爲了方便顯示打一個回車
            ctx.push(`${randomNumber}\n`);
        }, 100);
    }
}

module.exports = RandomNumberStream;

類繼承部分代碼很簡單,主要看一下 _read 方法的實現,有幾個值得注意的地方this

  1. Readable 類中默認有 _read 方法的實現,不過什麼都沒有作,咱們作的是覆蓋重寫
  2. _read 方法有一個參數 size,用來向 read 方法指定應該讀取多少數據返回,不過只是一個參考數據,不少實現忽略此參數,咱們這裏也忽略了,後面會詳細提到
  3. 經過 this.push 向緩衝區推送數據,緩衝區概念後面會提到,暫時理解爲擠到了水管中可消費了
  4. push 的內容只能是字符串或者 Buffer,不能是數字
  5. push 方法有第二個參數 encoding,用於第一個參數是字符串時指定 encoding

執行一下看看效果

const RandomNumberStream = require('./RandomNumberStream');

const rns = new RandomNumberStream();

rns.pipe(process.stdout);

這樣能夠看到數字源源不斷的顯示到了控制檯上,咱們實現了一個產生隨機數的可讀流,還有幾個小問題待解決

如何停下來

咱們每隔 100 毫秒向緩衝區推送一個數字,那麼就像讀取一個本地文件總有讀完的時候,如何停下來標識數據讀取完畢?

向緩衝區 push 一個 null 就能夠。咱們修改一下代碼,容許消費者定義須要多少個隨機數字:

const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
    constructor(max) {
        super()
        this.max = max;
    }

    _read() {
        const ctx = this;

        setTimeout(() => {
            if (ctx.max) {
                const randomNumber = parseInt(Math.random() * 10000);

                // 只能 push 字符串或 Buffer,爲了方便顯示打一個回車
                ctx.push(`${randomNumber}\n`);
                ctx.max -= 1;
            } else {
                ctx.push(null);
            }
        }, 100);
    }
}

module.exports = RandomNumberStream;

咱們使用了一個 max 的標識,容許消費者指定須要的字符數,在實例化的時候指定便可

const RandomNumberStream = require('./RandomNumberStream');

const rns = new RandomNumberStream(5);

rns.pipe(process.stdout);

這樣能夠看到控制檯只打印了 5 個字符

爲何是 setTimeout 而不是 setInterval

細心的同窗可能注意到,咱們每隔 100 毫秒生產一個隨機數並非調用的 setInterval,而是使用的 setTimeout,爲何僅僅是延時了一下並無重複生產,結果倒是正確的呢?

這就須要瞭解流的兩種工做方式

  1. 流動模式:數據由底層系統讀出,並儘量快地提供給應用程序
  2. 暫停模式:必須顯示地調用 read() 方法來讀取若干數據塊

流在默認狀態下是處於暫停模式的,也就是須要程序顯式的調用 read() 方法,可咱們的例子中並無調用就能夠獲得數據,由於咱們的流經過 pipe() 方法切換成了流動模式,這樣咱們的 _read() 方法會自動被反覆調用,直到數據讀取完畢,因此咱們每次 _read() 方法裏面只須要讀取一次數據便可。

流動模式和暫停模式切換

流從默認的暫停模式切換到流動模式可使用如下幾種方式:

  1. 經過添加 data 事件監聽器來啓動數據監聽
  2. 調用 resume() 方法啓動數據流
  3. 調用 pipe() 方法將數據轉接到另外一個 可寫流

從流動模式切換爲暫停模式又兩種方法:

  1. 在流沒有 pipe() 時,調用 pause() 方法能夠將流暫停
  2. pipe() 時,須要移除全部 data 事件的監聽,再調用 unpipe() 方法

data 事件

使用了 pipe() 方法後數據就從可讀流進入了可寫流,但對咱們好像是個黑盒,數據到底是怎麼流向的呢?咱們看到切換流動模式和暫停模式的時候有兩個重要的名詞

  1. 流動模式對應的 data 事件
  2. 暫停模式對應的 read() 方法

這兩個機制是咱們可以驅動數據流動的緣由,先來看一下流動模式 data 事件,一旦咱們監聽了可讀流的 data 時、事件,流就進入了流動模式,咱們能夠改寫一下上面調用流的代碼

const RandomNumberStream = require('./RandomNumberStream');

const rns = new RandomNumberStream(5);

rns.on('data', chunk => {
  console.log(chunk);
});

這樣咱們能夠看到控制檯打印出了相似下面的結果

<Buffer 39 35 37 0a>
<Buffer 31 30 35 37 0a>
<Buffer 38 35 31 30 0a>
<Buffer 33 30 35 35 0a>
<Buffer 34 36 34 32 0a>

當可讀流生產出可供消費的數據後就會觸發 data 事件,data 事件監聽器綁定後,數據會被儘量地傳遞。data 事件的監聽器能夠在第一個參數收到可讀流傳遞過來的 Buffer 數據,這也就是咱們打印的 chunk,若是想顯示爲數字,能夠調用 Buffer 的 toString() 方法。

當數據處理完成後還會觸發一個 end 事件,應爲流的處理不是同步調用,因此若是咱們但願完過後作一些事情就須要監聽這個事件,咱們在代碼最後追加一句:

rns.on('end', () => {
  console.log('done');
});

這樣能夠在數據接收完了顯示 'done'

固然數據處理過程當中出現了錯誤會觸發 error 事件,咱們一樣能夠監聽,作異常處理:

rns.on('error', (err) => {
  console.log(err);
});

read(size)

流在暫停模式下須要程序顯式調用 read() 方法才能獲得數據。read() 方法會從內部緩衝區中拉取並返回若干數據,當沒有更多可用數據時,會返回null。

使用 read() 方法讀取數據時,若是傳入了 size 參數,那麼它會返回指定字節的數據;當指定的size字節不可用時,則返回null。若是沒有指定size參數,那麼會返回內部緩衝區中的全部數據。

如今有一個矛盾了,在流動模式下流生產出了數據,而後觸發 data 事件通知給程序,這樣很方便。在暫停模式下須要程序去讀取,那麼就有一種多是讀取的時候還沒生產好,若是咱們才用輪詢的方式未免效率有些低。

NodeJS 爲咱們提供了一個 readable 的事件,事件在可讀流準備好數據的時候觸發,也就是先監聽這個事件,收到通知又數據了咱們再去讀取就行了:

const rns = new RandomNumberStream(5);

rns.on('readable', () => {
  let chunk;
  while((chunk = rns.read()) !== null){
    console.log(chunk);
  }
});

這樣咱們一樣能夠讀取到數據,值得注意的一點是並非每次調用 read() 方法均可以返回數據,前面提到了若是可用的數據沒有達到 size 那麼返回 null,因此咱們在程序中加了個判斷。

數據會不會漏掉

開始使用流動模式的時候我常常會擔憂一個問題,上面代碼中可讀流在建立好的時候就生產數據了,那麼會不會在咱們綁定 readable 事件以前就生產了某些數據,觸發了 readable 事件,咱們尚未綁定,這樣不是極端狀況下會形成開頭數據的丟失嘛

可事實並不會,按照 NodeJS event loop 咱們建立流和調用事件監聽在一個事件隊列裏面,兒生產數據因爲涉及到異步操做,已經處於了下一個事件隊列,咱們監聽事件再慢也會比數據生產塊,數據不會丟失。

看到這裏,你們其實對 data事件、readable事件觸發時機, read() 方法每次讀多少數據,何時返回 null 還有又必定的疑問,由於到如今爲止咱們接觸到的仍然是一個黑盒,後面咱們介紹了可寫流後會在 back pressure 機制部分對這些內部細節結合源碼詳細講解,且聽下回分解吧。

相關文章
相關標籤/搜索