node中的Stream-Readable和Writeable解讀

在node中,只要涉及到文件IO的場景通常都會涉及到一個類-Stream。Stream是對IO設備的抽象表示,其在JAVA中也有涉及,主要體如今四個類-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream類針對字節數據進行讀寫;Reader和Writer針對字符數據讀寫。同時Java中有多種針對這四種類型的擴展類,如節點流、緩衝流和轉換流等。比較而言,node中Stream類型也和Java中的相似,一樣提供了支持字節和字符讀寫的Readable和Writeable類,也存在轉換流Transform類,本文主要分析node中Readable和Writeable的實現機制,從底層的角度更好的理解Readable和Writeable實現機制,解讀在讀寫過程當中發生的一些重要事件。node

Readable類

Readable對應於Java中的InputStream和Reader兩個類,針對Readable設置encode編碼可完成內部數據由Buffer到字符的轉換。Readable Stream有兩種模式,即flowing和paused模式。這兩種模式對於用戶而言區別在於是否須要手動調用Readable.prototype.read(n),讀取緩衝區的數據。查詢node API文檔可知觸發flowing模式有三種方式:數組

  • 偵聽data事件
  • readable.resume()
  • readable.pipe()
    而觸發paused模式一樣有幾種方式:
  • 移除data事件
  • readable.pause()
  • readable.unpipe()
    可能這樣講解你們仍不明白Readable Stream這兩種模式的區別,那麼下文從更深層次分析兩種模式的機制。

深刻Readable的實現

Readable繼承EventEmitter,你們也都知道。可是相信你們應該不怎麼熟悉Readable的實例屬性**_readableState**。該屬性是一個ReadableState類型的對象,保存了Readable實例的重要信息,如讀取模式(是否爲對象模式)、highWaterMark(緩衝區存放的最大字節數)、緩衝區、flowing模式等。在Readable的實現中,到處使用ReadableState對象記錄當前讀取狀態,並設置緩衝區保證讀操做的順利進行。緩存

首先須要針對Readable.prototype.read方法進行特別解讀:less

if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

當讀入的數據爲0時,執行emitReadable操做。這意味着,針對Readable Stream執行read(0)方法會觸發readable事件,可是不會讀當前緩衝區。所以使用read(0)能夠完成一些比較巧妙的事情,如在readable處理函數中可使用read(0)觸發下一次readable事件,可選的操做讀緩衝區。異步

繼續分析代碼,若是讀入的數據並非0,則計算讀取緩衝區的具體字節數,函數

n = howMuchToRead(n, state);

function howMuchToRead(n, state) {
  if (state.length === 0 && state.ended)
    return 0;

  if (state.objectMode)
    return n === 0 ? 0 : 1;

  if (n === null || isNaN(n)) {
    // only flow one buffer at a time
    if (state.flowing && state.buffer.length)
      return state.buffer[0].length;
    // 如果paused狀態,則讀所有的緩衝區
    else
      return state.length;
  }

  if (n <= 0)
    return 0;

  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);

  // don't have that much.  return null, unless we've ended.
  if (n > state.length) {
    if (!state.ended) {
      state.needReadable = true;
      return 0;
    } else {
      return state.length;
    }
  }

  return n;
}

針對對象模式的讀取,每次只讀一個;對於處在flowing模式下的讀取,每次只讀緩衝區中第一個buffer的長度;在paused模式下則讀取所有緩衝區的長度;若讀取的字節數大於設置的緩衝區最大值,則適當擴大緩衝區的大小(默認爲16k,最大爲8m);若讀取的長度大於當前緩衝區的大小,設置needReadable屬性並準備數據等待下一次讀取。this

接下來,判斷是否須要準備數據。在這裏,依賴於needReadable的值,編碼

var doRead = state.needReadable;
  debug('need readable', doRead);

  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // reading, then it's unnecessary.
  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  }

若是當前緩衝區爲空,或者緩衝區並未超出咱們設定的最大值,那麼就能夠繼續準備數據;若是此時正在準備數據或者已經結束讀取,那麼就放棄準備數據。一旦doRead爲true,那麼進入準備數據階段,prototype

if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // if the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.needReadable = true;
    // call internal read method
    // 默認Readable未實現_read,拋出Error
    // 針對自定義的Readable子類,_read可修改state.buffer的數量,進行預處理,
    // 而後由下面的fromList讀出去緩存中的相關數據
    this._read(state.highWaterMark);
    state.sync = false;
  }

接下來設置相關的標誌位,進行_read處理。針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類須要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩衝區,而後進行Readable的讀取。能夠理解爲_read函數爲讀取數據前的準備工做(準備數據),針對的是流的實現者而言。debug

if (doRead && !state.reading)
    n = howMuchToRead(nOrig, state);
    
  var ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  }

  state.length -= n;

  if (state.length === 0 && !state.ended)
    state.needReadable = true;

  if (nOrig !== n && state.ended && state.length === 0)
    endReadable(this);

  // flowing模式下的數據讀取依賴於 read函數
  // data事件觸發的次數,依賴於howMuchToRead計算的次數
  if (ret !== null)
    this.emit('data', ret);

