[譯] Node.js 流: 你須要知道的一切

Node.js 流: 你須要知道的一切

圖片來源javascript

Node.js 中的流有着難以使用,更難以理解的名聲。如今我有一個好消息告訴你:事情已經再也不是這樣了。html

很長時間以來,開發人員創造了許許多多的軟件包爲的就是能夠更簡單的使用流。可是在本文中,我會把重點放在原生的 Node.js 流 API上。前端

「流是 Node 中最棒的,同時也是最被人誤解的想法。」java

— Dominic Tarrnode

流究竟是什麼呢?

流是數據的集合 —— 就像數組或字符串同樣。區別在於流中的數據可能不會馬上就所有可用,而且你無需一次性地把這些數據所有放入內存。這使得流在操做大量數據或是數據從外部來源逐發送過來的時候變得很是有用。react

然而,流的做用並不只限於操做大量數據。它還帶給咱們組合代碼的能力。就像咱們能夠經過管道鏈接幾個簡單的 Linux 命令以組合出強大的功能同樣,咱們能夠利用流在 Node 中作一樣的事。android

Linux 命令的組合性ios

const grep = ... // 一個 grep 命令輸出的 stream
const wc = ... // 一個 wc 命令輸入的 stream

grep.pipe(wc)複製代碼

Node 中許多內建的模塊都實現了流接口:git

截屏來自於個人 Pluralsight 課程 —— 高級 Node.jsgithub

上邊的列表中有一些 Node.js 原生的對象,這些對象也是能夠讀寫的流。這些對象中的一部分是既可讀、又可寫的流,例如 TCP sockets,zlib 以及 crypto。

須要注意的是這些對象是緊密關聯的。雖然一個 HTTP 響應在客戶端是一個可讀流,但在服務器端它倒是一個可寫流。這是由於在 HTTP 的狀況中,咱們基本上是從一個對象(http.IncomingMessage)讀取數據,向另外一個對象(http.ServerResponse)寫入數據。

還須要注意的是 stdio 流(stdinstdoutstderr)在子進程中有着與父進程中相反的類型。這使得在子進程中從父進程的 stdio 流中讀取或寫入數據變得很是簡單。

一個流的真實例子

理論是偉大的,當每每沒有 100% 的說服力。下面讓咱們經過一個例子來看看流在節省內存消耗方面能夠起到的做用。

首先讓咱們建立一個大文件:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();複製代碼

看看在建立這個大文件時我用到了什麼。一個可寫流!

經過 fs 模塊你可使用一個流接口讀取或寫入文件。在上面的例子中,咱們經過一個可寫流向 big.file 寫入了 100 萬行數據。

執行這段腳本會生成一個約 400MB 大小的文件。

如下是一個用來發送 big.file 文件的 Node web 服務器:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;

    res.end(data);
  });
});

server.listen(8000);複製代碼

當服務器收到請求時,它會經過異步方法 fs.readFile 讀取文件內容發送給客戶端。看起來咱們並無阻塞事件循環。一切看起來還不錯,是吧?是嗎?

讓咱們來看看真實的狀況吧。咱們啓動服務器,發起鏈接,並監控內存的使用狀況。

當我啓動服務器的時候,它佔用了一個正常大小的內存空間,8.7MB:

當我鏈接到服務器的時候。請注意內存消耗的變化:

哇 —— 內存消耗暴增到 434.8MB。

在咱們將其寫入響應對象以前,咱們基本上把 big.file 的所有內容都載入到內存中了。這是很是低效的。

HTTP 響應對象也是一個可寫流。這意味着若是咱們有一個表明了 big.file 內容的可讀流,咱們就能夠經過將兩個流鏈接起來以實現相同的功能而沒必要消耗約 400MB 的內存。

Node fs 模塊中的 createReadStream 方法能夠針對任何文件給咱們返回一個可讀流。咱們能夠把它和響應對象鏈接起來:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);複製代碼

