Node.js源碼解析-pipe實現

Node.js源碼解析-pipe實現

歡迎來個人博客閱讀:《Node.js源碼解析-pipe實現》node

從前面兩篇文章,咱們瞭解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,而後寫入 Writable。換句話說,每次傳遞數據時,都須要寫以下的模板代碼git

readable.on('readable', (err) => {
  if(err) throw err

  writable.write(readable.read())
})

爲了方便使用,Node.js 提供了 pipe() 方法,讓咱們能夠優雅的傳遞數據github

readable.pipe(writable)

如今,就讓咱們來看看它是如何實現的吧函數

pipe

首先須要先調用 Readable 的 pipe() 方法this

// lib/_stream_readable.js

Readable.prototype.pipe = function(dest, pipeOpts) {
  var src = this;
  var state = this._readableState;

  // 記錄 Writable
  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;

  // ...

    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  
  // ...

  dest.on('drain', ondrain);

  // ...

  src.on('data', ondata);

  // ...

  // 保證 error 事件觸發時,onerror 首先被執行
  prependListener(dest, 'error', onerror);

  // ...

  dest.once('close', onclose);
  
  // ...

  dest.once('finish', onfinish);

  // ...

  // 觸發 Writable 的 pipe 事件
  dest.emit('pipe', src);

  // 將 Readable 改成 flow 模式
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};

執行 pipe() 函數時,首先將 Writable 記錄到 state.pipes 中,而後綁定相關事件,最後若是 Readable 不是 flow 模式,就調用 resume() 將 Readable 改成 flow 模式prototype

傳遞數據

Readable 從數據源獲取到數據後,觸發 data 事件,執行 ondata()debug

ondata() 相關代碼:code

// lib/_stream_readable.js

  // 防止在 dest.write(chunk) 內調用 src.push(chunk) 形成 awaitDrain 重複增長,awaitDrain 不能清零,Readable 卡住的狀況
  // 詳情見 https://github.com/nodejs/node/issues/7278
  var increasedAwaitDrain = false;
  function ondata(chunk) {
    debug('ondata');
    increasedAwaitDrain = false;
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      // 防止在 dest.write() 內調用 src.unpipe(dest),致使 awaitDrain 不能清零,Readable 卡住的狀況
      if (((state.pipesCount === 1 && state.pipes === dest) ||
           (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
          ) && 
          !cleanedUp) {
        debug('false write response, pause', src._readableState.awaitDrain);
        src._readableState.awaitDrain++;
        increasedAwaitDrain = true;
      }
      // 進入 pause 模式
      src.pause();
    }
  }

ondata(chunk) 函數內,經過 dest.write(chunk) 將數據寫入 Writable事件

此時,在 _write() 內部可能會調用 src.push(chunk) 或使其 unpipe,這會致使 awaitDrain 屢次增長,不能清零,Readable 卡住ip

當不能再向 Writable 寫入數據時,Readable 會進入 pause 模式,直到全部的 drain 事件觸發

觸發 drain 事件,執行 ondrain()

// lib/_stream_readable.js

  var ondrain = pipeOnDrain(src);

  function pipeOnDrain(src) {
    return function() {
      var state = src._readableState;
      debug('pipeOnDrain', state.awaitDrain);
      if (state.awaitDrain)
        state.awaitDrain--;
      // awaitDrain === 0,且有 data 監聽器
      if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
        state.flowing = true;
        flow(src);
      }
    };
  }

每一個 drain 事件觸發時,都會減小 awaitDrain,直到 awaitDrain 爲 0。此時,調用 flow(src),使 Readable 進入 flow 模式

到這裏,整個數據傳遞循環已經創建,數據會順着循環源源不斷的流入 Writable,直到全部數據寫入完成

unpipe

無論寫入過程當中是否出現錯誤,最後都會執行 unpipe()

// lib/_stream_readable.js

// ...

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

// ...

Readable.prototype.unpipe = function(dest) {
  var state = this._readableState;
  var unpipeInfo = { hasUnpiped: false };

  // 啥也沒有
  if (state.pipesCount === 0)
    return this;

  // 只有一個
  if (state.pipesCount === 1) {
    if (dest && dest !== state.pipes)
      return this;
    // 沒有指定就 unpipe 全部
    if (!dest)
      dest = state.pipes;

    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;
    if (dest)
      dest.emit('unpipe', this, unpipeInfo);
    return this;
  }

  // 沒有指定就 unpipe 全部
  if (!dest) {
    var dests = state.pipes;
    var len = state.pipesCount;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (var i = 0; i < len; i++)
      dests[i].emit('unpipe', this, unpipeInfo);
    return this;
  }

  // 找到指定 Writable,並 unpipe
  var index = state.pipes.indexOf(dest);
  if (index === -1)
    return this;

  state.pipes.splice(index, 1);
  state.pipesCount -= 1;
  if (state.pipesCount === 1)
    state.pipes = state.pipes[0];

  dest.emit('unpipe', this, unpipeInfo);

  return this;
};

Readable.prototype.unpipe() 函數會根據 state.pipes 屬性和 dest 參數,選擇執行策略。最後會觸發 dest 的 unpipe 事件

unpipe 事件觸發後,調用 onunpipe(),清理相關數據

// lib/_stream_readable.js

  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        // 清理相關數據
        cleanup();
      }
    }
  }

End

在整個 pipe 的過程當中,Readable 是主動方 ( 負責整個 pipe 過程:包括數據傳遞、unpipe 與異常處理 ),Writable 是被動方 ( 只須要觸發 drain 事件 )

總結一下 pipe 的過程:

  • 首先執行 readbable.pipe(writable),將 readable 與 writable 對接上

  • 當 readable 中有數據時,readable.emit('data'),將數據寫入 writable

  • 若是 writable.write(chunk) 返回 false,則進入 pause 模式,等待 drain 事件觸發

  • drain 事件所有觸發後,再次進入 flow 模式,寫入數據

  • 無論數據寫入完成或發生中斷,最後都會調用 unpipe()

  • unpipe() 調用 Readable.prototype.unpipe(),觸發 dest 的 unpipe 事件,清理相關數據

參考:

相關文章
相關標籤/搜索