一旦在_read中更新了緩衝區,那麼咱們須要從新計算(消費者,便可寫流)讀取的字節數。fromList方法完成了讀緩衝區的slice,若是是objectMode下的讀,則只讀緩衝區的第一個對象;針對未傳參數的read方法而言,默認讀取所有緩衝區等等。從讀緩衝區讀取完數據以後設置相關flag,如needReadable,最終,觸發data事件,結束!

上節提到,設置data事件的執行函數會進入flowing模式的讀,而上文看到正是read方法觸發了data事件,而默認條件下Readable處於paused狀態,所以在paused狀態讀取數據須要手動執行read函數,每次read讀取完畢觸發一次data事件。從這點看出,flowing和paused狀態區別在於是否須要手動執行read()來獲取數據。flowing狀態下,咱們無需執行read,僅須要設置data事件處理函數或者設定導流目標pipe;而在paused狀態下,不只僅是簡單的執行read方法,由於讀緩衝區的內容時刻在改變,一旦讀緩衝區又有新數據,簡單執行read()就無法知足需求(由於咱們沒法知道是否又有新數據到來),所以須要偵聽讀緩衝區的相關事件,即readable事件,在該事件處理函數中進行read相關數據。

那麼,什麼狀況下會觸發readable事件呢?在實現_read私有方法中,咱們使用stream.push(chunk)或stream.unshift(chunk)方法注入數據到讀緩衝區,那麼push和unshift方法都實現了下面的邏輯,

if (state.flowing && state.length === 0 && !state.sync) {
  stream.emit('data', chunk);
  stream.read(0);
} else {
  // update the buffer info.
  state.length += state.objectMode ? 1 : chunk.length;
  if (addToFront)
    state.buffer.unshift(chunk);
  else
    state.buffer.push(chunk);

  if (state.needReadable)
    emitReadable(stream);
}

function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    if (state.sync)
      process.nextTick(emitReadable_, stream);
    else
      emitReadable_(stream);
  }
}

function emitReadable_(stream) {
  debug('emit readable');
  stream.emit('readable');
  flow(stream);
}
// 在flowing狀態下,自動讀取流(替代paused狀態下手動read)
function flow(stream) {
  var state = stream._readableState;
  debug('flow', state.flowing);
  if (state.flowing) {
    do {
      var chunk = stream.read();
    } while (null !== chunk && state.flowing);
  }
}

一旦處於flowing模式而且當前緩衝區沒有數據,那麼就當即將預處理的push(unshift)數據傳遞給data事件處理函數,並執行stream.read(0)。前文已經交代過,read(0)僅僅用來觸發readable事件,並不讀取緩衝區,這就是觸發readable的第一種狀況。

第二種則是第一種狀況以外的全部情景,即根據操做(push、unshift)的不一樣將數據插入讀緩衝區的不一樣位置。最後執行emitReadable函數,觸發readable事件。針對emitReadable函數,它的做用就是異步觸發readable事件,並執行flow函數。flow函數則針對flowing狀態的Readable作自適應讀取,免去了手動執行read函數和什麼時候執行read函數的苦惱。

這樣,對於Readable的實現者,一旦在_read函數插入有效數據到讀緩衝區,都會觸發readable事件,在paused狀態下,設置readable事件處理函數並手動執行read函數,即可完成數據的讀取;而在flowing狀態下,經過設置data事件處理函數或者定義pipe目標流一樣能夠實現讀取。

既然pipe一樣能夠觸發Readable進入flowing狀態,那麼pipe方法具體作了什麼呢?其實pipe針對Readable和Writeable作了限流,首先針對Readable的data事件進行偵聽,並執行Writeable的write函數,當Writeable的寫緩衝區大於一個臨界值(highWaterMark),致使write函數返回false(此時意味着Writeable沒法匹配Readable的速度,Writeable的寫緩衝區已經滿了),此時,pipe修改了Readable模式,執行pause方法,進入paused模式,中止讀取讀緩衝區。而同時Writeable開始刷新寫緩衝區,刷新完畢後異步觸發drain事件,在該事件處理函數中,設置Readable爲flowing狀態,並繼續執行flow函數不停的刷新讀緩衝區,這樣就完成了pipe限流。須要注意的是,Readable和Writeable各自維護了一個緩衝區,在實現的上有區別:Readable的緩衝區是一個數組,存放Buffer、String和Object類型;而Writeable則是一個有向鏈表,依次存放須要寫入的數據。

Writeable解讀

Writeable對應Java的OutputStream和Writer類,實現字節和字符數據的寫。與Readable相似,Writeable的實例對象一樣維護了一個狀態對象-WriteableState,記錄了當前輸出流的狀態信息,如寫緩衝區的最大值(hightWaterMark)、緩衝區(有向鏈表)和緩衝區長度等信息。在本節中,主要分析輸出流的關鍵方法write和事件drain,並解析輸出流的實現者須要實現的方法**_writewrite**的關係。

