Nodejs Stream pipe 的使用與實現原理分析

經過流咱們能夠將一大塊數據拆分爲一小部分一點一點的流動起來,而無需一次性所有讀入,在 Linux 下咱們能夠經過 | 符號實現,相似的在 Nodejs 的 Stream 模塊中一樣也爲咱們提供了 pipe() 方法來實現。html

1. Nodejs Stream pipe 基本示例

選擇 Koa 來實現這個簡單的 Demo,由於以前有人在 「Nodejs技術棧」 交流羣問過一個問題,怎麼在 Koa 中返回一個 Stream,順便在下文藉此機會提下。node

1.1 未使用 Stream pipe 狀況

在 Nodejs 中 I/O 操做都是異步的,先用 util 模塊的 promisify 方法將 fs.readFile 的 callback 形式轉爲 Promise 形式,這塊代碼看似沒問題,可是它的體驗不是很好,由於它是將數據一次性讀入內存再進行的返回,當數據文件很大的時候也是對內存的一種消耗,所以不推薦它。git

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = require('path');
const readFile = promisify(fs.readFile);

app.use(async ctx => {
  try {
    ctx.body = await readFile(resolve(__dirname, 'test.json'));
  } catch(err) { ctx.body = err };
});

app.listen(3000);
複製代碼

1.2 使用 Stream pipe 狀況

下面,再看看怎麼經過 Stream 的方式在 Koa 框架中響應數據github

...
app.use(async ctx => {
  try {
    const readable = fs.createReadStream(resolve(__dirname, 'test.json'));
    ctx.body = readable;
  } catch(err) { ctx.body = err };
});
複製代碼

以上在 Koa 中直接建立一個可讀流賦值給 ctx.body 就能夠了,你可能疑惑了爲何沒有 pipe 方法,由於框架給你封裝好了,不要被表象所迷惑了,看下相關源碼:json

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {
  ...
  let body = ctx.body;
  if (body instanceof Stream) return body.pipe(res);
  ...
}
複製代碼

沒有神奇之處,框架在返回的時候作了層判斷,由於 res 是一個可寫流對象,若是 body 也是一個 Stream 對象(此時的 Body 是一個可讀流),則使用 body.pipe(res) 以流的方式進行響應。api

1.3 使用 Stream VS 不使用 Stream

看到一個圖片,不得不說畫的實在太萌了,來源 www.cnblogs.com/vajoy/p/634…安全

2 pipe 的調用過程與實現原理分析

以上最後以流的方式響應數據最核心的實現就是使用 pipe 方法來實現的輸入、輸出,本節的重點也是研究 pipe 的實現,最好的打開方式經過閱讀源碼實現吧。微信

2.1 順藤摸瓜

在應用層咱們調用了 fs.createReadStream() 這個方法,順藤摸瓜找到這個方法建立的可讀流對象的 pipe 方法實現,如下僅列舉核心代碼實現,基於 Nodejs v12.x 源碼。app

2.1.1 /lib/fs.js

導出一個 createReadStream 方法,在這個方法裏面建立了一個 ReadStream 可讀流對象,且 ReadStream 來自 internal/fs/streams 文件,繼續向下找。框架

// https://github.com/nodejs/node/blob/v12.x/lib/fs.js
// 懶加載,主要在用到的時候用來實例化 ReadStream、WriteStream ... 等對象
function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options); // 建立一個可讀流
}

module.exports = fs = {
  createReadStream, // 導出 createReadStream 方法
  ...
}
複製代碼

2.1.2 /lib/internal/fs/streams.js

這個方法裏定義了構造函數 ReadStream,且在原型上定義了 open、_read、_destroy 等方法,並無咱們要找的 pipe 方法。

可是呢經過 ObjectSetPrototypeOf 方法實現了繼承,ReadStream 繼承了 Readable 在原型中定義的函數,接下來繼續查找 Readable 的實現

// https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
const { Readable, Writable } = require('stream');

function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);

  ...
  Readable.call(this, options);
  ...
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() { ... };

ReadStream.prototype._read = function(n) { ... };;

ReadStream.prototype._destroy = function(err, cb) { ... };
...

module.exports = {
  ReadStream,
  WriteStream
};
複製代碼

2.1.3 /lib/stream.js

在 stream.js 的實現中,有條註釋:在 Readable/Writable/Duplex/... 以前導入 Stream,緣由是爲了不 cross-reference(require),爲何會這樣?

第一步 stream.js 這裏將 require('internal/streams/legacy') 導出複製給了 Stream。

在以後的 _stream_readable、Writable、Duplex ... 模塊也會反過來引用 stream.js 文件,具體實現下面會看到。

Stream 導入了 internal/streams/legacy

上面 /lib/internal/fs/streams.js 文件從 stream 模塊獲取了一個 Readable 對象,就是下面的 Stream.Readable 的定義。

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
...
複製代碼

2.1.4 /lib/internal/streams/legacy.js

上面的 Stream 等於 internal/streams/legacy,首先繼承了 Events 模塊,以後呢在原型上定義了 pipe 方法,剛開始看到這裏的時候覺得實現是在這裏了,但後來看 _stream_readable 的實現以後,發現 _stream_readable 繼承了 Stream 以後本身又從新實現了 pipe 方法,那麼疑問來了這個模塊的 pipe 方法是幹嗎的?何時會被用?翻譯文件名 「legacy=遺留」?有點沒太理解,難道是遺留了?有清楚的大佬能夠指點下,也歡迎在公衆號 「Nodejs技術棧」 後臺加我微信一塊討論下!

