Node.js源碼解析-Readable實現

Node.js源碼解析-Readable實現

歡迎來個人博客閱讀:《Node.js源碼解析-Readable實現》node

想要了解 Readable 的實現,最好的方法是順着 Readable 的 Birth-Death 走一遍git

Base

在瞭解 Readable 的 Birth-Death 以前,先看看 Readable 的構造函數github

// lib/_stream_readable.js

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // Readable 流的狀態集
  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    if (typeof options.read === 'function')
      // 真實數據來源,Readable.prototyoe._read() 函數會拋出異常,所以必須有options.read
      this._read = options.read;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this);
}

function ReadableState(options, stream) {
  options = options || {};

  // object 模式標識
  this.objectMode = !!options.objectMode;

  if (stream instanceof Stream.Duplex)
    this.objectMode = this.objectMode || !!options.readableObjectMode;

  var hwm = options.highWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // highWaterMark 高水位標識
  // 內部緩存高於 highWaterMark 時,會中止調用 _read() 獲取數據
  // 默認 16k
  this.highWaterMark = Math.floor(this.highWaterMark);

  // Readable 流內部緩衝池,是一個 buffer 鏈表
  // 之因此不用數組實現,是由於鏈表增刪頭尾元素更快
  this.buffer = new BufferList();
  // 緩存大小
  this.length = 0;
  // pipe 的流
  this.pipes = null;
  this.pipesCount = 0;
  // flow 模式標識
  this.flowing = null;
  // Readable 狀態標識,爲 true 表示數據源已讀取完畢
  // 此時 Readable 中可能還有數據,不能再向緩衝池中 push() 數據
  this.ended = false;
  // Readable 狀態標識,爲 true 表示 end 事件已觸發
  // 此時 Readable 中數據已讀取完畢,不能再向緩衝池中 push() 或 unshift() 數據
  this.endEmitted = false;
  // Readable 狀態標識,爲 true 表示正在調用 _read() 讀取數據
  this.reading = false;
  this.sync = true;
  // 標識須要觸發 readable 事件
  this.needReadable = false;
  // 標識已觸發 readable 事件
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this.destroyed = false;
  this.defaultEncoding = options.defaultEncoding || 'utf8';
  this.awaitDrain = 0;
  this.readingMore = false;

  // 解碼器
  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}

在 Readable 的構造函數中,可經過 options 傳入參數,其中 options.read 函數是必需的數組

readable._readableState 中保存了 Readable 的各類狀態與屬性緩存

Birth-Death

在這裏將 Readable 的 Birth-Death 分爲五個狀態:less

表中爲 this._readableSate 的屬性異步

  • start: 初始狀態,Readable 剛剛被建立,還未調用 readable.read()函數

|length|reading|ended|endEmitted|
|--|--|--|--|
|0|false|false|false|ui

  • reading: 表明正在從數據源中讀取數據,此時緩存大小 this._readableSate.length 小於 highWaterMark,應讀取數據使緩存達到 highWaterMarkthis

|length|reading|ended|endEmitted|
|--|--|--|--|
|< highWaterMark|true|false|false|

  • read: Readable 從數據源讀取數據後的相對穩定狀態

|length|reading|ended|endEmitted|
|--|--|--|--|
|>= highWaterMark|false|false|false|

  • ended: 數據已經所有讀取完成( push(null) ),此時 push(chunk) 會報 stream.push() after EOF 錯誤

|length|reading|ended|endEmitted|
|--|--|--|--|
|>= 0|false|true|false|

  • endEmitted: end 事件觸發完成,此時 unshift(chunk) 會報 stream.unshift() after end event 錯誤

|length|reading|ended|endEmitted|
|--|--|--|--|
|0|false|true|true|

它們之間的關係以下:

1           4         5
  start ==> reading ==> ended ==> endEmitted
             || /\
           2 \/ || 3
             read

1. start ==> reading

start 狀態變爲 reading 狀態,發生在第一次調用 read()

