Node.js Readable Stream的實現簡析

做者:肖磊javascript

我的主頁:githubjava

Readable Stream是對數據源的一種抽象。它提供了從數據源獲取數據並緩存,以及將數據提供給數據消費者的能力。node

接下來分別經過Readable Stream的2種模式來學習下可讀流是如何獲取數據以及將數據提供給消費者的。git

Flowing模式

node-stream-flowing

flowing模式下,可讀流自動從系統的底層讀取數據,並經過EventEmitter接口的事件提供給消費者。若是不是開發者須要本身去實現可讀流,你們可以使用最爲簡單的readable.pipe()方法去消費數據。github

接下來咱們就經過一個簡單的實例去具體分析下flowing模式下,可讀流是如何工做的。緩存

const { Readable } = require('stream')

let c = 97 - 1
// 實例化一個可讀流
const rs = new Readable({
  read () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      // 向可讀流中推送數據
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})

// 將可讀流的數據pipe到標準輸出並打印出來
rs.pipe(process.stdout)

process.on('exit', () => {
  console.error('\n_read() called ' + (c - 97) + ' times')
})
複製代碼

首先咱們先來看下Readable構造函數的實現:app

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // _readableState裏面保存了關於可讀流的不一樣階段的狀態值,下面會具體的分析
  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    // 重寫內部的_read方法,用以自定義從數據源獲取數據
    if (typeof options.read === 'function')
      this._read = options.read;

    if (typeof options.destroy === 'function')
    // 重寫內部的_destory方法
      this._destroy = options.destroy;
  }

  Stream.call(this);
}
複製代碼

在咱們建立可讀流實例時,傳入了一個read方法,用以自定義從數據源獲取數據的方法,若是是開發者須要本身去實現可讀流,那麼這個方法必定須要去自定義,不然在程序的運行過程當中會報錯ReadableState構造函數中定義了不少關於可讀流的不一樣階段的狀態值:less

function ReadableState(options, stream) {
  options = options || {};

  ...

  // object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  // 是否爲對象模式,若是是的話,那麼從緩衝區得到的數據爲對象
  this.objectMode = !!options.objectMode;

  if (isDuplex)
    this.objectMode = this.objectMode || !!options.readableObjectMode;

  // the point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  // 高水位線,一旦buffer緩衝區的數據量大於hwm時,就會中止調用從數據源再獲取數據
  var hwm = options.highWaterMark;
  var readableHwm = options.readableHighWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;  // 默認值

  if (hwm || hwm === 0)
    this.highWaterMark = hwm;
  else if (isDuplex && (readableHwm || readableHwm === 0))
    this.highWaterMark = readableHwm;
  else
    this.highWaterMark = defaultHwm;

  // cast to ints.
  this.highWaterMark = Math.floor(this.highWaterMark);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  // readable可讀流內部的緩衝區
  this.buffer = new BufferList();
  // 緩衝區數據長度
  this.length = 0;
  this.pipes = null;
  this.pipesCount = 0;
  // flowing模式的初始值
  this.flowing = null;
  // 是否已將源數據所有讀取完畢
  this.ended = false;
  // 是否觸發了end事件
  this.endEmitted = false;
  // 是否正在從源數據處讀取數據到緩衝區
  this.reading = false;

  // a flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick. We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  this.sync = true;

  // whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;

  // has it been destroyed
  this.destroyed = false;

  // Crypto is kind of old and crusty. Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  // 編碼方式
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // 在pipe管道當中正在等待drain事件的寫入流
  // the number of writers that are awaiting a drain event in .pipe()s
  this.awaitDrain = 0;

  // if true, a maybeReadMore has been scheduled
  this.readingMore = false;

  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}
複製代碼

在上面的例子中,當實例化一個可讀流rs後,調用可讀流實例的pipe方法。這正式開始了可讀流在flowing模式下從數據源開始獲取數據,以及process.stdout對數據的消費。異步

Readable.prototype.pipe = function (dest, pipeOpts) {
  var src = this
  var state = this._readableState
  ...

  // 可讀流實例監聽data,可讀流會從數據源獲取數據,同時數據被傳遞到了消費者
  src.on('data', ondata)
  function ondata (chunk) {
    ...
    var ret = dest.write(chunk)
    ...
  }

  ...
}
複製代碼

Node提供的可讀流有3種方式能夠將初始態flowing = null的可讀流轉化爲flowing = true函數

  • 監聽data事件
  • 調用stream.resume()方法
  • 調用stream.pipe()方法

事實上這3種方式都回歸到了一種方式上:strean.resume(),經過調用這個方法,將可讀流的模式改變爲flowing態。繼續回到上面的例子當中,在調用了rs.pipe()方法後,實際上內部是調用了src.on('data', ondata)監聽data事件,那麼咱們就來看下這個方法當中作了哪些工做。

Readable.prototype.on = function (ev, fn) {
  ...
  // 監聽data事件
  if (ev === 'data') {
    // 可讀流一開始的flowing狀態是null
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    ...
  }

  return res;
}
複製代碼

可讀流監聽data事件,並調用resume方法:

Readable.prototype.resume = function() {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    // 置爲flowing狀態
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    // 開始從數據源中獲取數據
    stream.read(0);
  }

  state.resumeScheduled = false;
  // 若是是flowing狀態的話,那麼將awaitDrain置爲0
  state.awaitDrain = 0;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}
複製代碼

resume方法會判斷這個可讀流是否處於flowing模式下,同時在內部調用stream.read(0)開始從數據源中獲取數據(其中stream.read()方法根據所接受到的參數會有不一樣的行爲):

TODO: 這個地方可說明stream.read(size)方法接收到的不一樣的參數

Readable.prototype.read = function (n) {
  ...
  
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    // 若是緩存中沒有數據且處於end狀態
    if (state.length === 0 && state.ended)
    // 流狀態結束
      endReadable(this);
    else
    // 觸發readable事件
      emitReadable(this);
    return null;
  }

  ...

  // 從緩存中能夠讀取的數據
  n = howMuchToRead(n, state);

  // 判斷是否應該從數據源中獲取數據
  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // if we currently have less than the highWaterMark, then also read some
  // 若是buffer的長度爲0或者buffer的長度減去須要讀取的數據的長度 < hwm 的時候,那麼這個時候還須要繼續讀取數據
  // state.length - n 即表示當前buffer已有的數據長度減去須要讀取的數據長度後,若是還小於hwm話,那麼doRead仍然置爲true
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    // 繼續read數據
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // however, if we've ended, then there's no point, and if we're already
  // reading, then it's unnecessary.
  // 若是數據已經讀取完畢,或者處於正在讀取的狀態,那麼doRead置爲false代表不須要讀取數據
  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // if the length is currently zero, then we *need* a readable event.
    // 若是當前緩衝區的長度爲0,首先將needReadable置爲true,那麼再當緩衝區有數據的時候就觸發readable事件
    if (state.length === 0)
      state.needReadable = true;
    // call internal read method
    // 從數據源獲取數據,多是同步也多是異步的狀態,這個取決於自定義_read方法的內部實現,可參見study裏面的示例代碼
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    // 若是_read方法是同步,那麼reading字段將會爲false。這個時候須要從新計算有多少數據須要從新返回給消費者
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  // ret爲輸出給消費者的數據
  var ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended)
      state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended)
      endReadable(this);
  }

  // 只要從數據源獲取的數據不爲null,即未EOF時,那麼每次讀取數據都會觸發data事件
  if (ret !== null)
    this.emit('data', ret);

  return ret;
}
複製代碼

這個時候可讀流從數據源開始獲取數據,調用this._read(state.highWaterMark)方法,對應着例子當中實現的read()方法:

const rs = new Readable({
  read () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      // 向可讀流中推送數據
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})
複製代碼

read方法當中有一個很是中的方法須要開發者本身去調用,就是stream.push方法,這個方法即完成從數據源獲取數據,並供消費者去調用。

Readable.prototype.push = function (chunk, encoding) {
  ....
  // 對從數據源拿到的數據作處理
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
}

function readableAddChunk (stream, chunk, encoding, addToFront, skipChunkCheck) {
  ... 
  // 是否添加數據到頭部
      if (addToFront) {
        // 若是不能在寫入數據
        if (state.endEmitted)
          stream.emit('error',
                      new errors.Error('ERR_STREAM_UNSHIFT_AFTER_END_EVENT'));
        else
          addChunk(stream, state, chunk, true);
      } else if (state.ended) { // 已經EOF,可是仍然還在推送數據,這個時候會報錯
        stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
      } else {
        // 完成一次讀取後,當即將reading的狀態置爲false
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            // 添加數據到尾部
            addChunk(stream, state, chunk, false);
          else
            maybeReadMore(stream, state);
        } else {
          // 添加數據到尾部
          addChunk(stream, state, chunk, false);
        }
      }
  ...

  return needMoreData(state);
}

// 根據stream的狀態來對數據作處理
function addChunk(stream, state, chunk, addToFront) {
  // flowing爲readable stream的狀態,length爲buffer的長度
  // flowing模式下且爲異步讀取數據的過程時,可讀流的緩衝區並不保存數據,而是直接獲取數據後觸發data事件供消費者使用
  if (state.flowing && state.length === 0 && !state.sync) {
    // 對於flowing模式的Reabable,可讀流自動從系統底層讀取數據,直接觸發data事件,且繼續從數據源讀取數據stream.read(0)
    stream.emit('data', chunk);
    // 繼續從緩存池中獲取數據
    stream.read(0);
  } else {
    // update the buffer info.
    // 數據的長度
    state.length += state.objectMode ? 1 : chunk.length;
    // 將數據添加到頭部
    if (addToFront)
      state.buffer.unshift(chunk);
    else
    // 將數據添加到尾部
      state.buffer.push(chunk);

    // 觸發readable事件,即通知緩存當中如今有數據可讀
    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}
複製代碼

addChunk方法中完成對數據的處理,這裏須要注意的就是,在flowing態下,數據被消耗的途徑可能還不同:

  1. 從數據源獲取的數據可能進入可讀流的緩衝區,而後被消費者使用;
  2. 不進入可讀流的緩衝區,直接被消費者使用。

這2種狀況到底使用哪種還要看開發者的是同步仍是異步的去調用push方法,對應着state.sync的狀態值。

push方法被異步調用時,即state.syncfalse:這個時候對於從數據源獲取到的數據是直接經過觸發data事件以供消費者來使用,而不用存放到緩衝區。而後調用stream.read(0)方法重複讀取數據並供消費者使用。

push方法是同步時,即state.synctrue:這個時候從數據源獲取數據後,就不是直接經過觸發data事件來供消費者直接使用,而是首先上數據緩衝到可讀流的緩衝區。這個時候你看代碼可能會疑惑,將數據緩存起來後,那麼在flowing模式下,是如何流動起來的呢?事實上在一開始調用resume_方法時:

function resume_() {
  ...
  // 
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0); // 繼續從數據源獲取數據
}

