說說node中的可寫流

以前寫過一篇流的文章,說說node中可讀流和可寫流,比較宏觀的介紹了流的一些基本概念。

下面是以前文章裏的一張圖:
javascript


能夠看到可寫流相對是比較簡單的,主要就是實現了一個緩存的能力。而這篇文章主要介紹可寫流的實現細節,no bb show code。

咱們來具體看看實現可寫流的核心類的代碼:java

WritableState 類

WritableState類用來存放配置,部分代碼以下:
node


比較重要的配置有:

  • buffered:緩衝區,用來存放等待寫入的內容
  • highWaterMark:緩衝區大小,可寫流默認16kb
  • writing: 正在寫入(防止同一個tich屢次寫入)
  • corked: 強制將數據寫入緩衝區,避免出現行頭阻塞(文檔中的解釋:The primary intent of writable.cork() is to accommodate a situation in which several small chunks are written to the stream in rapid succession)
  • closed:流是否已經關閉

Writeable 類

Writeable類主要作了如下幾件事情:web

  1. 初始化WritableState實例
  2. 根據options配置重寫_write等方法
  3. 調用父類構造函數

所有代碼以下:
api

Writeable.prototype.write

這是對外暴露的write方法,核心邏輯是調用了writeOrBuffer方法,根據字面意思就能夠看出,是寫入數據或者緩存起來。代碼以下:緩存

 1Writable.prototype.write = function(chunk, encoding, cb{
2  const state = this._writableState;
3
4  // chunk轉換成buffer
5  if (chunk === null) {
6    throw new ERR_STREAM_NULL_VALUES();
7  } else if (!state.objectMode) {
8    if (typeof chunk === 'string') {
9      if (state.decodeStrings !== false) {
10        chunk = Buffer.from(chunk, encoding);
11        encoding = 'buffer';
12      }
13    } else if (chunk instanceof Buffer) {
14      encoding = 'buffer';
15    } else if (Stream._isUint8Array(chunk)) {
16      chunk = Stream._uint8ArrayToBuffer(chunk);
17      encoding = 'buffer';
18    } else {
19      throw new ERR_INVALID_ARG_TYPE(
20        'chunk', ['string''Buffer''Uint8Array'], chunk);
21    }
22  }
23
24  // 若是流已經結束,繼續寫入會報錯
25  let err;
26  if (state.ending) {
27    err = new ERR_STREAM_WRITE_AFTER_END();
28  } else if (state.destroyed) {
29    err = new ERR_STREAM_DESTROYED('write');
30  }
31
32  // 存在錯誤會在下個tick調用回掉。不存在錯誤調用writeOrBuffer方法馬上寫入或者存入緩衝區
33  if (err) {
34    process.nextTick(cb, err);
35    errorOrDestroy(this, err, true);
36    return false;
37  } else {
38    state.pendingcb++;
39    return writeOrBuffer(this, state, chunk, encoding, cb);
40  }
41};
複製代碼

下面看看writeOrBuffer方法,若是系統正在寫入,則放入緩衝區。不然直接調用_write方法寫入。若是_write返回fasle。則須要等待"drain"事件觸發後才能夠繼續寫入:app

緩衝區

前面介紹過WritableState類,緩衝區就是這個類中的buffered對象。排隊寫入的內容會暫時存放在這裏。而higherWaterMark標識是否已經超出了合理的緩存值(默認爲16kb)
函數

思考一下:若是超出了higherWaterMark還繼續寫入會怎麼樣呢?post

drain事件

若是正在寫入,write方法會返回false。當寫入完成,就會發出「drain」事件通知咱們,下面是官網對「drain」事件的描述:ui

若是調用 stream.write(chunk) 返回 false,則當能夠繼續寫入數據到流時會觸發 'drain' 事件。

能夠看出,可寫流最終寫入數據是經過_write完成的,若是你想實現一個自定義的可寫流,其實就是實現一個新的 _write方法。具體的作法能夠在官網中找到示例。

那麼node中的可寫流是如何實現的呢?

咱們通常經過fs.createWriteStream方法來建立可寫流。這個方法會實例化WriteStream這個類。

咱們能夠在/nodejs/node/blob/master/lib/internal/fs/streams.js路徑下看到WriteStream的實現,其實就是實現了一個_write方法。僞代碼以下:

 1const { Writable } = require('stream');
2const fs = require('fs');
3
4// 繼承Writeable
5function WriteStream(path, options{
6  Writable.call(this, options);
7}
8ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
9ObjectSetPrototypeOf(WriteStream, Writable);
10
11// 實現_write方法
12WriteStream.prototype._write = function(data, encoding, cb{
13  // 用法見fs
14  fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
15    this.bytesWritten += bytes;
16    cb();
17  });
18
19  if (this.pos !== undefined)  this.pos += data.length;
20};
複製代碼

Stream類

最後再看下Writeable的父類,源碼在internal/streams/legacy.js中,代碼很少,主要作了如下幾件事:

  1. 繼承events模塊,得到事件通訊能力
  2. 實現pipe方法(經過監聽「data」事件讀取數據。由於讀取數據較寫入數據快不少,爲了不內存溢出,經過監聽「drain」事件來控制流量)
  3. 註冊onend、onclose等方法

因此說基類Stream主要就是提供事件能力,和pipe這個api,代碼以下:

 1const {
2  ObjectSetPrototypeOf,
3} = primordials;
4
5const EE = require('events');
6
7// 繼承events模塊
8function Stream(opts{
9  EE.call(this, opts);
10}
11ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
12ObjectSetPrototypeOf(Stream, EE);
13
14// 實現pipe方法
15Stream.prototype.pipe = function(dest, options{
16  const source = this;
17
18  function ondata(chunk{
19    // 若是可寫流處於「不可寫」狀態,則暫停
20    if (dest.writable && dest.write(chunk) === false && source.pause) {
21      source.pause();
22    }
23  }
24  // 經過監聽‘data’事件讀取數據
25  source.on('data', ondata);
26  // 若是這是一個可讀流,觸發「drain」事件時,調用resume方法繼續讀取數據
27  function ondrain({
28    if (source.readable && source.resume) {
29      source.resume();
30    }
31  }
32  // 監聽「drain」事件
33  dest.on('drain', ondrain);
34
35  // 註冊監聽「onend」、「onclose」方法
36  // If the 'end' option is not supplied, dest.end() will be called when
37  // source gets the 'end' or 'close' events.  Only dest.end() once.
38  if (!dest._isStdio && (!options || options.end !== false)) {
39    source.on('end', onend);
40    source.on('close', onclose);
41  }
42
43  let didOnEnd = false;
44  function onend({
45    if (didOnEnd) return;
46    didOnEnd = true;
47
48    dest.end();
49  }
50
51
52  function onclose({
53    if (didOnEnd) return;
54    didOnEnd = true;
55
56    if (typeof dest.destroy === 'function') dest.destroy();
57  }
58
59  // 下面作的都是清除工做
60  // Don't leave dangling pipes when there are errors.
61  function onerror(er{
62    cleanup();
63    if (EE.listenerCount(this'error') === 0) {
64      throw er; // Unhandled stream error in pipe.
65    }
66  }
67
68  source.on('error', onerror);
69  dest.on('error', onerror);
70
71  // Remove all the event listeners that were added.
72  function cleanup({
73    source.removeListener('data', ondata);
74    dest.removeListener('drain', ondrain);
75
76    source.removeListener('end', onend);
77    source.removeListener('close', onclose);
78
79    source.removeListener('error', onerror);
80    dest.removeListener('error', onerror);
81
82    source.removeListener('end', cleanup);
83    source.removeListener('close', cleanup);
84
85    dest.removeListener('close', cleanup);
86  }
87
88  source.on('end', cleanup);
89  source.on('close', cleanup);
90
91  dest.on('close', cleanup);
92  dest.emit('pipe', source);
93
94  // 返回這個實例,支持鏈式調用
95  // Allow for unix-like usage: A.pipe(B).pipe(C)
96  return dest;
97};
98
99module.exports = Stream;
複製代碼

總結一下:

Stream類提供能力:
  • 事件能力
  • pipe方法
  • 容錯處理和事件清除等
Writeable類提供能力:
  • 維護配置參數(維護在WritableState類)
  • 維護緩衝區
  • 調用底層_write方法寫入數據
WriteStream提供能力:
  • 調用fs.write方法寫入數據
fs.createWriteableStream提供能力:
  • 實例化WriteStream
相關文章
相關標籤/搜索