// lib/_stream_readable.js

Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;

  if (n !== 0)
    state.emittedReadable = false;

  // 調用 read(0)時,若是緩存大於 highWaterMark 則直接觸發 readable 事件
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

  // 計算可讀數據量
  // n = NaN ==> 讀取所有
  // n <= state.length ==> 讀取 n
  // n > state.length ==> 讀取 0,並使 Readable 從數據源讀取數據
  // 
  // n > state.highWaterMark ==> 從新計算 highWaterMark,大小是大於 n 的最小 2^x
  n = howMuchToRead(n, state);

  // 當 Readable 已經讀完時,調用 endReadable() ,結束 Readable
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // 判斷是否應該從數據源讀取數據
  // BEGIN
  var doRead = state.needReadable;
  debug('need readable', doRead);

  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }
  // END

  if (state.ended || state.reading) {
    // 對於 ended 或 reading 狀態的 Readable 是不須要讀取數據的
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    // 讀取數據
    debug('do read');
    state.reading = true;
    state.sync = true;

    if (state.length === 0)
      state.needReadable = true;
    // 從數據源讀取數據,多是異步,也多是同步
    this._read(state.highWaterMark);
    state.sync = false;
    // 由於 _read() 函數多是異步的,也多是同步的
    // 在同步狀況下,須要從新確承認讀長度
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  // 獲取數據
  var ret;
  if (n > 0) ret = fromList(n, state); // 從緩衝池中讀取數據
  else ret = null;

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }

  // ...

  if (ret !== null)
    this.emit('data', ret);

  return ret;
};

// 必須實現的方法
Readable.prototype._read = function(n) {
  this.emit('error', new Error('_read() is not implemented'));
};

// 計算可讀長度
function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if (state.objectMode)
    return 1;
  if (n !== n) { // NaN
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }

  if (n > state.highWaterMark)
    // 當須要數據大於 highWaterMark 時,調整 highWaterMark 大小到大於 n 的最小 2^x
    state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length)
    return n;
  // 緩衝池中數據不夠
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

調用 read() 後,若是緩衝池中數據不夠或讀取後低於 highWaterMark,則調用 _read() 來讀取更多的數據,不然直接返回讀取的數據

當指望數據量大於 highWaterMark 時,從新計算 highWaterMark,大小是大於指望數據量的最小 2^x

2. reading ==> read

調用 _read() 後,會異步或同步地將調用 push(chunk),將數據放入緩衝池,並使 Readable 從 reading 狀態變爲 read 狀態

// lib/_stream_readable.js

Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;
  var skipChunkCheck;

  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      // 若是指定編碼與 Readable 編碼不一樣,則將 chunk 使用指定編碼解碼爲 Buffer
      if (encoding !== state.encoding) {
        chunk = Buffer.from(chunk, encoding);
        encoding = '';
      }
      // string 不須要檢查
      skipChunkCheck = true;
    }
  } else {
    // object mode 的 Readable 也不須要檢查
    skipChunkCheck = true;
  }

  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  var state = stream._readableState;
  if (chunk === null) { // 結束信號
    state.reading = false;
    onEofChunk(stream, state);
  } else {
    var er;
    if (!skipChunkCheck) // 檢查 chunk 格式
      er = chunkInvalid(state, chunk);
    if (er) {
      stream.emit('error', er);
    } else if (state.objectMode || chunk && chunk.length > 0) {
      if (typeof chunk !== 'string' &&
          Object.getPrototypeOf(chunk) !== Buffer.prototype &&
          !state.objectMode) {
        chunk = Stream._uint8ArrayToBuffer(chunk);
      }

      if (addToFront) { // unshift() 的 hook
        if (state.endEmitted)
          stream.emit('error', new Error('stream.unshift() after end event'));
        else
          addChunk(stream, state, chunk, true); // 將數據添加到緩衝池中
      } else if (state.ended) {
        stream.emit('error', new Error('stream.push() after EOF'));
      } else {
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            addChunk(stream, state, chunk, false); // 將數據添加到緩衝池中
          else 
            maybeReadMore(stream, state); // 會在 addChunk() 函數內部調用
        } else {
          addChunk(stream, state, chunk, false); // 將數據添加到緩衝池中
        }
      }
    } else if (!addToFront) {
      state.reading = false;
    }
  }

  return needMoreData(state);
  // return !state.ended &&  數據源還有數據
  //          (state.needReadable ||  須要更多數據
  //           state.length < state.highWaterMark ||  緩存小於 highWaterMark
  //           state.length === 0)
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    // 對於 flow 模式的 Readable,直接觸發 data 事件,並繼續讀取數據就行
    stream.emit('data', chunk);
    stream.read(0);
  } else {
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  }

  // 在容許的狀況下,讀取數據直到 highWaterMark
  maybeReadMore(stream, state);
}

