經過源碼解析 Node.js 中導流(pipe)的實現

Node.js中,流(Stream)是其衆多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數據處理和傳遞。在它對外暴露的接口中,最爲神奇的,莫過於導流(pipe)方法了。鑑於近期本身正在閱讀Node.js中的部分源碼,也來從源碼層面分享下導流的具體實現。html

正題

如下是一個關於導流的簡單例子:node

'use strict'
import {createReadStream, createWriteStream} from 'fs'

createReadStream('/path/to/a/big/file').pipe(createWriteStream('/path/to/the/dest'))

再結合官方文檔,咱們能夠把pipe方法的主要功能分解爲:git

  • 不斷歷來源可讀流中得到一個指定長度的數據。github

  • 將獲取到的數據寫入目標可寫流。api

  • 平衡讀取和寫入速度,防止讀取速度大大超過寫入速度時,出現大量滯留數據。函數

好,讓咱們跟隨Node.js項目裏lib/_stream_readable.jslib/_stream_writable.js中的代碼,逐個解析這三個主要功能的實現。this

讀取數據

剛建立出的可讀流只是一個記錄了一些初始狀態的空殼,裏面沒有任何數據,而且其狀態不屬於官方文檔中的流動模式(flowing mode)和暫停模式(paused mode)中的任何一種,算是一種僞暫停模式,由於此時實例的狀態中記錄它是否爲暫停模式的變量還不是標準的布爾值,而是null,但又可經過將暫停模式轉化爲流動模式的行爲(調用實例的resume()方法),將可讀流切換至流動模式。在外部代碼中,咱們能夠手動監聽可讀流的data事件,讓其進入流動模式:prototype

// lib/_stream_readable.js
// ...

Readable.prototype.on = function(ev, fn) {
  var res = Stream.prototype.on.call(this, ev, fn);

  if (ev === 'data' && false !== this._readableState.flowing) {
    this.resume();
  }

  // ...

  return res;
};

可見,可讀流類經過二次封裝父類(EventEmitter)的on()方法,替咱們在監聽data事件時,將流切換至了流動模式。而開始讀取數據的動做,則存在於resume()方法調用的內部方法resume_()中,讓咱們一窺究竟:debug

// lib/_stream_readable.js
// ...

function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    stream.read(0);
  }

  // ...
}

經過向可讀流讀取一次空數據(大小爲0),將會觸發實例層面實現的_read()方法,開始讀取數據,而後利用讀到的數據觸發data事件:code

// lib/_stream_readable.js
// ...

Readable.prototype.read = function(n) {
  // ...
  // 這次判斷的意圖爲,若是可讀流的緩衝中已滿,則只空觸發readable事件。
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  
  // 若可讀流已經被傳入了終止符(null),且緩衝中沒有遺留數據,則結束這個可讀流
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // 若目前緩衝中的數據大小爲空,或未超過設置的警惕線,則進行一次數據讀取。
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
  }

  if (doRead) {
    // ...
    this._read(state.highWaterMark);
  }

  // ...

  if (ret !== null)
    this.emit('data', ret);

  return ret;
};

可見,在可讀流的read()方法內部,經過調用在實例層面實現的_read(size)方法,取得了一段(設置的警惕線)大小的數據,可是,你可能會疑惑,這只是讀取了一次數據啊,理想狀況下,應該是循環調用_read(size)直至取完全部數據纔對啊!?其實,這部分的邏輯存在於咱們實現_read(size)方法時,在其內部調用的this.push(data)方法中,在最後其會調用私有方法maybeReadMore_(),再次觸發read(0),接着在read(0)函數的代碼中再次判斷可讀流是否可以結束,不然再進行一次_read(size)讀取:

// lib/_stream_readable.js
// ...

Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;
  // ...
  return readableAddChunk(this, state, chunk, encoding, false);
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  // ...
  if (er) {
    stream.emit('error', er);
  } else if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state); // 當傳入終止符時,將可讀流的結束標識(state.ended)設爲true
  }
  // ...
      maybeReadMore(stream, state);
    }
  } 

  // ...
}

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    // ...
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
    // ...
    stream.read(0);
}

function onEofChunk(stream, state) {
  if (state.ended) return;
  // ...
  state.ended = true;
  // ...
}

好的,此時從可讀流中讀取數據的整個核心流程已經實現了,讓咱們概括一下:

  • 剛建立出的可讀流只是一個空殼,保存着一些初始狀態。

  • 監聽它的data事件,將會自動調用該可讀流的resume()方法,使流切換至流動模式。

  • resume()方法的內部函數_resume()中,對可讀流進行了一次read(0)調用。

  • read(0)調用的內部,首先檢查流是否符合告終束條件,若符合,則結束之。不然調用實例實現的_read(size)方法讀取一段預設的警惕線(highWaterMark)大小的數據。

  • 在實例實現_read(size)函數時內部調用的this.push(data)方法裏,會先判斷的讀到的數據是否爲結束符,如果,則將流的狀態設爲結束,而後再一次對可讀流調用read(0)

