《Node.js設計模式》使用流進行編碼

本系列文章爲《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版連接javascript

歡迎關注個人專欄,以後的博文將在專欄同步:html

Coding with Streams

StreamsNode.js最重要的組件和模式之一。 社區中有一句格言「Stream all the things(Steam就是全部的)」,僅此一點就足以描述流在Node.js中的地位。 Dominic Tarr做爲Node.js社區的最大貢獻者,它將流定義爲Node.js最好,也是最難以理解的概念。前端

使Node.jsStreams如此吸引人還有其它緣由; 此外,Streams不只與性能或效率等技術特性有關,更重要的是它們的優雅性以及它們與Node.js的設計理念完美契合的方式。java

在本章中,將會學到如下內容:node

  • Streams對於Node.js的重要性。
  • 如何建立並使用Streams
  • Streams做爲編程範式,不僅是對於I/O而言,在多種應用場景下它的應用和強大的功能。
  • 管道模式和在不一樣的配置中鏈接Streams

發現Streams的重要性

在基於事件的平臺(如Node.js)中,處理I / O的最有效的方法是實時處理,一旦有輸入的信息,立馬進行處理,一旦有須要輸出的結果,也立馬輸出反饋。git

在本節中,咱們將首先介紹Node.jsStreams和它的優勢。 請記住,這只是一個概述,由於本章後面將會詳細介紹如何使用和組合Streamsgithub

Streams和Buffer的比較

咱們在本書中幾乎全部看到過的異步API都是使用的Buffer模式。 對於輸入操做,Buffer模式會未來自資源的全部數據收集到Buffer區中; 一旦讀取完整個資源,就會把結果傳遞給回調函數。 下圖顯示了這個範例的一個真實的例子:算法

從上圖咱們能夠看到,在t1時刻,一些數據從資源接收並保存到緩衝區。 在t2時刻,最後一段數據被接收到另外一個數據塊,完成讀取操做,這時,把整個緩衝區的內容發送給消費者。npm

另外一方面,Streams容許你在數據到達時當即處理數據。 以下圖所示:編程

這一張圖顯示了Streams如何從資源接收每一個新的數據塊,並當即提供給消費者,消費者如今沒必要等待緩衝區中收集全部數據再處理每一個數據塊。

可是這兩種方法有什麼區別呢? 咱們能夠將它們歸納爲兩點:

  • 空間效率
  • 時間效率

此外,Node.jsStreams具備另外一個重要的優勢:可組合性(composability)。 如今讓咱們看看這些屬性對咱們設計和編寫應用程序的方式會產生什麼影響。

空間效率

首先,Streams容許咱們作一些看起來不可能的事情,經過緩衝數據並一次性處理。 例如,考慮一下咱們必須讀取一個很是大的文件,好比說數百MB甚至千MB。 顯然,等待徹底讀取文件時返回大BufferAPI不是一個好主意。 想象一下,若是併發讀取一些大文件, 咱們的應用程序很容易耗盡內存。 除此以外,V8中的Buffer不能大於0x3FFFFFFF字節(小於1GB)。 因此,在耗盡物理內存以前,咱們可能會碰壁。

使用Buffered的API進行壓縮文件

舉一個具體的例子,讓咱們考慮一個簡單的命令行接口(CLI)的應用程序,它使用Gzip格式壓縮文件。 使用BufferedAPI,這樣的應用程序在Node.js中大概這麼編寫(爲簡潔起見,省略了異常處理):

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, err => {
      console.log('File successfully compressed');
    });
  });
});

如今,咱們能夠嘗試將前面的代碼放在一個叫作gzip.js的文件中,而後執行下面的命令:

node gzip <path to file>

若是咱們選擇一個足夠大的文件,好比說大於1GB的文件,咱們會收到一個錯誤信息,說明咱們要讀取的文件大於最大容許的緩衝區大小,以下所示:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

上面的例子中,沒找到一個大文件,但確實對於大文件的讀取速率慢了許多。

正如咱們所預料到的那樣,使用Buffer來進行大文件的讀取顯然是錯誤的。

使用Streams進行壓縮文件

咱們必須修復咱們的Gzip應用程序,並使其處理大文件的最簡單方法是使用StreamsAPI。 讓咱們看看如何實現這一點。 讓咱們用下面的代碼替換剛建立的模塊的內容:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));

「是嗎?」你可能會問。是的;正如咱們所說的,因爲Streams的接口和可組合性,所以咱們還能寫出這樣的更加簡潔,優雅和精煉的代碼。 咱們稍後會詳細地看到這一點,可是如今須要認識到的重要一點是,程序能夠順暢地運行在任何大小的文件上,理想狀況是內存利用率不變。 嘗試一下(但考慮壓縮一個大文件可能須要一段時間)。

時間效率

如今讓咱們考慮一個壓縮文件並將其上傳到遠程HTTP服務器的應用程序的例子,該遠程HTTP服務器進而將其解壓縮並保存到文件系統中。若是咱們的客戶端是使用BufferedAPI實現的,那麼只有當整個文件被讀取和壓縮時,上傳纔會開始。 另外一方面,只有在接收到全部數據的狀況下,解壓縮纔會在服務器上啓動。 實現相同結果的更好的解決方案涉及使用Streams。 在客戶端機器上,Streams只要從文件系統中讀取就能夠壓縮和發送數據塊,而在服務器上,只要從遠程對端接收到數據塊,就能夠解壓每一個數據塊。 咱們經過構建前面提到的應用程序來展現這一點,從服務器端開始。

咱們建立一個叫作gzipReceive.js的模塊,代碼以下:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

服務器從網絡接收數據塊,將其解壓縮,並在接收到數據塊後當即保存,這要歸功於Node.jsStreams

咱們的應用程序的客戶端將進入一個名爲gzipSend.js的模塊,以下所示:

在前面的代碼中,咱們再次使用Streams從文件中讀取數據,而後在從文件系統中讀取的同時壓縮併發送每一個數據塊。

如今,運行這個應用程序,咱們首先使用如下命令啓動服務器:

node gzipReceive

而後,咱們能夠經過指定要發送的文件和服務器的地址(例如localhost)來啓動客戶端:

node gzipSend <path to file> localhost

若是咱們選擇一個足夠大的文件,咱們將更容易地看到數據如何從客戶端流向服務器,但爲何這種模式下,咱們使用Streams,比使用BufferedAPI更有效率? 下圖應該給咱們一個提示:

一個文件被處理的過程,它通過如下階段:

  1. 客戶端從文件系統中讀取
  2. 客戶端壓縮數據
  3. 客戶端將數據發送到服務器
  4. 服務端接收數據
  5. 服務端解壓數據
  6. 服務端將數據寫入磁盤

爲了完成處理,咱們必須按照流水線順序那樣通過每一個階段,直到最後。在上圖中,咱們能夠看到,使用BufferedAPI,這個過程徹底是順序的。爲了壓縮數據,咱們首先必須等待整個文件被讀取完畢,而後,發送數據,咱們必須等待整個文件被讀取和壓縮,依此類推。當咱們使用Streams時,只要咱們收到第一個數據塊,流水線就會被啓動,而不須要等待整個文件的讀取。但更使人驚訝的是,當下一塊數據可用時,不須要等待上一組任務完成;相反,另外一條裝配線是並行啓動的。由於咱們執行的每一個任務都是異步的,這樣顯得很完美,因此能夠經過Node.js來並行執行Streams的相關操做;惟一的限制就是每一個階段都必須保證數據塊的到達順序。

從前面的圖能夠看出,使用Streams的結果是整個過程花費的時間更少,由於咱們不用等待全部數據被所有讀取完畢和處理。

組合性

到目前爲止,咱們已經看到的代碼已經告訴咱們如何使用pipe()方法來組裝Streams的數據塊,Streams容許咱們鏈接不一樣的處理單元,每一個處理單元負責單一的職責(這是符合Node.js風格的)。這是可能的,由於Streams具備統一的接口,而且就API而言,不一樣Streams也能夠很好的進行交互。惟一的先決條件是管道的下一個Streams必須支持上一個Streams生成的數據類型,能夠是二進制,文本甚至是對象,咱們將在後面的章節中看到。