function write
----------------------------
if (state.ended)
    writeAfterEnd(this, cb);
  else if (validChunk(this, state, chunk, cb)) {
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }

  return ret;

在write方法中,判斷寫入數據的格式並執行writeOrBuffer函數,並返回執行結果,該返回值標示當前寫緩衝區是否已滿。真正執行寫入邏輯的是writeOrBuffer函數,該函數的做用在於刷新或者更新寫緩衝區,下面看看主要作了什麼,

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  chunk = decodeChunk(state, chunk, encoding);

  if (chunk instanceof Buffer)
    encoding = 'buffer';
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  // 若是緩存的長度大於highWaterMark,須要刷新緩衝,因此設置needDrain標誌
  var ret = state.length < state.highWaterMark;
  // we must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  // 緩存未處理的寫請求,在clearBuffer中執行緩存
  // 由此看出,Readable和Writeable都有緩存,Readable 中緩存的方式是數組(項爲Buffer,字符串或對象),Writeable的
  // 緩存則是對象鏈表
  if (state.writing || state.corked) {
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  return ret;
}

writeOrBuffer首先針對數據進行編碼,字符串轉換成Buffer類型,若是設置了Writeable的ObjectMode模式則仍爲Object類型;接下來更新寫緩衝區的長度,並判斷寫緩衝區長度是否超過設定的Writeable的最大值(默認16k),若是超過超過則ret=false並更新WriteableState的屬性needDrain=true。ret的結果其實就是write方法返回值,所以一旦write返回值爲false,意味着當前寫緩衝區已滿,須要中止繼續寫入數據。

在Readable的pipe方法中,涉及到了Writeable的drain事件。該事件的觸發意味着寫緩衝區已能夠繼續緩存數據,可見drain事件與寫緩衝區嚴格相關。繼續分析writeOrBuffer函數,若當前輸出流正在寫數據,那麼則當前數據緩存至寫緩衝區(建立WriteReq對象);不然執行doWrite函數,刷新緩衝區。

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

doWrite函數設置了須要寫入數據的長度、寫入狀態等信息,並執行輸出流實現者須要實現的_write函數。在_write函數中,針對數據流向作最後的處理,這裏分析_write函數的具體實現。_write函數有三個參數,分別爲chunk,encoding和state.onwrite回調函數,對該回調函數稍後分析,先着重講解_write函數的實現。在node的fs模塊中,能夠經過fs.createWriteStream建立Writeable實例,經過執行

var writeStream = fs.createWriteStream('./output',{decodeStrings: false});
console.log(writeStream._write.toString());

-----------------輸出-----------------

function (data, encoding, cb) {
  if (!(data instanceof Buffer))
    return this.emit('error', new Error('Invalid data'));

  if (typeof this.fd !== 'number')
    return this.once('open', function() {
      this._write(data, encoding, cb);
    });

  var self = this;
  fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
    if (er) {
      self.destroy();
      return cb(er);
    }
    self.bytesWritten += bytes;
    cb();
  });

  if (this.pos !== undefined)
    this.pos += data.length;
}

看出,在_write實現中,只接受Buffer類型的數據,接着執行fs.write操做,寫入到對應文件描述符fd對應的文件中,寫入成功或失敗後執行回調函數,即state.onwrite函數。

function onwrite(stream, er) {
  var state = stream._writableState;
  var sync = state.sync;
  var cb = state.writecb;

  onwriteStateUpdate(state);

  // 默認未重寫_write方法,會收到er值
  if (er)
    onwriteError(stream, state, sync, er, cb);
  else {
    // Check if we're actually ready to finish, but don't emit yet
    var finished = needFinish(state);

    // 寫緩存的數據
    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.bufferedRequest) {
      clearBuffer(stream, state);
    }

    // 異步觸發drain事件
    if (sync) {
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

在state.onwrite函數中主要工做有兩個:

  • 寫緩衝區的數據
  • 寫完緩衝區的數據後,異步觸發drain事件

第一步,在clearBuffer函數中,就是取出寫緩衝區(有向鏈表)的第一個WriteReq對象,執行doWrite函數,寫入緩衝區的第一個數據;這樣循環往復最終清空寫緩衝區,重置一些標誌位。

第二步,異步執行afterWrite函數,觸發drain事件,並判斷是否寫操做完畢觸發「finish」事件。這裏之因此強調異步觸發drain事件,是由於爲了保證先得到write()返回值爲false,給用戶綁定drain處理函數的時隙,而後再觸發drain事件。

至此,Writeable的重要流程已所有走通。能夠看出來,在覈心的write()中,判斷寫緩衝區是否已滿並返回該值,在適當條件下緩存數據或調用_write()寫數據,在Writeable實現者須要實現的** _write() 中,主要任務是數據寫入方向控制,完成最基本的任務**。

總結

對比Readable的read()和_read(),我總結了下這四個函數在「讀寫過程」中的執行順序與關係,以下圖所示:
Readable和Writeable的函數執行順序

相關文章
相關標籤/搜索