Node.js源碼解析-Writable實現

Node.js源碼解析-Writable實現

歡迎來個人博客閱讀:《Node.js源碼解析-Writable實現》node

對於一個 stream 模塊來講,最基本的就是讀和寫。讀由 Readable 負責,寫則是由 Writable 負責。Readable 的實現上篇文章已經介紹過了,這篇介紹 Writable 的實現git

Base

在開始以前,先看看 Writable 的構造函數github

// lib/_stream_readable.js

function Writable(options) {
  // 只能在 Writable 或 Duplex 的上下文中執行
  if (!(realHasInstance.call(Writable, this)) &&
      !(this instanceof Stream.Duplex)) {
    return new Writable(options);
  }

  // Wrirable 的狀態集
  this._writableState = new WritableState(options, this);

  // legacy
  this.writable = true;

  if (options) {
    if (typeof options.write === 'function')
      this._write = options.write;

    if (typeof options.writev === 'function')
      this._writev = options.writev;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.final === 'function')
      this._final = options.final;
  }

  Stream.call(this);
}

function WritableState(options, stream) {
  options = options || {};

  // object 模式標識
  this.objectMode = !!options.objectMode;

  if (stream instanceof Stream.Duplex)
    this.objectMode = this.objectMode || !!options.writableObjectMode;

  var hwm = options.highWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

  // highWaterMark 高水位標識
  // 此時,write() 返回 false
  // 默認 16k
  this.highWaterMark = Math.floor(this.highWaterMark);

  this.finalCalled = false;
  // drain 事件標識
  this.needDrain = false;
  // 剛調用 end() 時的狀態標識
  this.ending = false;
  // end() 調用完成後的狀態標識
  this.ended = false;
  this.finished = false;
  this.destroyed = false;

  var noDecode = options.decodeStrings === false;
  // 數據寫入前,是否應該將 string 解析爲 buffer
  this.decodeStrings = !noDecode;
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // 緩存中的數據長度
  // 不是真正的 buffer 長度,而是正在等待寫入的數據長度
  this.length = 0;
  // writing 標識
  this.writing = false;
  // cork 標識
  this.corked = 0;
  // 標識是否有異步回調
  this.sync = true;
  // 標識正在寫入緩存中的內容
  this.bufferProcessing = false;
  // _write() 和 _writev() 函數的回調
  this.onwrite = onwrite.bind(undefined, stream);
  // 調用 write(chunk, cb) 時的回調函數
  this.writecb = null;
  // 需寫入的單個 chunk 塊長度
  this.writelen = 0;

  // Writable 的緩衝池實現也是一個鏈表,其每一個節點的結構以下:
  // {
  //   chunk,
  //   encoding,
  //   isBuf,
  //   callback,
  //   next
  // }  

  // 緩衝池頭節點
  this.bufferedRequest = null;
  // 緩衝池尾節點
  this.lastBufferedRequest = null;
  // 緩衝池的大小
  this.bufferedRequestCount = 0;

  // 還須要執行的 callback 數量,必須在 finish 事件發生以前降爲 0
  this.pendingcb = 0;
  this.prefinished = false;
  this.errorEmitted = false;

  // cork 的回調函數,最多隻能有兩個函數對象
  var corkReq = { next: null, entry: null, finish: undefined };
  corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
  this.corkedRequestsFree = corkReq;
}

Writable 與 Readable 相似,也使用一個對象 ( WritableState ) 對狀態和屬性進行集中管理數組

在 Writable 的構造函數參數中,options.write 函數是必須的,options.writev 則是用於批量寫入數據,能夠選擇實現緩存

Writable 的緩衝池也是由鏈表實現,但與 Readable 不一樣的是,Writable 的緩衝池實現更簡單,其節點結構以下:異步

{
  chunk,    // 數據塊,多是 object / string / buffer
  encoding, // 數據塊編碼
  isBuf,    // buffer 標識
  callback, // write(chunk, cb) 的回調函數
  next      // 下一個寫入任務
}

除此以外,WritableState 還使用 bufferedRequest、lastBufferedRequest、bufferedRequestCount 屬性分別記錄緩衝池的頭、尾節點和節點數量函數

與 Duplex 的關係

在 Duplex 的源碼中有這麼一段註釋this

// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
// Writable.

意思是: Duplex 流既是 Readable 流又是 Writable 流,可是因爲 JS 中的繼承是基於原型的,沒有多繼承。因此 Duplex 是繼承自 Readable,寄生自 Writable編碼

寄生自 Writable 體如今兩個方面:prototype

  1. duplex instanceof Writable 爲 true

  2. duplex 具備 Writable 的屬性和方法

// lib/_stream_writable.js

var realHasInstance;
if (typeof Symbol === 'function' && Symbol.hasInstance) {
  realHasInstance = Function.prototype[Symbol.hasInstance];
  Object.defineProperty(Writable, Symbol.hasInstance, {
    value: function(object) {
      if (realHasInstance.call(this, object))
        return true;

      return object && object._writableState instanceof WritableState;
    }
  });
} else {
  realHasInstance = function(object) {
    return object instanceof this;
  };
}