如今,當你再次鏈接到服務器時,神奇的事情發生了(請注意內存消耗):

發生了什麼?

當客戶端請求這個大文件時,咱們經過流逐塊的發送數據。這意味着咱們不須要把文件的所有內容緩存到內存中。內存消耗只增加了大約 25MB。

你能夠把這個例子推向極端。從新生成一個 500 萬行而不是 100 萬行的 big.file 文件。它大概有 2GB 那麼大。這已經超過了 Node 中默認的緩衝區大小的上限。

若是你嘗試經過 fs.readFile 讀取那個文件,默認狀況下會失敗(固然你能夠修改緩衝區大小上限)。可是經過使用 fs.createReadStream,向客戶端發送一個 2GB 的文件就沒有任何問題。更棒的是,進程的內存消耗並不會因文件增大而增加。

準備好學習流了嗎?

這篇文章是個人 Pluralsight 課堂上 Node.js 課程中的一部分。你能夠經過這個連接找到這部份內容的視頻版。

流快速入門

在 Node.js 中有四種基本類型的流:可讀流,可寫流,雙向流以及變換流。

  • 可讀流是對一個能夠讀取數據的源的抽象。fs.createReadStream 方法是一個可讀流的例子。
  • 可寫流是對一個能夠寫入數據的目標的抽象。fs.createWriteStream 方法是一個可寫流的例子。
  • 雙向流既是可讀的,又是可寫的。TCP socket 就屬於這種。
  • 變換流是一種特殊的雙向流,它會基於寫入的數據生成可供讀取的數據。例如使用 zlib.createGzip 來壓縮數據。你能夠把一個變換流想象成一個函數,這個函數的輸入部分對應可寫流,輸出部分對應可讀流。你也可能據說過變換流有時被稱爲 「thought streams」。

全部的流都是 EventEmitter 的實例。它們發出可用於讀取或寫入數據的事件。然而,咱們能夠利用 pipe 方法以一種更簡單的方式使用流中的數據。

pipe 方法

如下這行代碼就是你要記住的魔法:

readableSrc.pipe(writableDest)複製代碼

在這行簡單的代碼中,咱們以管道的方式把一個可讀流的輸出鏈接到了一個可寫流的輸入。管道的上游(source)必須是一個可讀流,下游(destination)必須是一個可寫流。固然,它們也能夠是雙向流/變換流。事實上,若是咱們使用管道鏈接的是雙向流,咱們就能夠像 Linux 系統裏那樣鏈接多個流:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)複製代碼

pipe 方法會返回最後一個流,這使得咱們能夠串聯多個流。對於流 a (可讀),bc (雙向),以及 d(可寫)。咱們能夠這樣:

a.pipe(b).pipe(c).pipe(d)

# 等價於:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# 在 Linux 中,等價於:
$ a | b | c | d複製代碼

pipe 方法是使用流最簡單的方式。一般的建議是要麼使用 pipe 方法、要麼使用事件來讀取流,要避免混合使用二者。通常狀況下使用 pipe 方法時你就沒必要再使用事件了。但若是你想以一種更加自定義的方式使用流,就要用到事件了。

流事件

除了從可讀流中讀取數據寫入可寫流之外,pipe 方法還自動幫你處理了一些其餘狀況。例如,錯誤處理,文件結尾,以及兩個流讀取/寫入速度不一致的狀況。

然而,流也能夠直接經過事件讀取。如下是一段簡化的使用事件來模擬 pipe 讀取、寫入數據的代碼:

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});複製代碼

如下是一些使用可讀流或可寫流時用到的事件和方法:

截屏來自於個人 Pluralsight 課程 - 高級 Node.js

這些事件和函數是相關的,由於咱們老是把它們組合在一塊兒使用。