調用 push(chunk) 時,會將 chunk 放入緩衝池內,並改變 Readable 的狀態。若是 Readable 處於 ended 狀態,會報 stream.push() after EOF 錯誤

若是緩存小於 highWaterMark,返回 true,意味着須要寫入更多的數據

3. read ==> reading

從 read 到 reading 狀態,意味着須要讀取更多的數據,即緩存小於 highWaterMark

緩存與 highWaterMark 的關係能夠根據 push(chunk) 的返回值來判斷,可是須要使用者手動處理。所以,爲了方便使用,addChunk() 函數會自動調用 maybeReadMore() 來異步讀取數據。這樣,即便單次 _read() 沒法達到 highWaterMark,也能夠經過屢次異步讀取,使數據流動起來

// lib/_stream_readable.js

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length) // 取不到數據就放棄
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}

maybeReadMore() 函數內,經過異步讀取數據,直到 highWaterMark

那麼爲何是異步讀取數據呢?

由於,在 _read() 函數內,可能不止一次調用 push(chunk)

若是是同步,push(chunk) 後,由於沒有達到 highWaterMark,會繼續調用 read(0),發生第二次 _read()。第二次 _read() 也可能致使第三次 _read() ,直到 highWaterMark

待整個調用完畢後,緩衝池內會有 highWaterMark * n( _read() 內調用 push(chunk) 次數 )的數據,而這與 highWaterMark 的設計是不符的

若是是異步,則能夠等 _read() 執行完畢後,在 process.nextTick() 內再次調用 _read() 讀取數據,不會發生上面的問題

4. reading ==> ended

當數據源讀取完畢時,須要調用 push(null) 來通知 Rreadable 數據源已經讀取完畢。push(null) 函數內部會調用 onEofChunk()

// lib/_stream_readable.js

function onEofChunk(stream, state) {
  if (state.ended) return;
  if (state.decoder) {
    var chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  // 觸發 readable 事件,通知監聽者來處理剩餘數據
  emitReadable(stream);
}

onEofChunk() 函數將 readable 標記爲 ended 狀態後,禁止再向緩衝池內 push 數據。此時,緩衝池內可能還有數據

5. ended ==> endEmitted

ended 狀態的 Readable 內可能還有數據。所以,當數據所有被讀取後,須要調用 endReadable() 來結束 Readable

// lib/_stream_readable.js

function endReadable(stream) {
  var state = stream._readableState;

  // state.length 必定是 0
  if (state.length > 0)
    throw new Error('"endReadable()" called on non-empty stream');

  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

function endReadableNT(state, stream) {
  // 防止中間調用 unshift(chunk),向緩衝池中放入數據
  if (!state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');
  }
}

調用 endReadable() 時,緩衝池必定爲空。整個調用完成後,觸發 end 事件,Readable 將不能再讀取或寫入( push() / unshift() )數據

End

到這裏,已經走完了 Readable 的整個 Birth-Death 過程

整個過程就以下面這個圖:

1           4         5
  start ==> reading ==> ended ==> endEmitted
             || /\
           2 \/ || 3
             read

1. read()
2. push(chunk)
3. maybeReadMore() ==> read(0)
4. push(null)
5. endReadable()

根據這個圖還有代碼,在腦殼裏面,把 Readable 的模型運行一遍,就能瞭解它的實現了

參考:

相關文章
相關標籤/搜索