function Writable(options) {
  if (!(realHasInstance.call(Writable, this)) &&
      !(this instanceof Stream.Duplex)) {
    return new Writable(options);
  }
  // ...
}

能夠看出,經過修改 Writable 的 Symbol.hasInstance 使得 duplex/writable instanceof Writabletrue。Writable 的構造函數也只能在 writable 或 duplex 的上下文中調用,使 duplex 具備 Writable 的屬性

// lib/_stream_duplex.js

util.inherits(Duplex, Readable);

var keys = Object.keys(Writable.prototype);
// 獲取 Writable 的全部方法
for (var v = 0; v < keys.length; v++) {
  var method = keys[v];
  if (!Duplex.prototype[method])
    Duplex.prototype[method] = Writable.prototype[method];
}

function Duplex(options) {
  if (!(this instanceof Duplex))
    return new Duplex(options);

  Readable.call(this, options);
  Writable.call(this, options);
  // ...
}

遍歷 Writable 原型上的方法,並添加到 Duplex 的原型上,使 duplex 具備 Writable 的方法

Write 過程

Writable 的寫過程比 Readable 的讀過程簡單得多,不用考慮異步操做,直接寫入便可

// lib/_stream_writable.js

Writable.prototype.write = function(chunk, encoding, cb) {
  var state = this._writableState;
  var ret = false;
  // 判斷是不是 buffer
  var isBuf = Stream._isUint8Array(chunk) && !state.objectMode;
  // ...
  if (state.ended)
    // Writable 結束後繼續寫入數據會報錯
    // 觸發 error 事件
    writeAfterEnd(this, cb);
  else if (isBuf || validChunk(this, state, chunk, cb)) { 
    // 對 chunk 進行校驗
    // 不能爲 null
    // undefined 和非字符串只在 objectMode 下接受
    state.pendingcb++;
    ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
  }

  return ret;
};

write() 函數對傳入數據進行初步處理與校驗後交由 writeOrBuffer() 函數繼續處理

// lib/_stream_writable.js

function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
  if (!isBuf) {
    // 非 buffer 的狀況有: string 或 object
    // 爲 object,表明是 objectMode,直接返回便可
    // 爲 string,則需解碼成 buffer
    var newChunk = decodeChunk(state, chunk, encoding);
    if (chunk !== newChunk) {
      isBuf = true;
      encoding = 'buffer';
      chunk = newChunk;
    }
  }
  var len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  var ret = state.length < state.highWaterMark;
  if (!ret) state.needDrain = true;

  if (state.writing || state.corked) {
    // 正在寫或處於 cork 狀態
    // 將數據塊添加到緩衝池鏈表爲尾部
    var last = state.lastBufferedRequest;
    state.lastBufferedRequest = {
      chunk,
      encoding,
      isBuf,
      callback: cb,
      next: null
    };
    if (last) {
      last.next = state.lastBufferedRequest;
    } else {
      state.bufferedRequest = state.lastBufferedRequest;
    }
    state.bufferedRequestCount += 1;
  } else {
    // 寫入數據
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  return ret;
}

writeOrBuffer() 函數對 chunk 進行處理後,根據 Writable 自身狀態決定應什麼時候寫入數據

若是正在寫入或處於 cork 狀態,就將數據存儲到緩衝池鏈表尾部,等待之後處理。不然,直接調用 doWrite() 寫入數據

當緩存達到 highWaterMark 時,writeOrBuffer() 返回 false,表示不該該再寫入數據

// lib/_stream_writable.js

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len; // 寫入的數據塊長度
  state.writecb = cb; // 寫入操做的回調函數
  state.writing = true;
  state.sync = true; // 同步狀態標識
  if (writev) // 一次寫入多個數據塊
    stream._writev(chunk, state.onwrite);
  else // 一次寫入單個數據塊
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

doWrite() 函數中,根據 writev 參數決定該執行 _write() 仍是 _writev()

_write() 函數用於寫入單個數據塊,_writev() 函數用於寫入多個數據塊

_write() / _writev() 中的回調函數不是傳入的 cb 而是 state.onwrite,其定義以下:

this.onwrite = onwrite.bind(undefined, stream);

可知,寫入完成後,執行 onwrite(stream, err)

// lib/_stream_writable.js

function onwrite(stream, er) {
  var state = stream._writableState;
  var sync = state.sync;
  var cb = state.writecb;

  // 更新 state
  onwriteStateUpdate(state);

  if (er) // 發生錯誤
    onwriteError(stream, state, sync, er, cb);
  else {
    var finished = needFinish(state);

    if (!finished &&
        !state.corked &&
        !state.bufferProcessing &&
        state.bufferedRequest) {
      // 清空緩衝池
      // 有 _writev() 函數時,執行 _writev() 一次寫入多個數據塊
      // 沒有,則循環執行 _write() 寫入單個數據塊
      clearBuffer(stream, state);
    }

    if (sync) { // 表明寫入操做是同步的,須要在 process.nextTick() 中執行 callback
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else { // 表明寫入操做是異步的,直接執行 callback 便可
      afterWrite(stream, state, finished, cb);
    }
  }
}

當寫入過程當中有錯誤發生時,會執行 onwriteError(),繼而調用 cb(err) 並觸發 error 事件

若是寫入過程正確執行,則先查看還有多少數據塊正在等待寫入,有多個,就執行 clearBuffer() 清空緩存,而後執行 afterWrite()

// lib/_stream_writable.js

function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  state.pendingcb--;
  // 執行回調函數
  cb();
  // 檢查是否應該結束 Writable
  finishMaybe(stream, state);
}

