Node.js 指南(流中的背壓)

流中的背壓

在數據處理過程當中會出現一個叫作背壓的常見問題,它描述了數據傳輸過程當中緩衝區後面數據的累積,當傳輸的接收端具備複雜的操做時,或者因爲某種緣由速度較慢時,來自傳入源的數據就有累積的趨勢,就像阻塞同樣。html

要解決這個問題,必須有一個委託系統來確保數據從一個源到另外一個源的平滑流動,不一樣的社區已經針對他們的程序獨特意解決了這個問題,Unix管道和TCP套接字就是很好的例子,而且一般被稱爲流量控制,在Node.js中,流是已採用的解決方案。node

本指南的目的是進一步詳細說明背壓是什麼,以及精確流如何在Node.js的源代碼中解決這個問題,本指南的第二部分將介紹建議的最佳實踐,以確保在實現流時應用程序的代碼是安全的和優化的。linux

咱們假設你對Node.js中背壓BufferEventEmitter的通常定義以及Stream的一些經驗有所瞭解。若是你尚未閱讀這些文檔,那麼首先查看API文檔並非一個壞主意,由於它有助於在閱讀本指南時擴展你的理解。git

數據處理的問題

在計算機系統中,數據經過管道、sockets和信號從一個進程傳輸到另外一個進程,在Node.js中,咱們找到了一種名爲Stream的相似機制。流很好!他們爲Node.js作了不少事情,幾乎內部代碼庫的每一個部分都使用該模塊,做爲開發人員,咱們鼓勵你使用它們!github

const readline = require('readline');

// process.stdin and process.stdout are both instances of Streams
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