爲了證實Streams組合性的優點,咱們能夠嘗試在咱們先前構建的gzipReceive / gzipSend應用程序中添加加密功能。
爲此,咱們只須要經過向流水線添加另外一個Streams來更新客戶端。 確切地說,由crypto.createChipher()返回的流。 由此產生的代碼應以下所示:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

使用相同的方式,咱們更新服務端的代碼,使得它能夠在數據塊進行解壓以前先解密:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));
crypto是Node.js的核心模塊之一,提供了一系列加密算法。

只需幾行代碼,咱們就在應用程序中添加了一個加密層。 咱們只須要簡單地經過把已經存在的Streams模塊和加密層組合到一塊兒,就能夠。相似的,咱們能夠添加和合並其餘Streams,如同在玩樂高積木同樣。

顯然,這種方法的主要優勢是可重用性,但正如咱們從目前爲止所介紹的代碼中能夠看到的那樣,Streams也能夠實現更清晰,更模塊化,更加簡潔的代碼。 出於這些緣由,流一般不只僅用於處理純粹的I / O,並且它仍是簡化和模塊化代碼的手段。

開始使用Streams

在前面的章節中,咱們瞭解了爲何Streams如此強大,並且它在Node.js中無處不在,甚至在Node.js的核心模塊中也有其身影。 例如,咱們已經看到,fs模塊具備用於從文件讀取的createReadStream()和用於寫入文件的createWriteStream()HTTP請求和響應對象本質上是Streams,而且zlib模塊容許咱們使用StreamsAPI壓縮和解壓縮數據塊。

如今咱們知道爲何Streams是如此重要,讓咱們退後一步,開始更詳細地探索它。

Streams的結構

Node.js中的每一個Streams都是Streams核心模塊中可用的四個基本抽象類之一的實現:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

每一個stream類也是EventEmitter的一個實例。實際上,Streams能夠產生幾種類型的事件,好比end事件會在一個可讀的Streams完成讀取,或者錯誤讀取,或其過程當中產生異常時觸發。

請注意,爲簡潔起見,在本章介紹的例子中,咱們常常會忽略適當的錯誤處理。可是,在生產環境下中,老是建議爲全部Stream註冊錯誤事件偵聽器。

Streams之因此如此靈活的緣由之一是它不只可以處理二進制數據,並且幾乎能夠處理任何JavaScript值。實際上,Streams能夠支持兩種操做模式:

  • 二進制模式:以數據塊形式(例如buffersstrings)流式傳輸數據
  • 對象模式:將流數據視爲一系列離散對象(這使得咱們幾乎可使用任何JavaScript值)

這兩種操做模式使咱們不只可使用I / O流,並且還能夠做爲一種工具,以函數式的風格優雅地組合處理單元,咱們將在本章後面看到。

在本章中,咱們將主要使用在Node.js 0.11中引入的Node.js流接口,也稱爲版本3。 有關與舊接口差別的更多詳細信息,請參閱StrongLoop在 https://strongloop.com/strong...

可讀的Streams

一個可讀的Streams表示一個數據源,在Node.js中,它使用stream模塊中的Readableabstract類實現。

從Streams中讀取信息

從可讀Streams接收數據有兩種方式:non-flowing模式和flowing模式。 咱們來更詳細地分析這些模式。

non-flowing模式(不流動模式)

從可讀的Streams中讀取數據的默認模式是爲其附加一個可讀事件偵聽器,用於指示要讀取的新數據的可用性。而後,在一個循環中,咱們讀取全部的數據,直到內部buffer被清空。這可使用read()方法完成,該方法同步從內部緩衝區中讀取數據,並返回表示數據塊的BufferString對象。read()方法以以下使用模式:

readable.read([size]);

使用這種方法,數據隨時能夠直接從Streams中按需提取。

爲了說明這是如何工做的,咱們建立一個名爲readStdin.js的新模塊,它實現了一個簡單的程序,它從標準輸入(一個可讀流)中讀取數據,並將全部數據回送到標準輸出:

process.stdin
  .on('readable', () => {
    let chunk;
    console.log('New data available');
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on('end', () => process.stdout.write('End of stream'));

read()方法是一個同步操做,它從可讀Streams的內部Buffers區中提取數據塊。若是Streams在二進制模式下工做,返回的數據塊默認爲一個Buffer對象。

在以二進制模式工做的可讀的Stream中,咱們能夠經過在Stream上調用setEncoding(encoding)來讀取字符串而不是Buffer對象,並提供有效的編碼格式(例如utf8)。

數據是從可讀的偵聽器中讀取的,只要有新的數據,就會調用這個偵聽器。當內部緩衝區中沒有更多數據可用時,read()方法返回null;在這種狀況下,咱們不得不等待另外一個可讀的事件被觸發,告訴咱們能夠再次讀取或者等待表示Streams讀取過程結束的end事件觸發。當一個流以二進制模式工做時,咱們也能夠經過向read()方法傳遞一個size參數來指定咱們想要讀取的數據大小。這在實現網絡協議或解析特定數據格式時特別有用。

如今,咱們準備運行readStdin模塊並進行實驗。讓咱們在控制檯中鍵入一些字符,而後按Enter鍵查看回顯到標準輸出中的數據。要終止流並所以生成一個正常的結束事件,咱們須要插入一個EOF(文件結束)字符(在Windows上使用Ctrl + Z或在Linux上使用Ctrl + D)。

咱們也能夠嘗試將咱們的程序與其餘程序鏈接起來;這可使用管道運算符(|),它將程序的標準輸出重定向到另外一個程序的標準輸入。例如,咱們能夠運行以下命令:

cat <path to a file> | node readStdin

這是流式範例是一個通用接口的一個很好的例子,它使得咱們的程序可以進行通訊,而無論它們是用什麼語言寫的。

flowing模式(流動模式)

Streams中讀取的另外一種方法是將偵聽器附加到data事件;這會將Streams切換爲flowing模式,其中數據不是使用read()函數來提取的,而是一旦有數據到達data監聽器就被推送到監聽器內。例如,咱們以前建立的readStdin應用程序將使用流動模式:

process.stdin
  .on('data', chunk => {
    console.log('New data available');
    console.log(
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`
    );
  })
  .on('end', () => process.stdout.write('End of stream'));

flowing模式是舊版Streams接口(也稱爲Streams1)的繼承,其靈活性較低,API較少。隨着Streams2接口的引入,flowing模式不是默認的工做模式,要啓用它,須要將偵聽器附加到data事件或顯式調用resume()方法。 要暫時中斷Streams觸發data事件,咱們能夠調用pause()方法,致使任何傳入數據緩存在內部buffer中。

調用pause()不會致使Streams切換回non-flowing模式。

實現可讀的Streams

如今咱們知道如何從Streams中讀取數據,下一步是學習如何實現一個新的Readable數據流。爲此,有必要經過繼承stream.Readable的原型來建立一個新的類。 具體流必須提供_read()方法的實現:

readable._read(size)

Readable類的內部將調用_read()方法,而該方法又將啓動
使用push()填充內部緩衝區:

請注意,read()是Stream消費者調用的方法,而_read()是一個由Stream子類實現的方法,不能直接調用。下劃線一般表示該方法爲私有方法,不該該直接調用。

爲了演示如何實現新的可讀Streams,咱們能夠嘗試實現一個生成隨機字符串的Streams。 咱們來建立一個名爲randomStream.js的新模塊,它將包含咱們的字符串的generator的代碼:

const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options);
  }

  _read(size) {
    const chunk = chance.string(); //[1]
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, 'utf8'); //[2]
    if (chance.bool({
        likelihood: 5
      })) { //[3]
      this.push(null);
    }
  }
}

module.exports = RandomStream;

在文件頂部,咱們將加載咱們的依賴關係。除了咱們正在加載一個chance的npm模塊以外,沒有什麼特別之處,它是一個用於生成各類隨機值的庫,從數字到字符串到整個句子都能生成隨機值。

下一步是建立一個名爲RandomStream的新類,並指定stream.Readable做爲其父類。 在前面的代碼中,咱們調用父類的構造函數來初始化其內部狀態,並將收到的options參數做爲輸入。經過options對象傳遞的可能參數包括如下內容:

  • 用於將Buffers轉換爲Stringsencoding參數(默認值爲null
  • 是否啓用對象模式(objectMode默認爲false
  • 存儲在內部buffer區中的數據的上限,一旦超過這個上限,則暫停從data source讀取(highWaterMark默認爲16KB

好的,如今讓咱們來解釋一下咱們重寫的stream.Readable類的_read()方法:

  • 該方法使用chance生成隨機字符串。
  • 它將字符串push內部buffer。 請注意,因爲咱們push的是String,此外咱們還指定了編碼爲utf8(若是數據塊只是一個二進制Buffer,則不須要)。
  • 5%的機率隨機中斷stream的隨機字符串產生,經過push null到內部Buffer來表示EOF,即stream的結束。

咱們還能夠看到在_read()函數的輸入中給出的size參數被忽略了,由於它是一個建議的參數。 咱們能夠簡單地把全部可用的數據都push到內部的buffer中,可是若是在同一個調用中有多個推送,那麼咱們應該檢查push()是否返回false,由於這意味着內部buffer已經達到了highWaterMark限制,咱們應該中止添加更多的數據。

以上就是RandomStream模塊,咱們如今準備好使用它。咱們來建立一個名爲generateRandom.js的新模塊,在這個模塊中咱們實例化一個新的RandomStream對象並從中提取一些數據:

const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
  let chunk;
  while ((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

如今,一切都準備好了,咱們嘗試新的自定義的stream。 像往常同樣簡單地執行generateRandom模塊,觀察隨機的字符串在屏幕上流動。

可寫的Streams

一個可寫的stream表示一個數據終點,在Node.js中,它使用stream模塊中的Writable抽象類來實現。

寫入一個stream

把一些數據放在可寫入的stream中是一件簡單的事情, 咱們所要作的就是使用write()方法,它具備如下格式:

writable.write(chunk, [encoding], [callback])

encoding參數是可選的,其在chunkString類型時指定(默認爲utf8,若是chunkBuffer,則忽略);當數據塊被刷新到底層資源中時,callback就會被調用,callback參數也是可選的。

爲了表示沒有更多的數據將被寫入stream中,咱們必須使用end()方法:

writable.end([chunk], [encoding], [callback])

咱們能夠經過end()方法提供最後一塊數據。在這種狀況下,callbak函數至關於爲finish事件註冊一個監聽器,當數據塊所有被寫入stream中時,會觸發該事件。

如今,讓咱們經過建立一個輸出隨機字符串序列的小型HTTP服務器來演示這是如何工做的:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  }); //[1]
  while (chance.bool({
      likelihood: 95
    })) { //[2]
    res.write(chance.string() + '\n'); //[3]
  }
  res.end('\nThe end...\n'); //[4]
  res.on('finish', () => console.log('All data was sent')); //[5]
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

咱們建立了一個HTTP服務器,並把數據寫入res對象,res對象是http.ServerResponse的一個實例,也是一個可寫入的stream。下面來解釋上述代碼發生了什麼:

  1. 咱們首先寫HTTP response的頭部。請注意,writeHead()不是Writable接口的一部分,實際上,這個方法是http.ServerResponse類公開的輔助方法。
  2. 咱們開始一個5%的機率終止的循環(進入循環體的機率爲chance.bool()產生,其爲95%)。
  3. 在循環內部,咱們寫入一個隨機字符串到stream
  4. 一旦咱們不在循環中,咱們調用streamend(),表示沒有更多

數據塊將被寫入。另外,咱們在結束以前提供一個最終的字符串寫入流中。

  1. 最後,咱們註冊一個finish事件的監聽器,當全部的數據塊都被刷新到底層socket中時,這個事件將被觸發。

咱們能夠調用這個小模塊稱爲entropyServer.js,而後執行它。要測試這個服務器,咱們能夠在地址http:// localhost:8080打開一個瀏覽器,或者從終端使用curl命令,以下所示:

curl localhost:8080

此時,服務器應該開始向您選擇的HTTP客戶端發送隨機字符串(請注意,某些瀏覽器可能會緩衝數據,而且流式傳輸行爲可能不明顯)。

Back-pressure(反壓)

相似於在真實管道系統中流動的液體,Node.jsstream也可能遭受瓶頸,數據寫入速度可能快於stream的消耗。 解決這個問題的機制包括緩衝輸入數據;然而,若是數據stream沒有給生產者任何反饋,咱們可能會產生愈來愈多的數據被累積到內部緩衝區的狀況,致使內存泄露的發生。

爲了防止這種狀況的發生,當內部buffer超過highWaterMark限制時,writable.write()將返回false。 可寫入的stream具備highWaterMark屬性,這是write()方法開始返回false的內部Buffer區大小的限制,一旦Buffer區的大小超過這個限制,表示應用程序應該中止寫入。 當緩衝器被清空時,會觸發一個叫作drain的事件,通知再次開始寫入是安全的。 這種機制被稱爲back-pressure

本節介紹的機制一樣適用於可讀的stream。事實上,在可讀stream中也存在back-pressure,而且在_read()內調用的push()方法返回false時觸發。 可是,這對於stream實現者來講是一個特定的問題,因此咱們將不常常處理它。

咱們能夠經過修改以前建立的entropyServer模塊來演示可寫入的streamback-pressure

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  });

  function generateMore() { //[1]
    while (chance.bool({
        likelihood: 95
      })) {
      const shouldContinue = res.write(
        chance.string({
          length: (16 * 1024) - 1
        }) //[2]
      );
      if (!shouldContinue) { //[3]
        console.log('Backpressure');
        return res.once('drain', generateMore);
      }
    }
    res.end('\nThe end...\n', () => console.log('All data was sent'));
  }
  generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

前面代碼中最重要的步驟能夠歸納以下:

  1. 咱們將主邏輯封裝在一個名爲generateMore()的函數中。
  2. 爲了增長得到一些back-pressure的機會,咱們將數據塊的大小增長到16KB-1Byte,這很是接近默認的highWaterMark限制。
  3. 在寫入一大塊數據以後,咱們檢查res.write()的返回值。 若是它返回false,這意味着內部buffer已滿,咱們應該中止發送更多的數據。在這種狀況下,咱們從函數中退出,而後新註冊一個寫入事件的發佈者,當drain事件觸發時調用generateMore

若是咱們如今嘗試再次運行服務器,而後使用curl生成客戶端請求,則極可能會有一些back-pressure,由於服務器以很是高的速度生成數據,速度甚至會比底層socket更快。

實現可寫入的Streams

咱們能夠經過繼承stream.Writable類來實現一個新的可寫入的流,併爲_write()方法提供一個實現。實現一個咱們自定義的可寫入的Streams類。

讓咱們構建一個可寫入的stream,它接收對象的格式以下:

{
  path: <path to a file>
  content: <string or buffer>
}

這個類的做用是這樣的:對於每個對象,咱們的stream必須將content部分保存到在給定路徑中建立的文件中。 咱們能夠當即看到,咱們stream的輸入是對象,而不是StringsBuffers,這意味着咱們的stream必須以對象模式工做。

調用模塊toFileStream.js

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class ToFileStream extends stream.Writable {
  constructor() {
    super({
      objectMode: true
    });
  }

  _write(chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;

做爲第一步,咱們加載全部咱們所須要的依賴包。注意,咱們須要模塊mkdirp,正如你應該從前幾章中所知道的,它應該使用npm安裝。

咱們建立了一個新類,它從stream.Writable擴展而來。

咱們不得不調用父構造函數來初始化其內部狀態;咱們還提供了一個option對象做爲參數,用於指定流在對象模式下工做(objectMode:true)。stream.Writable接受的其餘選項以下:

  • highWaterMark(默認值是16KB):控制back-pressure的上限。
  • decodeStrings(默認爲true):在字符串傳遞給_write()方法以前,將字符串自動解碼爲二進制buffer區。 在對象模式下這個參數被忽略。

最後,咱們爲_write()方法提供了一個實現。正如你所看到的,這個方法接受一個數據塊,一個編碼方式(只有在二進制模式下,stream選項decodeStrings設置爲false時纔有意義)。

另外,該方法接受一個回調函數,該函數在操做完成時須要調用;而沒必要要傳遞操做的結果,可是若是須要的話,咱們仍然能夠傳遞一個error對象,這將致使stream觸發error事件。

如今,爲了嘗試咱們剛剛構建的stream,咱們能夠建立一個名爲writeToFile.js的新模塊,並對該流執行一些寫操做:

const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));

有了這個,咱們建立並使用了咱們的第一個自定義的可寫入流。 像往常同樣運行新模塊來檢查其輸出;你會看到執行後會建立三個新文件。

雙重的Streams

雙重的stream既是可讀的,也可寫的。 當咱們想描述一個既是數據源又是數據終點的實體時(例如socket),這就顯得十分有用了。 雙工流繼承stream.Readablestream.Writable的方法,因此它對咱們來講並不新鮮。這意味着咱們能夠read()write()數據,或者能夠監聽readabledrain事件。

要建立一個自定義的雙重stream,咱們必須爲_read()_write()提供一個實現。傳遞給Duplex()構造函數的options對象在內部被轉發給ReadableWritable的構造函數。options參數的內容與前面討論的相同,options增長了一個名爲allowHalfOpen值(默認爲true),若是設置爲false,則會致使只要stream的一方(ReadableWritable)結束,stream就結束了。

爲了使雙重的stream在一方以對象模式工做,而在另外一方以二進制模式工做,咱們須要在流構造器中手動設置如下屬性:
this._writableState.objectMode
this._readableState.objectMode

轉換的Streams

轉換的Streams是專門設計用於處理數據轉換的一種特殊類型的雙重Streams

在一個簡單的雙重Streams中,從stream中讀取的數據和寫入到其中的數據之間沒有直接的關係(至少stream是不可知的)。 想一想一個TCP socket,它只是向遠程節點發送數據和從遠程節點接收數據。TCP socket自身沒有意識到輸入和輸出之間有任何關係。

下圖說明了雙重Streams中的數據流:

另外一方面,轉換的Streams對從可寫入端接收到的每一個數據塊應用某種轉換,而後在其可讀端使轉換的數據可用。

下圖顯示了數據如何在轉換的Streams中流動:

從外面看,轉換的Streams的接口與雙重Streams的接口徹底相同。可是,當咱們想要構建一個新的雙重Streams時,咱們必須提供_read()_write()方法,而爲了實現一個新的變換流,咱們必須填寫另外一對方法:_transform()_flush())。

咱們來演示如何用一個例子來建立一個新的轉換的Streams

實現轉換的Streams

咱們來實現一個轉換的Streams,它將替換給定全部出現的字符串。 要作到這一點,咱們必須建立一個名爲replaceStream.js的新模塊。 讓咱們直接看怎麼實現它:

const stream = require('stream');
const util = require('util');

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)         //[1]
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);     //[2]
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));       //[3]
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

與往常同樣,咱們將從其依賴項開始構建模塊。此次咱們沒有使用第三方模塊。

而後咱們建立了一個從stream.Transform基類繼承的新類。該類的構造函數接受兩個參數:searchStringreplaceString。 正如你所想象的那樣,它們容許咱們定義要匹配的文本以及用做替換的字符串。 咱們還初始化一個將由_transform()方法使用的tailPiece內部變量。

如今,咱們來分析一下_transform()方法,它是咱們新類的核心。_transform()方法與可寫入的stream_write()方法具備幾乎相同的格式,但不是將數據寫入底層資源,而是使用this.push()將其推入內部buffer,這與咱們會在可讀流的_read()方法中執行。這顯示了轉換的Streams的雙方如何實際鏈接。

ReplaceStream_transform()方法實現了咱們這個新類的核心。正常狀況下,搜索和替換buffer區中的字符串是一件容易的事情;可是,當數據流式傳輸時,狀況則徹底不一樣,可能的匹配可能分佈在多個數據塊中。代碼後面的程序能夠解釋以下:

  1. 咱們的算法使用searchString函數做爲分隔符來分割塊。
  2. 而後,它取出分隔後生成的數組的最後一項lastPiece,並提取其最後一個字符searchString.length - 1。結果被保存到tailPiece變量中,它將會被做爲下一個數據塊的前綴。
  3. 最後,全部從split()獲得的片斷用replaceString做爲分隔符鏈接在一塊兒,並推入內部buffer區。

stream結束時,咱們可能仍然有最後一個tailPiece變量沒有被壓入內部緩衝區。這正是_flush()方法的用途;它在stream結束以前被調用,而且這是咱們最終有機會完成流或者在徹底結束流以前推送任何剩餘數據的地方。

_flush()方法只須要一個回調函數做爲參數,當全部的操做完成後,咱們必須確保調用這個回調函數。完成了這個,咱們已經完成了咱們的ReplaceStream類。

如今,是時候嘗試新的stream。咱們能夠建立另外一個名爲replaceStreamTest.js的模塊來寫入一些數據,而後讀取轉換的結果:

const ReplaceStream = require('./replaceStream');

const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));

rs.write('Hello W');
rs.write('orld!');
rs.end();

爲了使得這個例子更復雜一些,咱們把搜索詞分佈在兩個不一樣的數據塊上;而後,使用flowing模式,咱們從同一個stream中讀取數據,記錄每一個已轉換的塊。運行前面的程序應該產生如下輸出:

Hel
lo Node.js
!
有一個值得說起是,第五種類型的stream:stream.PassThrough。 與咱們介紹的其餘流類不一樣,PassThrough不是抽象的,能夠直接實例化,而不須要實現任何方法。實際上,這是一個可轉換的stream,它能夠輸出每一個數據塊,而不須要進行任何轉換。

使用管道鏈接Streams

Unix管道的概念是由Douglas Mcllroy發明的;這使程序的輸出可以鏈接到下一個的輸入。看看下面的命令:

echo Hello World! | sed s/World/Node.js/g

在前面的命令中,echo會將Hello World!寫入標準輸出,而後被重定向到sed命令的標準輸入(由於有管道操做符 |)。 而後sedNode.js替換任何World,並將結果打印到它的標準輸出(此次是控制檯)。

以相似的方式,可使用可讀的Streamspipe()方法將Node.jsStreams鏈接在一塊兒,它具備如下接口:

readable.pipe(writable, [options])

很是直觀地,pipe()方法將從可讀的Streams中發出的數據抽取到所提供的可寫入的Streams中。 另外,當可讀的Streams發出end事件(除非咱們指定{end:false}做爲options)時,可寫入的Streams將自動結束。 pipe()方法返回做爲參數傳遞的可寫入的Streams,若是這樣的stream也是可讀的(例如雙重或可轉換的Streams),則容許咱們建立鏈式調用。

將兩個Streams鏈接到一塊兒時,則容許數據自動流向可寫入的Streams,因此不須要調用read()write()方法;但最重要的是不須要控制back-pressure,由於它會自動處理。

舉個簡單的例子(將會有大量的例子),咱們能夠建立一個名爲replace.js的新模塊,它接受來自標準輸入的文本流,應用替換轉換,而後將數據返回到標準輸出:

const ReplaceStream = require('./replaceStream');
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

上述程序未來自標準輸入的數據傳送到ReplaceStream,而後返回到標準輸出。 如今,爲了實踐這個小應用程序,咱們能夠利用Unix管道將一些數據重定向到它的標準輸入,以下所示:

echo Hello World! | node replace World Node.js

運行上述程序,會輸出以下結果:

Hello Node.js

這個簡單的例子演示了Streams(特別是文本Streams)是一個通用接口,管道幾乎是構成和鏈接全部這些接口的通用方式。

error事件不會經過管道自動傳播。舉個例子,看以下代碼片斷:
stream1
  .pipe(stream2)
  .on('error', function() {});
在前面的鏈式調用中,咱們將只捕獲來自 stream2的錯誤,這是因爲咱們給其添加了 erorr事件偵聽器。這意味着,若是咱們想捕獲從 stream1生成的任何錯誤,咱們必須直接附加另外一個錯誤偵聽器。 稍後咱們將看到一種能夠實現共同錯誤捕獲的另外一種模式(合併 Streams)。 此外,咱們應該注意到,若是目標 Streams(讀取的 Streams)發出錯誤,它將會對源 Streams通知一個 error,以後致使管道的中斷。

Streams如何經過管道

到目前爲止,咱們建立自定義Streams的方式並不徹底遵循Node定義的模式;實際上,從stream基類繼承是違反small surface area的,並須要一些示例代碼。 這並不意味着Streams設計得很差,實際上,咱們不該該忘記,由於StreamsNode.js核心的一部分,因此它們必須儘量地靈活,普遍拓展Streams以至於用戶級模塊可以將它們充分運用。

然而,大多數狀況下,咱們並不須要原型繼承能夠給予的全部權力和可擴展性,但一般咱們想要的僅僅是定義新Streams的一種快速開發的模式。Node.js社區固然也爲此建立了一個解決方案。 一個完美的例子是through2,一個使得咱們能夠簡單地建立轉換的Streams的小型庫。 經過through2,咱們能夠經過調用一個簡單的函數來建立一個新的可轉換的Streams

const transform = through2([options], [_transform], [_flush]);

相似的,from2也容許咱們像下面這樣建立一個可讀的Streams

const readable = from2([options], _read);

接下來,咱們將在本章其他部分展現它們的用法,那時,咱們會清楚使用這些小型庫的好處。

throughfrom是基於 Stream1規範的頂層庫。

基於Streams的異步控制流

經過咱們已經介紹的例子,應該清楚的是,Streams不只能夠用來處理I / O,並且能夠用做處理任何類型數據的優雅編程模式。 但優勢並不止這些;還能夠利用Streams來實現異步控制流,在本節將會看到。

順序執行

默認狀況下,Streams將按順序處理數據;例如,轉換的Streams_transform()函數在前一個數據塊執行callback()以後纔會進行下一塊數據塊的調用。這是Streams的一個重要屬性,按正確順序處理每一個數據塊相當重要,可是也能夠利用這一屬性將Streams實現優雅的傳統控制流模式。

代碼老是比太多的解釋要好得多,因此讓咱們來演示一下如何使用流來按順序執行異步任務的例子。讓咱們建立一個函數來鏈接一組接收到的文件做爲輸入,確保遵照提供的順序。咱們建立一個名爲concatFiles.js的新模塊,並從其依賴開始:

const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

咱們將使用through2來簡化轉換的Streams的建立,並使用from2-array從一個對象數組中建立可讀的Streams
接下來,咱們能夠定義concatFiles()函數:

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             //[1]
    .pipe(through.obj((file, enc, done) => {   //[2]
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', done); //[3]
    }))
    .on('finish', () => {         //[4]
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

前面的函數經過將files數組轉換爲Streams來實現對files數組的順序迭代。 該函數所遵循的程序解釋以下:

  1. 首先,咱們使用from2-arrayfiles數組建立一個可讀的Streams
  2. 接下來,咱們使用through來建立一個轉換的Streams來處理序列中的每一個文件。對於每一個文件,咱們建立一個可讀的Streams,並經過管道將其輸入到表示輸出文件的destStream中。 在源文件完成讀取後,經過在pipe()方法的第二個參數中指定{end:false},咱們確保不關閉destStream
  3. 當源文件的全部內容都被傳送到destStream時,咱們調用through.obj公開的done函數來傳遞當前處理已經完成,在咱們的狀況下這是須要觸發處理下一個文件。
  4. 全部文件處理完後,finish事件被觸發。咱們最後能夠結束destStream並調用concatFiles()callback()函數,這個函數表示整個操做的完成。

咱們如今能夠嘗試使用咱們剛剛建立的小模塊。讓咱們建立一個名爲concat.js的新文件來完成一個示例:

const concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log('Files concatenated successfully');
});

咱們如今能夠運行上述程序,將目標文件做爲第一個命令行參數,接着是要鏈接的文件列表,例如:

node concat allTogether.txt file1.txt file2.txt

執行這一條命令,會建立一個名爲allTogether.txt的新文件,其中按順序保存file1.txtfile2.txt的內容。

使用concatFiles()函數,咱們可以僅使用Streams實現異步操做的順序執行。正如咱們在Chapter3 Asynchronous Control Flow Patters with Callbacks中看到的那樣,若是使用純JavaScript實現,或者使用async等外部庫,則須要使用或實現迭代器。咱們如今提供了另一個能夠達到一樣效果的方法,正如咱們所看到的,它的實現方式很是優雅且可讀性高。

模式:使用Streams或Streams的組合,能夠輕鬆地按順序遍歷一組異步任務。

無序並行執行

咱們剛剛看到Streams按順序處理每一個數據塊,但有時這可能並不能這麼作,由於這樣並無充分利用Node.js的併發性。若是咱們必須對每一個數據塊執行一個緩慢的異步操做,那麼並行化執行這一組異步任務徹底是有必要的。固然,只有在每一個數據塊之間沒有關係的狀況下才能應用這種模式,這些數據塊可能常常發生在對象模式的Streams中,可是對於二進制模式的Streams不多使用無序的並行執行。

注意:當處理數據的順序很重要時,不能使用無序並行執行的Streams。

爲了並行化一個可轉換的Streams的執行,咱們能夠運用Chapter3 Asynchronous Control Flow Patters with Callbacks所講到的無序並行執行的相同模式,而後作出一些改變使它們適用於Streams。讓咱們看看這是如何更改的。

實現一個無序並行的Streams

讓咱們用一個例子直接說明:咱們建立一個叫作parallelStream.js的模塊,而後自定義一個普通的可轉換的Streams,而後給出一系列可轉換流的方法:

const stream = require('stream');

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;

咱們來分析一下這個新的自定義的類。正如你所看到的同樣,構造函數接受一個userTransform()函數做爲參數,而後將其另存爲一個實例變量;咱們也調用父構造函數,而且咱們默認啓用對象模式。

接下來,來看_transform()方法,在這個方法中,咱們執行userTransform()函數,而後增長當前正在運行的任務個數; 最後,咱們經過調用done()來通知當前轉換步驟已經完成。_transform()方法展現瞭如何並行處理另外一項任務。咱們不用等待userTransform()方法執行完畢再調用done()。 相反,咱們當即執行done()方法。另外一方面,咱們提供了一個特殊的回調函數給userTransform()方法,這就是this._onComplete()方法;以便咱們在userTransform()完成的時候收到通知。

Streams終止以前,會調用_flush()方法,因此若是仍有任務正在運行,咱們能夠經過不當即調用done()回調函數來延遲finish事件的觸發。相反,咱們將其分配給this.terminateCallback變量。爲了理解Streams如何正確終止,來看_onComplete()方法。

在每組異步任務最終完成時,_onComplete()方法會被調用。首先,它會檢查是否有任務正在運行,若是沒有,則調用this.terminateCallback()函數,這將致使Streams結束,觸發_flush()方法的finish事件。

利用剛剛構建的ParallelStream類能夠輕鬆地建立一個無序並行執行的可轉換的Streams實例,可是有個注意:它不會保留項目接收的順序。實際上,異步操做能夠在任什麼時候候都有可能完成並推送數據,而跟它們開始的時刻並無必然的聯繫。所以咱們知道,對於二進制模式的Streams並不適用,由於二進制的Streams對順序要求較高。

實現一個URL監控應用程序

如今,讓咱們使用ParallelStream模塊實現一個具體的例子。讓咱們想象如下咱們想要構建一個簡單的服務來監控一個大URL列表的狀態,讓咱們想象如下,全部的這些URL包含在一個單獨的文件中,而且每個URL佔據一個空行。

Streams可以爲這個場景提供一個高效且優雅的解決方案。特別是當咱們使用咱們剛剛寫的ParallelStream類來無序地審覈這些URL

接下來,讓咱們建立一個簡單的放在checkUrls.js模塊的應用程序。

const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])         //[1]
  .pipe(split())                             //[2]
  .pipe(new ParallelStream((url, enc, done, push) => {     //[3]
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))   //[4]
  .on('finish', () => console.log('All urls were checked'))
;

正如咱們所看到的,經過流,咱們的代碼看起來很是優雅,直觀。 讓咱們看看它是如何工做的:

  1. 首先,咱們經過給定的文件參數建立一個可讀的Streams,便於接下來讀取文件。
  2. 咱們經過split將輸入的文件的Streams的內容輸出一個可轉換的Streams到管道中,而且分隔了數據塊的每一行。
  3. 而後,是時候使用咱們的ParallelStream來檢查URL了,咱們發送一個HEAD請求而後等待請求的response。當請求返回時,咱們把請求的結果pushstream中。
  4. 最後,經過管道把結果保存到results.txt文件中。
node checkUrls urlList.txt

這裏的文件urlList.txt包含一組URL,例如:

  • http://www.mariocasciaro.me/
  • http://loige.co/
  • http://thiswillbedownforsure.com/

當應用執行完成後,咱們能夠看到一個文件results.txt被建立,裏面包含有操做的結果,例如:

  • http://thiswillbedownforsure.com is down
  • http://loige.co is up
  • http://www.mariocasciaro.me is up

輸出的結果的順序頗有可能與輸入文件中指定URL的順序不一樣。這是Streams無序並行執行任務的明顯特徵。

出於好奇,咱們可能想嘗試用一個正常的through2流替換ParallelStream,並比較二者的行爲和性能(你可能想這樣作的一個練習)。咱們將會看到,使用through2的方式會比較慢,由於每一個URL都將按順序進行檢查,並且文件results.txt中結果的順序也會被保留。

無序限制並行執行

若是運行包含數千或數百萬個URL的文件的checkUrls應用程序,咱們確定會遇到麻煩。咱們的應用程序將同時建立不受控制的鏈接數量,並行發送大量數據,並可能破壞應用程序的穩定性和整個系統的可用性。咱們已經知道,控制負載的無序限制並行執行是一個極好的解決方案。

讓咱們經過建立一個limitedParallelStream.js模塊來看看它是如何工做的,這個模塊是改編自上一節中建立的parallelStream.js模塊。

讓咱們看看它的構造函數:

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }
// ...
}

咱們須要一個concurrency變量做爲輸入來限制併發量,此次咱們要保存兩個回調函數,continueCallback用於任何掛起的_transform方法,terminateCallback用於_flush方法的回調。
接下來看_transform()方法:

_transform(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
}

此次在_transform()方法中,咱們必須在調用done()以前檢查是否達到了最大並行數量的限制,若是沒有達到了限制,才能觸發下一個項目的處理。若是咱們已經達到最大並行數量的限制,咱們能夠簡單地將done()回調保存到continueCallback變量中,以便在任務完成後當即調用它。

_flush()方法與ParallelStream類保持徹底同樣,因此咱們直接轉到實現_onComplete()方法:

_onComplete(err) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  const tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
}

每當任務完成,咱們調用任何已保存的continueCallback()將致使
stream解鎖,觸發下一個項目的處理。

這就是limitedParallelStream模塊。 咱們如今能夠在checkUrls模塊中使用它來代替parallelStream,而且將咱們的任務的併發限制在咱們設置的值上。

順序並行執行

咱們之前建立的並行Streams可能會使得數據的順序混亂,可是在某些狀況下這是不可接受的。有時,實際上,有那種須要每一個數據塊都以接收到的相同順序發出的業務場景。咱們仍然能夠並行運行transform函數。咱們所要作的就是對每一個任務發出的數據進行排序,使其遵循與接收數據相同的順序。

這種技術涉及使用buffer,在每一個正在運行的任務發出時從新排序塊。爲簡潔起見,咱們不打算提供這樣一個stream的實現,由於這本書的範圍是至關冗長的;咱們要作的就是重用爲了這個特定目的而構建的npm上的一個可用包,例如through2-parallel

咱們能夠經過修改現有的checkUrls模塊來快速檢查一個有序的並行執行的行爲。 假設咱們但願咱們的結果按照與輸入文件中的URL相同的順序編寫。 咱們可使用經過through2-parallel來實現:

const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if(!url) return done();
    request.head(url, (err, response) => {
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'))
;

正如咱們所看到的,through2-parallel的接口與through2的接口很是類似;惟一的不一樣是在through2-parallel還能夠爲咱們提供的transform函數指定一個併發限制。若是咱們嘗試運行這個新版本的checkUrls,咱們會看到results.txt文件列出結果的順序與輸入文件中
URLs的出現順序是同樣的。

經過這個,咱們總結了使用Streams實現異步控制流的分析;接下來,咱們研究管道模式。

管道模式

就像在現實生活中同樣,Node.jsStreams也能夠按照不一樣的模式進行管道鏈接。事實上,咱們能夠將兩個不一樣的Streams合併成一個Streams,將一個Streams分紅兩個或更多的管道,或者根據條件重定向流。 在本節中,咱們將探討可應用於Node.jsStreams最重要的管道技術。

組合的Streams

在本章中,咱們強調Streams提供了一個簡單的基礎結構來模塊化和重用咱們的代碼,可是卻漏掉了一個重要的部分:若是咱們想要模塊化和重用整個流水線?若是咱們想要合併多個Streams,使它們看起來像外部的Streams,那該怎麼辦?下圖顯示了這是什麼意思:

從上圖中,咱們看到了如何組合幾個流的了:

  • 當咱們寫入組合的Streams的時候,實際上咱們是寫入組合的Streams的第一個單元,即StreamA
  • 當咱們從組合的Streams中讀取信息時,實際上咱們從組合的Streams的最後一個單元中讀取。

一個組合的Streams一般是一個多重的Streams,經過鏈接第一個單元的寫入端和鏈接最後一個單元的讀取端。

要從兩個不一樣的Streams(一個可讀的Streams和一個可寫入的Streams)中建立一個多重的Streams,咱們可使用一個npm模塊,例如 duplexer2

但上述這麼作並不完整。實際上,組合的Streams還應該作到捕獲到管道中任意一段Streams單元產生的錯誤。咱們已經說過,任何錯誤都不會自動傳播到管道中。 因此,咱們必須有適當的錯誤管理,咱們將不得不顯式附加一個錯誤監聽器到每一個Streams。可是,組合的Streams其實是一個黑盒,這意味着咱們沒法訪問管道中間的任何單元,因此對於管道中任意單元的異常捕獲,組合的Streams也充當聚合器的角色。

總而言之,組合的Streams具備兩個主要優勢:

  • 管道內部是一個黑盒,對使用者不可見。
  • 簡化了錯誤管理,由於咱們沒必要爲管道中的每一個單元附加一個錯誤偵聽器,而只須要給組合的Streams自身附加上就能夠了。

組合的Streams是一個很是通用和廣泛的作法,因此若是咱們沒有任何特殊的須要,咱們可能只想重用現有的解決方案,如multipipecombine-stream

實現一個組合的Streams

爲了說明一個簡單的例子,咱們來考慮下面兩個組合的Streams的狀況:

  • 壓縮和加密數據
  • 解壓和解密數據

使用諸如multipipe之類的庫,咱們能夠經過組合一些核心庫中已有的Streams(文件combinedStreams.js)來輕鬆地構建組合的Streams

const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');
module.exports.compressAndEncrypt = password => {
  return combine(
    zlib.createGzip(),
    crypto.createCipher('aes192', password)
  );
};
module.exports.decryptAndDecompress = password => {
  return combine(
    crypto.createDecipher('aes192', password),
    zlib.createGunzip()
  );
};

例如,咱們如今可使用這些組合的數據流,如同黑盒,這些對咱們均是不可見的,能夠建立一個小型應用程序,經過壓縮和加密來歸檔文件。 讓咱們在一個名爲archive.js的新模塊中作這件事:

const fs = require('fs');
const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;
fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));

咱們能夠經過從咱們建立的流水線中構建一個組合的Stream來進一步改進前面的代碼,但此次並不僅是爲了得到對外不可見的黑盒,而是爲了進行異常捕獲。 實際上,正如咱們已經提到過的那樣,寫下以下的代碼只會捕獲最後一個Stream單元發出的錯誤:

fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
  .on('error', function(err) {
    // 只會捕獲最後一個單元的錯誤
    console.log(err);
  });

可是,經過把全部的Streams結合在一塊兒,咱們能夠優雅地解決這個問題。重構後的archive.js以下:

const combine = require('multipipe');
   const fs = require('fs');
   const compressAndEncryptStream =
     require('./combinedStreams').compressAndEncrypt;
   combine(
     fs.createReadStream(process.argv[3])
     .pipe(compressAndEncryptStream(process.argv[2]))
     .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
   ).on('error', err => {
     // 使用組合的Stream能夠捕獲任意位置的錯誤
     console.log(err);
   });

正如咱們所看到的,咱們如今能夠將一個錯誤偵聽器直接附加到組合的Streams,它將接收任何內部流發出的任何error事件。
如今,要運行archive模塊,只需在命令行參數中指定passwordfile參數,即壓縮模塊的參數:

node archive mypassword /path/to/a/file.text

經過這個例子,咱們已經清楚地證實了組合的Stream是多麼重要; 從一個方面來講,它容許咱們建立流的可重用組合,從另外一方面來講,它簡化了管道的錯誤管理。

分開的Streams

咱們能夠經過將單個可讀的Stream管道化爲多個可寫入的Stream來執行Stream的分支。當咱們想要將相同的數據發送到不一樣的目的地時,這便體現其做用了,例如,兩個不一樣的套接字或兩個不一樣的文件。當咱們想要對相同的數據執行不一樣的轉換時,或者當咱們想要根據一些標準拆分數據時,也可使用它。如圖所示:

Node.js中分開的Stream是一件小事。舉例說明。

實現一個多重校驗和的生成器

讓咱們建立一個輸出給定文件的sha1md5散列的小工具。咱們來調用這個新模塊generateHashes.js,看以下的代碼:

const fs = require('fs');
const crypto = require('crypto');
const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');
const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');

目前爲止沒什麼特別的 該模塊的下一個部分其實是咱們將從文件建立一個可讀的Stream,並將其分叉到兩個不一樣的流,以得到另外兩個文件,其中一個包含sha1散列,另外一個包含md5校驗和:

const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);
inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));
inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));

這很簡單:inputStream變量經過管道一邊輸入到sha1Stream,另外一邊輸入到md5Stream。可是要注意:

  • inputStream結束時,md5Streamsha1Stream會自動結束,除非當調用pipe()時指定了end選項爲false
  • Stream的兩個分支會接受相同的數據塊,所以當對數據執行一些反作用的操做時咱們必須很是謹慎,由於那樣會影響另一個分支。
  • 黑盒外會產生背壓,來自inputStream的數據流的流速會根據接收最慢的分支的流速做出調整。

合併的Streams

合併與分開相對,經過把一組可讀的Streams合併到一個單獨的可寫的Stream裏,如圖所示:

將多個Streams合併爲一個一般是一個簡單的操做; 然而,咱們必須注意咱們處理end事件的方式,由於使用自動結束選項的管道系統會在一個源結束時當即結束目標流。 這一般會致使錯誤,由於其餘還未結束的源將繼續寫入已終止的Stream。 解決此問題的方法是在將多個源傳輸到單個目標時使用選項{end:false},而且只有在全部源完成讀取後纔在目標Stream上調用end()

用多個源文件壓縮爲一個壓縮包

舉一個簡單的例子,咱們來實現一個小程序,它根據兩個不一樣目錄的內容建立一個壓縮包。 爲此,咱們將介紹兩個新的npm模塊:

  • tar用來建立壓縮包
  • fstream從文件系統文件建立對象streams的庫

咱們建立一個新模塊mergeTar.js,以下開始初始化:

var tar = require('tar');
var fstream = require('fstream');
var path = require('path');
var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);

在前面的代碼中,咱們只加載所有依賴包和初始化包含目標文件和兩個源目錄(sourceAsourceB)的變量。

接下來,咱們建立tarStream並經過管道輸出到一個可寫入的Stream

const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));

如今,咱們開始初始化源Stream

let endCount = 0;

function onEnd() {
  if (++endCount === 2) {
    pack.end();
  }
}

const sourceStreamA = fstream.Reader({
    type: "Directory",
    path: sourceA
  })
  .on('end', onEnd);

const sourceStreamB = fstream.Reader({
    type: "Directory",
    path: sourceB
  })
  .on('end', onEnd);

在前面的代碼中,咱們建立了從兩個源目錄(sourceStreamAsourceStreamB)中讀取的Stream那麼對於每一個源Stream,咱們附加一個end事件訂閱者,只有當這兩個目錄被徹底讀取時,纔會觸發packend事件。

最後,合併兩個Stream

sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});

咱們將兩個源文件都壓縮到pack這個Stream中,並經過設定pipe()option參數爲{end:false}配置終點Stream的自動觸發end事件。

這樣,咱們已經完成了咱們簡單的TAR程序。咱們能夠經過提供目標文件做爲第一個命令行參數,而後是兩個源目錄來嘗試運行這個實用程序:

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB

npm中咱們能夠找到一些能夠簡化Stream的合併的模塊:

要注意,流入目標Stream的數據是隨機混合的,這是一個在某些類型的對象流中能夠接受的屬性(正如咱們在上一個例子中看到的那樣),可是在處理二進制Stream時一般是一個不但願這樣。

然而,咱們能夠經過一種模式按順序合併Stream; 它包含一個接一個地合併源Stream,當前一個結束時,開始發送第二段數據塊(就像鏈接全部源Stream的輸出同樣)。在npm上,咱們能夠找到一些也處理這種狀況的軟件包。其中之一是multistream

多路複用和多路分解

合併Stream模式有一個特殊的模式,咱們並非真的只想將多個Stream合併在一塊兒,而是使用一個共享通道來傳送一組數據Stream。與以前的不同,由於源數據Stream在共享通道內保持邏輯分離,這使得一旦數據到達共享通道的另外一端,咱們就能夠再次分離數據Stream。如圖所示:

將多個Stream組合在單個Stream上傳輸的操做被稱爲多路複用,而相反的操做(即,從共享Stream接收數據重構原始的Stream)則被稱爲多路分用。執行這些操做的設備分別稱爲多路複用器和多路分解器(。 這是一個在計算機科學和電信領域普遍研究的話題,由於它是幾乎任何類型的通訊媒體,如電話,廣播,電視,固然還有互聯網自己的基礎之一。 對於本書的範圍,咱們不會過多解釋,由於這是一個很大的話題。

咱們想在本節中演示的是,如何使用共享的Node.js Streams來傳送多個邏輯上分離的Stream,而後在共享Stream的另外一端再次分離,即實現一次多路複用和多路分解。

建立一個遠程logger日誌記錄

舉例說明,咱們但願有一個小程序來啓動子進程,並將其標準輸出和標準錯誤都重定向到遠程服務器,服務器接受它們而後保存爲兩個單獨的文件。所以,在這種狀況下,共享介質是TCP鏈接,而要複用的兩個通道是子進程的stdoutstderr。 咱們將利用分組交換的技術,這種技術與IPTCPUDP等協議所使用的技術相同,包括將數據封裝在數據包中,容許咱們指定各類源信息,這對多路複用,路由,控制 流程,檢查損壞的數據都十分有幫助。

如圖所示,這個例子的協議大概是這樣,數據被封裝成具備如下結構的數據包:

在客戶端實現多路複用

先說客戶端,建立一個名爲client.js的模塊,這是咱們這個應用程序的一部分,它負責啓動一個子進程並實現Stream多路複用。

開始定義模塊,首先加載依賴:

const child_process = require('child_process');
const net = require('net');

而後開始實現多路複用的函數:

function multiplexChannels(sources, destination) {
  let totalChannels = sources.length;

  for(let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function() { // [1]
        let chunk;
        while ((chunk = this.read()) !== null) {
          const outBuff = new Buffer(1 + 4 + chunk.length); // [2]
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff); // [3]
        }
      })
      .on('end', () => { //[4]
        if (--totalChannels === 0) {
          destination.end();
        }
      });
  }
}

multiplexChannels()函數接受要複用的源Stream做爲輸入
和複用接口做爲參數,而後執行如下步驟:

  1. 對於每一個源Stream,它會註冊一個readable事件偵聽器,咱們使用non-flowing模式從流中讀取數據。
  2. 每讀取一個數據塊,咱們將其封裝到一個首部中,首部的順序爲:channel ID爲1字節(UInt8),數據包大小爲4字節(UInt32BE),而後爲實際數據。
  3. 數據包準備好後,咱們將其寫入目標Stream
  4. 咱們爲end事件註冊一個監聽器,以便當全部源Stream結束時,end事件觸發,通知目標Stream觸發end事件。
注意,咱們的協議最多可以複用多達256個不一樣的源流,由於咱們只有1個字節來標識 channel
const socket = net.connect(3000, () => { // [1]
  const child = child_process.fork( // [2]
    process.argv[2],
    process.argv.slice(3), {
      silent: true
    }
  );
  multiplexChannels([child.stdout, child.stderr], socket); // [3]
});

在最後,咱們執行如下操做:

  1. 咱們建立一個新的TCP客戶端鏈接到地址localhost:3000
  2. 咱們經過使用第一個命令行參數做爲路徑來啓動子進程,同時咱們提供剩餘的process.argv數組做爲子進程的參數。咱們指定選項{silent:true},以便子進程不會繼承父級的stdoutstderr
  3. 咱們使用mutiplexChannels()函數將stdoutstderr多路複用到socket裏。
在服務端實現多路分解

如今來看服務端,建立server.js模塊,在這裏咱們未來自遠程鏈接的Stream多路分解,並將它們傳送到兩個不一樣的文件中。

首先建立一個名爲demultiplexChannel()的函數:

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;
  source
    .on('readable', () => { //[1]
      let chunk;
      if(currentChannel === null) {          //[2]
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }
    
      if(currentLength === null) {          //[3]
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if(currentLength === null) {
          return;
        }
      }
    
      chunk = source.read(currentLength);        //[4]
      if(chunk === null) {
        return;
      }
    
      console.log('Received packet from: ' + currentChannel);
    
      destinations[currentChannel].write(chunk);      //[5]
      currentChannel = null;
      currentLength = null;
    })
    .on('end', () => {            //[6]
      destinations.forEach(destination => destination.end());
      console.log('Source channel closed');
    })
  ;
}

上面的代碼可能看起來很複雜,仔細閱讀並不是如此;因爲Node.js可讀的Stream的拉動特性,咱們能夠很容易地實現咱們的小協議的多路分解,以下所示:

  1. 咱們開始使用non-flowing模式從流中讀取數據。
  2. 首先,若是咱們尚未讀取channel ID,咱們嘗試從流中讀取1個字節,而後將其轉換爲數字。
  3. 下一步是讀取首部的長度。咱們須要讀取4個字節,因此有可能在內部Buffer尚未足夠的數據,這將致使this.read()調用返回null。在這種狀況下,咱們只是中斷解析,而後重試下一個readable事件。
  4. 當咱們最終還能夠讀取數據大小時,咱們知道從內部Buffer中拉出多少數據,因此咱們嘗試讀取全部數據。
  5. 當咱們讀取全部的數據時,咱們能夠把它寫到正確的目標通道,必定要記得重置currentChannelcurrentLength變量(這些變量將被用來解析下一個數據包)。
  6. 最後,當源channel結束時,必定不要忘記調用目標Streamend()方法。

既然咱們能夠多路分解源Stream,進行以下調用:

net.createServer(socket => {
  const stdoutStream = fs.createWriteStream('stdout.log');
  const stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
  .listen(3000, () => console.log('Server started'))
;

在上面的代碼中,咱們首先在3000端口上啓動一個TCP服務器,而後對於咱們接收到的每一個鏈接,咱們將建立兩個可寫入的Stream,指向兩個不一樣的文件,一個用於標準輸出,另外一個用於標準錯誤; 這些是咱們的目標channel。 最後,咱們使用demultiplexChannel()將套接字流解複用爲stdoutStreamstderrStream

運行多路複用和多路分解應用程序

如今,咱們準備嘗試運行咱們的新的多路複用/多路分解應用程序,但首先讓咱們建立一個小的Node.js程序來產生一些示例輸出; 咱們把它叫作generateData.js

console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");

首先,讓咱們開始運行服務端:

node server

而後運行客戶端,須要提供做爲子進程的文件參數:

node client generateData.js

客戶端幾乎立馬運行,可是進程結束時,generateData應用程序的標準輸入和標準輸出通過一個TCP鏈接,而後在服務器端,被多路分解成兩個文件。

注意,當咱們使用 child_process.fork()時,咱們的客戶端可以啓動別的 Node.js模塊。

對象Streams的多路複用和多路分解

咱們剛剛展現的例子演示瞭如何複用和解複用二進制/文本Stream,但值得一提的是,相同的規則也適用於對象Stream。 最大的區別是,使用對象,咱們已經有了使用原子消息(對象)傳輸數據的方法,因此多路複用就像設置一個屬性channel ID到每一個對象同樣簡單,而多路分解只須要讀·channel ID屬性,並將每一個對象路由到正確的目標Stream

還有一種模式是取一個對象上的幾個屬性並分發到多個目的Stream的模式 經過這種模式,咱們能夠實現複雜的流程,以下圖所示:

如上圖所示,取一個對象Stream表示animals,而後根據動物類型:reptilesamphibiansmammals,而後分發到正確的目標Stream中。

總結

在本章中,咱們已經對Node.js Streams及其使用案例進行了闡述,但同時也應該爲編程範式打開一扇大門,幾乎具備無限的可能性。咱們瞭解了爲何StreamNode.js社區讚譽,而且咱們掌握了它們的基本功能,使咱們可以利用它作更多有趣的事情。咱們分析了一些先進的模式,並開始瞭解如何將不一樣配置的Streams鏈接在一塊兒,掌握這些特性,從而使流如此多才多藝,功能強大。

若是咱們遇到不能用一個Stream來實現的功能,咱們能夠經過將其餘Streams鏈接在一塊兒來實現,這是Node.js的一個很好的特性;Streams在處理二進制數據,字符串和對象都十分有用,並具備鮮明的特色。

在下一章中,咱們將重點介紹傳統的面向對象的設計模式。儘管JavaScript在某種程度上是面向對象的語言,但在Node.js中,函數式或混合方法一般是首選。在閱讀下一章便揭曉答案。

相關文章
相關標籤/搜索