寫入數據

和可讀流同樣,剛建立出的可寫流也只是一個記錄了相關狀態(包括預設的寫入緩衝大小)的空殼。直接調用它的write方法,該方法會在其內部調用writeOrBuffer函數來對數據是否能夠直接一次性所有寫入進行判斷:

// lib/_stream_writable.js
// ...

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  // ...
  var ret = state.length < state.highWaterMark;

  // 記錄可寫流是否須要出發drain事件
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked) {
    // 若可寫流正在被寫入或被人工阻塞,則先將寫入操做排隊
    // ...
  } else {
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  return ret;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  // ...
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  // ...
}

從代碼中可知,在writeOrBuffer函數記錄下了數據是否能夠被一次性寫入後,調用了實例層實現的_write()_writev()方法進行了實際的寫入操做。那麼,若是不能一次性寫入完畢,那麼在真正寫入完畢時,又是如何進行通知的呢?嗯,答案就在設置的state.onwrite回調中:

// lib/_stream_writable.js
// ...

function onwrite(stream, er) {
  // ...

  if (er)
    onwriteError(stream, state, sync, er, cb);
  else {
    // ...
    if (sync) {
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  // ...
}

function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }
}

可見,在回調函數的執行中,會對該可寫流該次被寫入的數據是否超過了警惕線的狀態進行判斷,若是是,則觸發drain事件,進行通知。

咱們也能夠調用end()方法來代表要結束這個寫入流,並進行最後一次寫入,end()方法的內部最終會調用endWritable()函數來說可寫流的狀態切換爲已結束:

// lib/_stream_writable.js
// ...

function endWritable(stream, state, cb) {
  // ...
  state.ended = true;
  stream.writable = false;
}

此時,向可寫流中寫入數據的整個核心流程已經實現了,這個流程和可寫流的循環讀取流程不一樣,它是直線的,概括一下:

  • 剛建立出的可寫流只是一個空殼,保存着一些初始狀態。

  • 調用write()方法,其內部的writeOrBuffer()檢測該次寫入的數據是否須要被暫存在緩衝區中。

  • writeOrBuffer()函數調用實例實現的_write()_writev()方法,進行實際的寫入,完成後調用回調函數state.onwrite

  • 回調函數中檢測該次寫入是否被緩衝,如果,觸發drain事件。

  • 重複以上過程,直至調用end()方法結束該可寫流。

導流

在摸清了從可讀流中讀數據,和向可寫流中寫數據實現的核心流程後,Node.js中實現導流的核心流程其實已經呼之欲出了。首先,爲了開始從源可讀流讀取數據,在pipe()方法的內部,它主動爲源可讀流添加了data事件的監聽函數:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...

  src.on('data', ondata);
  function ondata(chunk) {
      // ...
      src.pause();
    }
  }

  // ...
  return dest;
};

從代碼中可見,若向目標可寫流寫入一次數據時,目標可寫流表示該次寫入它須要進行緩衝,則主動將源可讀流切換至暫停模式。那麼,源可讀流經過什麼手段得知能夠再次讀取數據並寫入呢?嗯,經過監聽目標可寫流的drain事件:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  // ...
  return dest;
};

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    
    // 目標可寫流可能會存在屢次寫入須要進行緩衝的狀況,需確保全部須要緩衝的寫入都
    // 完成後,再次將可讀流切換至流動模式。
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

最後,監聽源可讀流的結束事件,對應着結束目標可寫流:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...
  var endFn = doEnd ? onend : cleanup;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  function onend() {
    debug('onend');
    dest.end();
  }

  // ...
  return dest;
};

因爲前面的鋪墊,實際導流操做的核心流程其實實現得很是輕鬆,概括一下:

  • 主動監聽源可讀流的data事件,在該事件的監聽函數中,向目標可寫流寫入數據。

  • 若目標可寫流表示該寫入操做須要進行緩衝,則馬上將源可讀流切換至暫停模式。

  • 監聽目標可寫流的drain事件,當目標可寫流裏全部須要緩衝的寫入操做都完畢後,將流從新切換回流動模式。

  • 監聽源可讀流的end事件,相應地結束目標可寫流。

最後

Node.js中流的實際實現其實很是龐大,複雜,精妙。每個流的內部,都管理着大量狀態。本文僅僅只是在龐大的流的實現中,選擇了一條主線,進行了闡述。你們若是有閒,很是推薦完整地閱讀一遍其實現。

參考:

相關文章
相關標籤/搜索