一個可讀流上最重要的兩個事件是:

  • data 事件,任什麼時候候當可讀流發送數據給它的消費者時,會觸發此事件
  • end 事件,當可讀流沒有更多的數據要發送給消費者時,會觸發此事件

一個可寫流上最重要的兩個事件是:

  • drain 事件,這是一個表示可寫流能夠接受更多數據的信號.
  • finish 事件,當全部數據都被寫入底層系統後會觸發此事件。

事件和函數能夠組合起來使用,以更加定製,優化的方式使用流。對於可讀流,咱們可使用 pipe/unpipe 方法,或是 readunshiftresume方法。對於可寫流,咱們能夠把它設置爲 pipe/unpipe 方法的下游,亦或是使用 write 方法寫入數據並在寫入完成後調用 end 方法。

可讀流的暫停和流動模式

可讀流有兩種主要的模式,影響咱們使用它的方式:

  • 它要麼處於暫停模式
  • 要麼就是處於流動模式

這些模式有時也被成爲拉取和推送模式。

全部的可讀流默認都處於暫停模式。但它們能夠按需在流動模式和暫停模式間切換。這種切換有時會自動發生。

當一個可讀流處於暫停模式時,咱們可使用 read() 方法按需的讀取數據。而對於一個處於流動模式的可讀流,數據會源源不斷的流動,咱們須要經過事件監聽來處理數據。

在流動模式中,若是沒有消費者監聽事件那麼數據就會丟失。這就是爲什麼在處理流動模式的可讀流時咱們須要一個 data 事件回調函數。事實上,經過增長一個 data 事件回調就能夠把處於暫停模式的流切換到流動模式;一樣的,移除 data 事件回調會把流切回到暫停模式。這麼作的一部分緣由是爲了和舊的 Node 流接口兼容。

要手動在這兩個模式間切換,你可使用 resume()pause() 方法。

截屏來自於個人 Pluralsight 課程 - 高級 Node.js

當使用 pipe 方法時,它會自動幫你處理好這些模式之間的切換,所以你無須關心這些細節。

實現流接口

當咱們討論 Node.js 中的流時,主要是討論兩項任務:

  • 一個是實現流。
  • 一個是使用流。

到目前爲止,咱們只討論瞭如何使用流。接下來讓咱們看看如何實現它!

流的實現者一般都會 require stream 模塊。

實現一個可寫流

要實現一個可寫流,咱們須要使用來自 stream 模塊的 Writable 類。

const { Writable } = require('streams');複製代碼

實現一個可寫流有不少種方法。例如,咱們能夠繼承 Writable 類:

class myWritableStream extends Writable {
}複製代碼

然而,我傾向於更簡單的構造方法。咱們能夠直接給 Writable 構造函數傳入配置項來建立一個對象。惟一必須的配置項是一個 write 函數,它用於暴露一個寫入數據的接口。

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);複製代碼

write 方法接受三個參數。

  • chunk 一般是一個 buffer,除非咱們對流進行了特殊配置。
  • encoding 一般能夠忽略。除非 chunk 被配置爲不是 buffer。
  • callback 方法是一個在咱們完成數據處理後要執行的回調函數。它用來表示數據是否成功寫入。如果寫入失敗,在執行該回調函數時須要傳入一個錯誤對象。

outStream 中,咱們只是單純的把收到的數據當作字符串 console.log 出來,並經過執行 callback 時不傳入錯誤對象以表示寫入成功。這是一個很是簡單且沒什麼用處的回傳流。它會回傳任何收到的數據。

要使用這個流,咱們能夠把它和可讀流 process.stdin 配合使用。只需把 process.stdin 經過管道鏈接到 outStream

當咱們運行上面的代碼時,任何輸入到 process.stdin 中的字符都會被 outStream 中的 console.log 輸出回來。

這不是一個很是實用的流實現,由於 Node 已經內置了它的實現。它幾乎等同於 process.stdout。經過把 stdinstdout 鏈接起來,咱們就能夠經過一行代碼獲得徹底相同的回傳效果:

process.stdin.pipe(process.stdout);複製代碼

實現一個可讀流

要實現可讀流,咱們須要引入 Readable 接口並經過它建立對象:

const { Readable } = require('stream');

const inStream = new Readable({});複製代碼

這是一個很是簡單的可讀流實現。咱們能夠經過 push 方法向下遊推送數據。

const { Readable } = require('stream');  

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // 沒有更多數據了

inStream.pipe(process.stdout);複製代碼

當咱們 push 一個 null 值,這表示該流後續不會再有任何數據了。

要使用這個可讀流,咱們能夠把它鏈接到可寫流 process.stdout

當咱們執行以上代碼時,全部讀取自 inStream 的數據都會被顯示到標準輸出上。很是簡單,但並不高效。

在把該流鏈接到 process.stdout 以前,咱們就已經推送了全部數據。更好的方式是隻在使用者要求時按需推送數據。咱們能夠經過在可讀流配置中實現 read() 方法來達成這一目的:

const inStream = new Readable({
  read(size) {
    // 某人想要讀取數據
  }
});複製代碼

當可讀流上的 read 方法被調用時,流實現能夠向隊列中推送部分數據。例如,咱們能夠從字符編碼 65(表示字母 A) 開始,一次推送一個字母,每次都把字符編碼加 1:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65

inStream.pipe(process.stdout);複製代碼

當使用者讀取該可讀流時,read 方法會持續被觸發,咱們不斷推送字母。咱們須要在某處中止該循環,這就是爲什麼咱們放置了一個 if 語句以便在 currentCharCode 大於 90(表明 Z) 時推送一個 null 值。

這段代碼等價於以前的咱們開始時編寫的那段簡單代碼,但咱們已改成在使用者須要時推送數據。你始終應該這樣作。

實現雙向/變換流

對於雙向流,咱們要在同一個對象上同時現實可讀流和可寫流。就好像是咱們繼承了兩個接口。

如下的例子實現了一個綜合了前面提到的可讀流與可寫流功能的雙向流:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);複製代碼

經過組合這些方法,咱們能夠經過該雙向流讀取從 A 到 Z 的字母還能夠利用它的回傳特性。咱們把可讀的 stdin 流接入這個雙向流以利用它的回傳特性同時又把它接入可寫的 stdout 流以查看字母 A 到 Z。

理解雙向流的讀取和寫入部分是徹底獨立的這一點很是重要。它只不過是把兩種特性在同一個對象上實現罷了。

變換流是一種更有趣的雙向流,由於它的輸出是基於輸入運算獲得的。

對於一個變換流,咱們不須要實現 readwrite 方法,而是隻須要實現一個 transform 方法便可,它結合了兩者的功能。它的函數簽名和 write 方法一致,咱們也能夠經過它 push 數據。

如下是一個把你輸入的任何內容轉換爲大寫字母的變換流:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);複製代碼

在這個變換流中,咱們只實現了 transform() 方法,卻達到了前面雙向流例子的效果。在該方法中,咱們把 chunk 轉換爲大寫而後經過 push 方法傳遞給下游。

流對象模式

默認狀況下,流接收的參數類型爲 Buffer/String。咱們能夠經過設置 objectMode 參數使得流能夠接受任何 JavaScript 對象。

如下是一個簡單的演示。如下變換流的組合用於把一個逗號分割的字符串轉變成爲一個 JavaScript 對象。傳入 "a,b,c,d" 就變成了 {a: b, c: d}

const { Transform } = require('stream');
const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});
const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});
const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});
process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)複製代碼

咱們給 commaSplitter 傳入一個字符串(假設是 "a,b,c,d"),它會輸出一個數組做爲可讀數據([「a」, 「b」, 「c」, 「d」])。在該流上增長 readableObjectMode 標記是必須的,由於咱們在給下游推送一個對象,而不是字符串。