// https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
const {
  ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

Stream.prototype.pipe = function(dest, options) {
  ...
};

module.exports = Stream;
複製代碼

2.1.5 /lib/_stream_readable.js

在 _stream_readable.js 的實現裏面定義了 Readable 構造函數,且繼承於 Stream,這個 Stream 正是咱們上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加載了 internal/streams/legacy 文件且重寫了裏面定義的 pipe 方法。

通過上面一系列的分析,終於找到可讀流的 pipe 在哪裏,同時也更進一步的認識到了在建立一個可讀流時的執行調用過程,下面將重點來看這個方法的實現。

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('stream');

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...
  Stream.call(this, options); // 繼承自 Stream 構造函數的定義
}
...
複製代碼

2.2 _stream_readable 實現分析

2.2.1 聲明構造函數 Readable

聲明構造函數 Readable 繼承 Stream 的構造函數和原型。

Stream 是 /lib/stream.js 文件,上面分析了,這個文件繼承了 events 事件,此時也就擁有了 events 在原型中定義的屬性,例如 on、emit 等方法。

const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  ...

  Stream.call(this, options);
}
複製代碼

2.2.2 聲明 pipe 方法,訂閱 data 事件

在 Stream 的原型上聲明 pipe 方法,訂閱 data 事件,src 爲可讀流對象,dest 爲可寫流對象。

咱們在使用 pipe 方法的時候也是監聽的 data 事件,一邊讀取數據一邊寫入數據。

看下 ondata() 方法裏的幾個核心實現:

  • dest.write(chunk):接收 chunk 寫入數據,若是內部的緩衝小於建立流時配置的 highWaterMark,則返回 true,不然返回 false 時應該中止向流寫入數據,直到 'drain' 事件被觸發
  • src.pause():可讀流會中止 data 事件,意味着此時暫停數據寫入了。

之因此調用 src.pause() 是爲了防止讀入數據過快來不及寫入,何時知道來不及寫入呢,要看 dest.write(chunk) 何時返回 false,是根據建立流時傳的 highWaterMark 屬性,默認爲 16384 (16kb),對象模式的流默認爲 16。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      src.pause();
    }
  }
  ...
};
複製代碼

2.2.3 訂閱 drain 事件,繼續流動數據

上面提到在 data 事件裏,若是調用 dest.write(chunk) 返回 false,就會調用 src.pause() 中止數據流動,何時再次開啓呢?

若是說能夠繼續寫入事件到流時會觸發 drain 事件,也是在 dest.write(chunk) 等於 false 時,若是 ondrain 不存在則註冊 drain 事件。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source. This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src);
        dest.on('drain', ondrain);
      }
      src.pause();
    }
  }
  ...
};

// 當可寫入流 dest 耗盡時,它將會在可讀流對象 source 上減小 awaitDrain 計數器
// 爲了確保全部須要緩衝的寫入都完成,即 state.awaitDrain === 0 和 src 可讀流上的 data 事件存在,切換流到流動模式
function pipeOnDrain(src) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

// stream.read() 從內部緩衝拉取並返回數據。若是沒有可讀的數據,則返回 null。在可讀流上 src 還有一個 readable 屬性,若是能夠安全地調用 readable.read(),則爲 true
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}
複製代碼

2.2.4 觸發 data 事件

調用 readable 的 resume() 方法,觸發可讀流的 'data' 事件,進入流動模式。

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }
  ...
複製代碼

而後實例上的 resume(Readable 原型上定義的)會在調用 resume() 方法,在該方法內部又調用了 resume_(),最終執行了 stream.read(0) 讀取了一次空數據(size 設置的爲 0),將會觸發實例上的 _read() 方法,以後會在觸發 data 事件。

function resume(stream, state) {
  ...
  process.nextTick(resume_, stream, state);
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  ...
}
複製代碼

2.2.5 訂閱 end 事件

end 事件:當可讀流中沒有數據可供消費時觸發,調用 onend 函數,執行 dest.end() 方法,代表已沒有數據要被寫入可寫流,進行關閉(關閉可寫流的 fd),以後再調用 stream.write() 會致使錯誤。

Readable.prototype.pipe = function(dest, options) {
  ...
  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  ...

  function onend() {
    debug('onend');
    dest.end();
  }
}
複製代碼

2.2.6 觸發 pipe 事件

在 pipe 方法裏面最後還會觸發一個 pipe 事件,傳入可讀流對象

Readable.prototype.pipe = function(dest, options) {
  ...
  const source = this;
  dest.emit('pipe', src);
  ...
};
複製代碼

在應用層使用的時候能夠在可寫流上訂閱 pipe 事件,作一些判斷,具體可參考官網給的這個示例 stream_event_pipe

2.2.7 支持鏈式調用

最後返回 dest,支持相似 unix 的用法:A.pipe(B).pipe(C)

Readable.prototype.pipe = function(dest, options) {
  return dest;
};
複製代碼

3. 總結

本文整體分爲兩部分:

  • 第一部分相對較基礎,講解了 Nodejs Stream 的 pipe 方法在 Koa2 中是怎麼去應用的。
  • 第二部分仍以 Nodejs Stream pipe 方法爲題,查找它的實現,以及對源碼的一個簡單分析,其實 pipe 方法核心仍是要去監聽 data 事件,向可寫流寫入數據,若是內部緩衝大於建立流時配置的 highWaterMark,則要中止數據流動,直到 drain 事件觸發或者結束,固然還要監聽 end、error 等事件作一些處理。

4. Reference

  • nodejs.cn/api/stream.html
  • cnodejs.org/topic/56ba030271204e03637a3870
  • github.com/nodejs/node/blob/master/lib/_stream_readable.js
相關文章
相關標籤/搜索