Cork

當有大量小數據塊須要寫入時,若是一個個寫入,會致使效率低下。Writable 提供了 cork()uncork() 兩個方法用於大量小數據塊寫入的狀況

先將寫操做柱塞住,等到緩存達到必定量後,再解除柱塞,而後一次性將存儲的數據塊寫入,這個操做須要 _writev() 支持

// lib/_stream_writable.js

Writable.prototype.cork = function() {
  var state = this._writableState;

  // 增長柱塞的次數
  state.corked++;
};

因爲 cork() 函數可能會被屢次調用,因此 state.corked 須要記錄 cork() 調用的次數,是個 number

// lib/_stream_writable.js

Writable.prototype.uncork = function() {
  var state = this._writableState;

  if (state.corked) {
    // 減小柱塞的次數
    state.corked--;

    if (!state.writing &&
        !state.corked &&
        !state.finished &&
        !state.bufferProcessing &&
        state.bufferedRequest)
      // 清空緩衝池
      clearBuffer(this, state);
  }
};

state.corked === 0 時,才能表示柱塞已經所有解除完畢,能夠執行 clearBuffer() 來處理緩存中的數據

// lib/_stream_writable.js

function clearBuffer(stream, state) {
  // 正在清空 buffer 的標識
  state.bufferProcessing = true;
  // 緩存的頭節點
  var entry = state.bufferedRequest;

  if (stream._writev && entry && entry.next) {
    // _writev() 函數存在,且有一個以上數據塊,就使用 _writev() 寫入數據,效率更高
    var l = state.bufferedRequestCount;
    var buffer = new Array(l);
    var holder = state.corkedRequestsFree;
    holder.entry = entry;

    var count = 0;
    var allBuffers = true;
    // 取得全部數據塊
    while (entry) {
      buffer[count] = entry;
      if (!entry.isBuf)
        allBuffers = false;
      entry = entry.next;
      count += 1;
    }
    buffer.allBuffers = allBuffers;

    // 寫入數據
    doWrite(stream, state, true, state.length, buffer, '', holder.finish);

    state.pendingcb++;
    state.lastBufferedRequest = null;
    // 保證最多隻有兩個實例
    if (holder.next) { 
      state.corkedRequestsFree = holder.next;
      holder.next = null;
    } else {
      var corkReq = { next: null, entry: null, finish: undefined };
      corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
      state.corkedRequestsFree = corkReq;
    }
  } else {
    // 一個個的寫入
    while (entry) {
      var chunk = entry.chunk;
      var encoding = entry.encoding;
      var cb = entry.callback;
      var len = state.objectMode ? 1 : chunk.length;

      doWrite(stream, state, false, len, chunk, encoding, cb);
      entry = entry.next;
      // 若是寫操做不是同步執行的,就意味着須要等待這次寫入完成,再繼續寫入
      if (state.writing) {
        break;
      }
    }

    if (entry === null)
      state.lastBufferedRequest = null;
  }

  state.bufferedRequestCount = 0;
  // 修正緩存的頭節點
  state.bufferedRequest = entry;
  state.bufferProcessing = false;
}

執行 clearBuffer() 時,根據是否有 _writev() 函數和待寫入數據塊數量,決定使用 _writev() 仍是 _write() 寫入數據

  • _writev(): 會先將全部數據塊包裝成數組,而後寫入。寫入完成後,回調 corkReq.finish

  • _write(): 只須要將數據塊一個個寫入便可

在使用 _writev() 的狀況下,寫入完成後回調 corkReq.finish 也就是 onCorkedFinish() 函數

// lib/_stream_writable.js

function onCorkedFinish(corkReq, state, err) {
  var entry = corkReq.entry;
  corkReq.entry = null;
  // 依次執行回調函數
  while (entry) {
    var cb = entry.callback;
    state.pendingcb--;
    cb(err);
    entry = entry.next;
  }
  // 保證最多隻有兩個實例
  if (state.corkedRequestsFree) {
    state.corkedRequestsFree.next = corkReq;
  } else {
    state.corkedRequestsFree = corkReq;
  }
}

根據緩衝池鏈表的順序,依次執行寫操做的回調函數

End

每次調用 stream.write(chunk, cb),Writable 都會根據自身狀態,決定是將 chunk 加到緩衝池,仍是直接寫入

當須要寫入大量小數據塊時,推薦先使用 cork() 將寫操做柱塞住,待調用完畢後,再調用 uncork() 解除柱塞,而後一次性寫入全部緩存數據

參考:

相關文章
相關標籤/搜索