深刻解析Node.js Stream ondata觸發時機與順序

今天小編就爲你們分享一篇關於Node.js Stream ondata觸發時機與順序的探索,寫的十分的全面細緻,具備必定的參考價值,對此有須要的朋友能夠參考學習下。若有不足之處,歡迎批評指正。前端

無用邏輯異步

當時研究pipe細節是基於Node.js v8.11.1的源碼,其中針對上游的ondata事件處理有以下一段代碼:函數

// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
  debug('ondata');
  increasedAwaitDrain = false;
  var ret = dest.write(chunk);
  if (false === ret && !increasedAwaitDrain) {
    if (((state.pipesCount === 1 && state.pipes === dest) ||
        (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
      !cleanedUp) {
      debug('false write response, pause', src._readableState.awaitDrain);
      src._readableState.awaitDrain++;
      increasedAwaitDrain = true;
    }
//在此我向你們推薦一個前端全棧開發交流圈:619586920 突破技術瓶頸,提高思惟能力
    src.pause();
  }
}

重點關注increasedAwaitDrain變量,理解這個變量指望達到什麼目的,而後仔細閱讀代碼,會發現if (false === ret && !increasedAwaitDrain)語句中increasedAwaitDrain變量確定是false,由於前一行纔將該變量賦值爲false,這樣一來這個變量就變得毫無心義。學習

increasedAwaitDrain = false; 
var ret = dest.write(chunk); 
if (false === ret && !increasedAwaitDrain) {}

以上就是關鍵的三行代碼,由於Node.js是單線程且dest.write(chunk)內部沒有修改變量increasedAwaitDrain的值,那麼if語句中increasedAwaitDrain的值確定仍是false,即increasedAwaitDrain相關邏輯沒有達到所指望的目標。優化

無用代碼出現的緣由ui

前段雖已經分析出increasedAwaitDrain沒起到做用,但做者爲何寫了這樣一段邏輯呢?其實在定義increasedAwaitDrain語句的上方,做者說可能存在這樣一種狀況:「當咱們接收到一次上游的ondata事件並嘗試將數據寫到下游時,上游可能同時又有一個data事件觸發,而這兩個ondata的數據在寫入下游時可能都返回false,從而致使src._readableState.awaitDrain++執行兩次」。 awaitDrain++執行兩次是做者不但願看到的狀況,由於下游觸發drain事件時awaitDrain相應減1,直到其值爲0時才讓上游從新流動,若是awaitDrain++執行兩次,下游卻只觸發一次drain事件,awaitDrain就不會爲0,上游不從新流動也就沒法繼續讀取數據。this

真相的探索過程prototype

雖然從理性上認爲increasedAwaitDrain沒起到做用,但也沒法確定加絕對,本身嘗試去求助,沒有出現高手指點出問題所在,但一個同事聽我描述後,說可能這就是個BUG,雖心中以爲可能性不大,但仍是抱着試試看的心態切換到master分支上去瞅瞅,隨即發現最新的代碼裏並無與increasedAwaitDrain相似的邏輯,間接說明v8.11.1分支上increasedAwaitDrain相關邏輯的確無用。 雖然比較確定這裏存在一段無用代碼,但應該如何理解做者在increasedAwaitDrain上方的註釋呢?爲了進一步揭露真相,本身繼續花時間去看了看stream.Readable相關代碼,想知道data事件的觸發時機與順序是如何決定的。線程

readable流的簡單原理debug

在進一步解釋data事件的觸發順序前,簡單講一下readable流的實現原理,若是須要本身實現一個readable流,可使用new stream.Readable(options)方法,其中options可包含四個屬性:highWaterMark、encoding、objectMode、read。最主要的是read屬性,當流的使用者須要數據時,read方法被用來從數據源獲取數據,而後經過this.push(chunk)將數據傳遞給使用者,若是沒有更多數據可供讀取時使用this.push(null)表示讀取結束。

const Readable = require('stream').Readable;
let letter = 'ABCDEFG'.split('');
let index = 0;
const rs = new Readable({
  read(size) {
    this.push(letter[index++] || null);
  }
});
rs.on('data', chunk => {
  console.log(chunk.toString());
});
//在此我向你們推薦一個前端全棧開發交流圈:619586920 突破技術瓶頸,提高思惟能力
// 輸出
// A
// B
// C
// ...

這裏ondata雖然沒有明顯調用read方法,但內部依舊是經過調用read方法結合this.push輸出數據,而且在源代碼內部能夠發現經過參數傳遞的read方法實際上被賦值給this._read,而後在Readable.prototype.read中調用this._read獲取數據。

靈魂代碼

爲了進一步說明stream.Readable的data事件觸發順序與場景,將有關官方源碼通過修改和刪減成以下:

function Readable(options) {
  this._read = options.read; // 將參數傳遞的read函數賦值到this._read
}
// 使用者經過調用read方法獲取數據
Readable.prototype.read = function (size) {
  var state = this._readableState;
  // 模擬鎖,一次_read若是沒有返回(this.push),後續read不會繼續調用_read讀取數據
  if (!state.reading) {
    state.reading = true;
    state.sync = true; // sync用於在push方法中指示_read內部是否同步調用了push
    this._read(size);
    state.sync = false;    
  }
  // _read內部若是是同步調用push,數據會放入緩衝區
  // _read內部若是是異步調用push且緩衝區沒有內容,數據可能emit data返回
  // 嘗試從緩衝區(state.buffer)中獲取大小爲size的數據,若是獲取成功則觸發data事件
  if (ret) 
    this.emit('data', ret);
  return ret;
};
// 在this._read執行過程當中經過this.push輸出數據
Readable.prototype.push = function (chunk, encoding) {
  var state = this._readableState;
  // 本次_read獲取到數據,打開鎖
  state.reading = false;
  // 流動模式 & 緩衝區沒有數據 & 非同步返回,則直接觸發data事件
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk);
    stream.read(0); // 觸發下一次讀取,_read異步push的話仍是會到這裏,相似flow中的保持流出於流動
  }
  else {
    // 將數據放入緩衝區
    state.length += chunk.length;
    state.buffer.push(chunk);
  }
};
// 暫停流動
Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
function flow(stream) {
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null);
}
//在此我向你們推薦一個前端全棧開發交流圈:619586920 突破技術瓶頸,提高思惟能力