rl.question('Why should you use streams? ', (answer) => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

經過比較Node.js的Stream實現的內部系統工具,能夠證實爲何經過流實現背壓機制是一個很好的優化的一個很好的例子。shell

在一種狀況下,咱們將使用一個大文件(約〜9gb)並使用熟悉的zip(1)工具對其進行壓縮。segmentfault

$ zip The.Matrix.1080p.mkv

雖然這須要幾分鐘才能完成,但在另外一個shell中咱們能夠運行一個腳本,該腳本採用Node.js的模塊zlib,它包含另外一個壓縮工具gzip(1)瀏覽器

const gzip = require('zlib').createGzip();
const fs = require('fs');

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

要測試結果,請嘗試打開每一個壓縮文件,zip(1)工具壓縮的文件將通知你文件已損壞,而Stream完成的壓縮將無錯誤地解壓縮。安全

注意:在此示例中,咱們使用.pipe()將數據源從一端獲取到另外一端,可是,請注意沒有附加正確的錯誤處理程序。若是沒法正確接收數據塊,Readable源或gzip流將不會被銷燬,pump是一個實用工具,若是其中一個流失敗或關閉,它將正確地銷燬管道中的全部流,而且在這種狀況下是必須的!app

只有Nodejs 8.x或更早版本才須要pump,對於Node 10.x或更高版本,引入pipeline來替換pump。這是一個模塊方法,用於在流傳輸之間轉發錯誤和正確清理,並在管道完成時提供回調。

如下是使用管道的示例:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

你還能夠在管道上調用promisify以將其與async/await一塊兒使用:

const stream = require('stream');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = util.promisify(stream.pipeline);

async function run() {
    try {
        await pipeline(
            fs.createReadStream('The.Matrix.1080p.mkv'),
            zlib.createGzip(),
            fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
        );
        console.log('Pipeline succeeded');
    } catch (err) {
        console.error('Pipeline failed', err);
    }
}

太多的數據,太快

有些狀況下,Readable流可能會過快地爲Writable提供數據 — 遠遠超過消費者能夠處理的數據!

當發生這種狀況時,消費者將開始排隊全部數據塊以供之後消費,寫入隊列將變得愈來愈長,所以在整個過程完成以前,必須將更多數據保存在內存中。

寫入磁盤比從磁盤讀取要慢不少,所以,當咱們嘗試壓縮文件並將其寫入咱們的硬盤時,將發生背壓,由於寫入磁盤將沒法跟上讀取的速度。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read-side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

這就是背壓機制很重要的緣由,若是沒有背壓系統,該進程會耗盡系統的內存,有效地減緩了其餘進程,並獨佔你係統的大部分直到完成。

這致使了一些事情:

  • 減緩全部其餘當前進程。
  • 一個很是超負荷的垃圾收集器。
  • 內存耗盡。

在下面的示例中,咱們將取出.write()函數的返回值並將其更改成true,這有效地禁用了Node.js核心中的背壓支持,在任何對'modified'二進制文件的引用中,咱們正在談論在沒有return ret;行的狀況下運行node二進制,而改成return true;

垃圾收集器上的過分負荷

咱們來看看快速基準測試,使用上面的相同示例,咱們進行幾回試驗,以得到兩個二進制的中位時間。

trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

二者都須要大約一分鐘來運行,所以根本沒有太大差異,但讓咱們仔細看看以確認咱們的懷疑是否正確,咱們使用Linux工具dtrace來評估V8垃圾收集器發生了什麼。

GC(垃圾收集器)測量時間表示垃圾收集器完成單次掃描的完整週期的間隔:

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

雖然這兩個過程開始時相同,但彷佛以相同的速率運行GC,很明顯,在適當工做的背壓系統幾秒鐘後,它將GC負載分佈在4-8毫秒的一致間隔內,直到數據傳輸結束。

可是,當背壓系統不到位時,V8垃圾收集開始拖延,正常二進制文件在一分鐘內調用GC約75次,然而,修改後的二進制文件僅觸發36次。

這是因爲內存使用量增長而累積的緩慢而漸進的債務,隨着數據傳輸,在沒有背壓系統的狀況下,每一個塊傳輸使用更多內存。

分配的內存越多,GC在一次掃描中須要處理的內存就越多,掃描越大,GC就越須要決定能夠釋放什麼,而且在更大的內存空間中掃描分離的指針將消耗更多的計算能力。

內存耗盡

爲肯定每一個二進制的內存消耗,咱們使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js單獨爲每一個進程計時。

這是正常二進制的輸出:

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虛擬內存佔用的最大字節大小約爲87.81mb。

如今更改.write()函數的返回值,咱們獲得:

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虛擬內存佔用的最大字節大小約爲1.52gb。

若是沒有流來委託背壓,則分配的內存空間要大一個數量級 — 同一進程之間的巨大差別!

這個實驗展現了Node.js的反壓機制是如何優化和節省成本的,如今,讓咱們分析一下它是如何工做的!

背壓如何解決這些問題?

將數據從一個進程傳輸到另外一個進程有不一樣的函數,在Node.js中,有一個名爲.pipe()的內部內置函數,還有其餘包也可使用!但最終,在這個過程的基本層面,咱們有兩個獨立的組件:數據來源和消費者。

當從源調用.pipe()時,它向消費者發出信號,告知有數據要傳輸,管道函數有助於爲事件觸發器設置適當的背壓閉合。

在Node.js中,源是Readable流,而消費者是Writable流(這些均可以與DuplexTransform流互換,但這超出了本指南的範圍)。

觸發背壓的時刻能夠精確地縮小到Writable.write()函數的返回值,固然,該返回值由幾個條件決定。

在數據緩衝區已超過highWaterMark或寫入隊列當前正忙的任何狀況下,.write()將返回false

當返回false值時,背壓系統啓動,它會暫停傳入的Readable流發送任何數據,並等待消費者再次準備就緒,清空數據緩衝區後,將發出.drain()事件並恢復傳入的數據流。

隊列完成後,背壓將容許再次發送數據,正在使用的內存空間將自行釋放併爲下一批數據作好準備。

這有效地容許在任何給定時間爲.pipe()函數使用固定數量的內存,沒有內存泄漏,沒有無限緩衝,垃圾收集器只須要處理內存中的一個區域!

那麼,若是背壓如此重要,爲何你(可能)沒有據說過它?答案很簡單:Node.js會自動爲你完成全部這些工做。

那太好了!可是當咱們試圖瞭解如何實現咱們本身的自定義流時,也不是那麼好。

注意:在大多數機器中,有一個字節大小能夠肯定緩衝區什麼時候已滿(在不一樣的機器上會有所不一樣),Node.js容許你設置本身的自定義highWaterMark,但一般,默認設置爲16kb16384,或objectMode流爲16),在你可能但願提升該值的狀況下,能夠嘗試,可是要當心!