咱們接着把 commaSplitter 輸出的數組傳遞給了 arrayToObject 流。咱們須要設置 writableObjectModel 以便讓該流能夠接收一個對象。它還會往下游推送一個對象(輸入的數據被轉換成對象),這就是爲何咱們還須要配置 readableObjectMode 標誌位。最後的 objectToString 流接收一個對象但卻輸出一個字符串,所以咱們只需配置 writableObjectMode 便可。傳遞給下游的只是一個普通字符串。

以上實例代碼的使用方法

Node 內置的變換流

Node 內置了一些很是有用的變換流。這就是 zlib 和 crypto 流。

下面是一個組合了 zlib.createGzip()fs 可讀/可寫流來壓縮文件的腳本:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));複製代碼

你能夠經過該腳本給任何參數中傳入的文件進行 gzip 壓縮。咱們經過可讀流讀取文件內容傳遞給 zlib 內置的變換流,而後經過一個可寫流來寫入新文件。很簡單吧。

使用管道很棒的一點在於,若是有必要,咱們能夠把它和事件組合使用。例如,我但願在腳本執行過程當中給用戶一些進度提示,在腳本執行完成後顯示一條完成消息。既然 pipe 方法會返回下游流,咱們就能夠把註冊事件回調的操做級聯在一塊兒:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));複製代碼

因此使用 pipe 方法,咱們能夠很簡單的使用流。當須要時,咱們還能夠經過事件來進一步定製和流的交互。

pipe 方法的好處在於,咱們能夠用一種更加可讀的方式經過若干片斷組合咱們的程序。例如,咱們能夠經過建立一個變換流來顯示進度,而不是直接監聽 data 事件。把 .on() 調用換成另外一個 .pipe() 調用:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));複製代碼

這個 reportProgress 流是一個簡單的直通流,但同時報告了進度信息。請注意我是如何在 transform() 方法中利用 callback() 的第二個參數傳遞數據的。它等價於使用 push 方法推送數據。

組合流的應用是無止境的。例如,假設咱們須要在壓縮文件以前或以後加密它,咱們要作的只不過是在正確的位置引入一個新的變換流。咱們可使用 Node 內置的 crypto 模塊:

**const crypto = require('crypto');
**// ...複製代碼
const crypto = require('crypto');
// ...
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));複製代碼

以上的腳本對給定的文件先壓縮再加密,只有知道祕鑰的人才能利用生成的文件。咱們不能利用普通的解壓工具解壓該文件,由於它被加密了。

要能真正的解壓任何使用以上腳本壓縮過的文件,咱們須要以相反的順序利用 crypto 和 zlib:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));複製代碼

假設傳入的文件是壓縮後的版本,以上的腳本會建立一個針對該文件的讀取流,鏈接到一個 crypto 模塊的 createDecipher() 流(使用相同的密鑰),以後將輸出傳遞給一個 zlib 模塊的 createGunzip() 流,最後將獲得的數據寫入一個沒有壓縮文件擴展名的文件。

以上就是我關於本主題要討論的所有內容了。感謝閱讀!下次再見!

若是你認爲這篇文件對你有幫助,請點擊下方的💚。關注我以獲取更多關於 Node.js 和 JavaScript 的文章。

我爲 PluralsightLynda 製做在線課程。我最近的課程是 React.js 入門, 高級 Node.js, 和學習全棧 JavaScript

我還進行線上與現場培訓,內容涵蓋 JavaScript,Node.js,React.js 和 GraphQL 從初級到高級的所有範圍。若是你在尋找一名講師,請聯繫我。我將在今年七月份的 Foward.js 上進行 6 場現場講習班,其中一場是 Node.js 進階

若是關於本文或任何個人其餘文章有疑問,你能夠經過這個 slack 帳號找到我並在 #questions 房間裏提問。


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOSReact前端後端產品設計 等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃

相關文章
相關標籤/搜索