data事件的觸發時機與順序

時機

data的觸發只有兩處:

  • 流若是處於流動模式 & 緩衝區沒有數據 & 異步調用push,此時數據不通過緩衝區,直接觸發data事件
  • 不知足上述狀況時,push的數據會被放入緩衝區,而後再嘗試從緩衝區讀取指定size的數據並觸發data事件

順序

關於data的觸發順序,實際是由emit順序決定,爲討論原始問題:「increasedAwaitDrain相關邏輯爲何能夠被刪除?」,將代碼簡化:

let count = 0;
src.on('data', chunk => {
  let ret = dest.write(chunk);
  if (!ret) {
    count++;
    src.pause();
  }
});

當監聽流的data事件時,流最終會經過resume並調用flow函數進入流動模式模式,即不斷的調用read方法讀取數據。接下來分析如下幾種場景,當dest.write(chunk)返回false時++count會執行幾回,注意結合前文的靈魂代碼。

//在此我向你們推薦一個前端全棧開發交流圈:619586920 突破技術瓶頸,提高思惟能力

場景一:每次_read同步push一次數據

當發生第一次讀取,數據同步push到緩衝區,緊接着從緩衝區中讀取數據並經過emit data的方式傳遞到ondata中,若是此時dest.write(chunk)返回false,count++將執行一次,接着因爲調用了stream.pause(),while條件state.flowing爲false致使stream.read再也不被調用,在流從新流動前,count的值不會繼續增長。

場景二:每次_read異步push一次數據

當發生第一次讀取,異步push的數據將直接經過emit data傳遞到ondata中,而read函數中的emit因爲沒法從緩衝區讀取數據從而不會觸發,同時read返回null致使while循環也相應中止,此種狀況下異步push觸發data事件後,緊接着的stream.read(0)會繼續保持流的流動,當dest.write(chunk)返回false,count++執行一次並將流暫停,緊接着會繼續調用一次read,但此次數據將被放入緩衝區且不觸發data事件,count++依舊只執行一次。 場景二流暫停一次後再次流動時,數據消耗模式與以前會有所差別,會優先消耗緩衝區數據直至爲空時回到以前的模式,但這一樣不會致使count++執行屢次。

場景三:每次_read屢次同步push數據

與場景一相似,只是每次_read會屢次往緩衝區寫入數據,最終data事件仍是依靠從緩衝區讀數據後觸發。

場景四:每次_read屢次異步push數據

同場景二相似,假設在一次_read中有兩次異步push,當第一個異步push執行時,data事件觸發且其中的dest.write(chunk)返回false,致使count++同時流被暫停,等第二個異步push執行時,因爲流已經暫停,數據將寫入緩衝區而不是觸發data事件,因此count++只執行一次。

場景五:_read操做可能同步或異步push

不論是同步或者異步push,當一次ondata內部將流設置爲暫停模式後,flow函數中while條件state.flowing爲false將致使stream.read再也不調用,異步的push的emit data判斷條件一樣再也不知足,即目前階段內部不會再有data事件觸發直到外部再次間接或直接調用read方法。 以上五個場景是爲了分析該問題而模擬的,實際只要能理解第五個場景就能明白全部。

小結

文章最終寫出來的內容與我最開始的初衷所偏離,並且本身不知道如何評價這篇文章的好壞,但爲了寫這文章花了兩天業餘時間去深刻理解stream.Readable倒是很是有收穫的一件事情,更堅決本身在寫文章的路途上能夠走的更遠。 PS:猜想爲何有爛電影的存在,多是由於導演長時間投入的創做會讓他迷失在內部而沒法發現問題,寫文章也是,難以經過閱讀去優化費心思寫的文章。

PS:下圖是美團博客的,也許我寫了這麼多卻抵不上這張圖,說明方式很重要。

結語

感謝您的觀看,若有不足之處,歡迎批評指正。

相關文章
相關標籤/搜索