.pipe()的生命週期

爲了更好地理解背壓,下面是一個關於Readable流的生命週期的流程圖,該流被管道傳輸到Writable流中:

+===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

注意:若是要設置管道以將一些流連接在一塊兒來操做數據,則極可能會實現Transform流。

在這種狀況下,你的Readable流的輸出將輸入到Transform中,並將管道到Writable中。

Readable.pipe(Transformable).pipe(Writable);

背壓將自動應用,但請注意,Transform流的輸入和輸出highWaterMark均可能被操縱並將影響背壓系統。

背壓指南

從Node.js v0.10開始,Stream類提供了經過使用這些相應函數的下劃線版原本修改.read().write()的行爲的功能(._read()._write())。

對於實現Readable流和Writable流,有文檔化的指南,咱們假設你已閱讀過這些內容,下一節將更深刻一些。

實現自定義流時要遵照的規則

流的黃金法則始終是尊重背壓,最佳實踐的構成是非矛盾的實踐,只要你當心避免與內部背壓支持相沖突的行爲,你就能夠肯定你遵循良好作法。

通常來講:

  1. 若是你沒有被要求,永遠不要.push()
  2. 永遠不要在返回false後調用.write(),而是等待'drain'。
  3. 流在不一樣的Node.js版本和你使用的庫之間有變化,當心並測試一下。

注意:關於第3點,構建瀏覽器流的很是有用的包是readable-stream,Rodd Vagg撰寫了一篇很棒的博客文章,描述了這個庫的實用性,簡而言之,它爲Readable流提供了一種自動優雅降級,並支持舊版本的瀏覽器和Node.js。

Readable流的特定規則

到目前爲止,咱們已經瞭解了.write()如何影響背壓,並將重點放在Writable流上,因爲Node.js的功能,數據在技術上從Readable流向下游Writable。可是,正如咱們能夠在數據、物質或能量的任何傳輸中觀察到的那樣,源與目標同樣重要,Readable流對於如何處理背壓相當重要。

這兩個過程都相互依賴,有效地進行通訊,若是Readable忽略Writable流要求它中止發送數據的時候,那麼.write()的返回值不正確就會有問題。

所以,關於.write()返回,咱們還必須尊重._read()方法中使用的.push()的返回值,若是.push()返回false值,則流將中止從源讀取,不然,它將繼續而不會停頓。

如下是使用.push()的很差作法示例:

// This is problematic as it completely ignores return value from push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

此外,在自定義流以外,存在忽略背壓的陷阱,在這個良好的實踐的反例中,應用程序的代碼會在數據可用時強制經過(由.data事件發出信號):

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', (data) =>
  writable.write(data)
);

Writable流的特定規則

回想一下.write()可能會根據某些條件返回truefalse,幸運的是,在構建咱們本身的Writable流時,流狀態機將處理咱們的回調並肯定什麼時候處理背壓併爲咱們優化數據流。

可是,當咱們想直接使用Writable時,咱們必須尊重.write()返回值並密切注意這些條件:

  • 若是寫隊列忙,.write()將返回false
  • 若是數據塊太大,.write()將返回false(該值由變量highWaterMark指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0)
      callback();
    else if (chunk.toString().indexOf('b') >= 0)
      callback();
    callback();
  }
}

// The proper way to write this would be:
    if (chunk.contains('a'))
      return callback();
    else if (chunk.contains('b'))
      return callback();
    callback();

在實現._writev()時還須要注意一些事項,該函數與.cork()結合使用,但寫入時有一個常見錯誤:

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// as a global function
function doUncork(stream) {
  stream.uncork();
}

.cork()能夠被調用屢次,咱們只須要當心調用.uncork()相同的次數,使其再次流動。

結論

Streams是Node.js中常用的模塊,它們對於內部結構很是重要,對於開發人員來講,它們能夠跨Node.js模塊生態系統進行擴展和鏈接。

但願你如今可以進行故障排除,安全地編寫你本身的WritableReadable流,並考慮背壓,並與同事和朋友分享你的知識。

在使用Node.js構建應用程序時,請務必閱讀有關其餘API函數的Stream的更多信息,以幫助改進和釋放你的流功能。


上一篇:使用不一樣的文件系統

下一篇:域模塊剖析

相關文章
相關標籤/搜索