function flow(stream) {
  ...
  // 若是處理flowing狀態,那麼調用stream.read()方法用以從stream的緩衝區中獲取數據並供消費者來使用
  while (state.flowing && stream.read() !== null);
}
複製代碼

flow方法內部調用stream.read()方法取出可讀流緩衝區的數據供消費者使用,同時繼續調用stream.read(0)來繼續從數據源獲取數據。

以上就是在flowing模式下,可讀流是如何完成從數據源獲取數據並提供給消費者使用的大體流程。

paused模式

pasued模式下,消費者若是要獲取數據須要手動調用stream.read()方法去獲取數據。

舉個例子:

const { Readable } = require('stream')

let c = 97 - 1

const rs = new Readable({
  highWaterMark: 3,
  read () {
    if (c >= 'f'.charCodeAt(0)) return rs.push(null)
    setTimeout(() => {
      rs.push(String.fromCharCode(++c))
    }, 1000)
  }
})

rs.setEncoding('utf8')
rs.on('readable', () => {
  // console.log(rs._readableState.length)
  console.log('get the data from readable: ', rs.read())
})
複製代碼

經過監聽readable事件,開始出發可讀流從數據源獲取數據。

Readable.prototype.on = function (env) {
  if (env === 'data') {
    ...
  } else if (env === 'readable') {
    // 監聽readable事件
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }
}

function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  // 開始從數據源獲取數據
  self.read(0);
}
複製代碼

nReadingNextTick當中調用self.read(0)方法後,後面的流程和上面分析的flowing模式的可讀流從數據源獲取數據的流程類似,最後都要調用addChunk方法,將數據獲取到後推入可讀流的緩衝區:

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    ...
  } else {
    // update the buffer info.
    // 數據的長度
    state.length += state.objectMode ? 1 : chunk.length;
    // 將數據添加到頭部
    if (addToFront)
      state.buffer.unshift(chunk);
    else
    // 將數據添加到尾部
      state.buffer.push(chunk);

    // 觸發readable事件,即通知緩存當中如今有數據可讀
    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}
複製代碼

一旦有數據被加入到了緩衝區,且needReadable(這個字段表示是否須要觸發readable事件用以通知消費者來消費數據)爲true,這個時候會觸發readable告訴消費者有新的數據被push進了可讀流的緩衝區。此外還會調用maybeReadMore方法,異步的從數據源獲取更多的數據:

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  // 在非flowing的模式下,且緩衝區的數據長度小於hwm
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    // 獲取不到數據後
    if (len === state.length)
      // didn't get any data, stop spinning.
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}
複製代碼

每當可讀流有新的數據被推動緩衝區,觸發readable事件後,消費者經過調用stream.read()方法來從可讀流中獲取數據。

背壓

當數據消費消費數據的速度慢於可寫流提供給消費者的數據後會產生背壓。

仍是經過pipe管道來看:

Readable.prototype.pipe = function () {
  ...
  
  // 監聽drain事件
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  ...

  src.on('data', ondata)
  function ondata () {
    increasedAwaitDrain = false;
    // 向writable中寫入數據
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      ...     
      src.pause();
    }
  }
  ...
}

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    // 減小pipes中awaitDrain的數量
    if (state.awaitDrain)
      state.awaitDrain--;
    // 若是awaitDrain的數量爲0,且readable上綁定了data事件時(EE.listenerCount返回綁定的事件回調數量)
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      // 從新開啓flowing模式
      state.flowing = true;
      flow(src);
    }
  };
}
複製代碼

dest.write(chunk)返回false的時候,即表明可讀流給可寫流提供的數據過快,這個時候調用src.pause方法,暫停flowing狀態,同步也暫停可寫流從數據源獲取數據以及向可寫流輸入數據。這個時候只有當可寫流觸發drain事件時,會調用ondrain來恢復flowing,同時可讀流繼續向可寫流輸入數據。關於可寫流的背壓可參見關於Writable_stream的源碼分析。

以上就是經過可讀流的2種模式分析了下可讀流的內部工做機制。固然還有一些細節處你們有興趣的話能夠閱讀相關的源碼。

相關文章
相關標籤/搜索