[譯] Node.js Streams: 你應該知道的事情

閱讀本文須要必定的 Node.js 基礎,若是文中出現了一些不太理解的地方,記得用搜索引擎或者 Node 官方文檔來解惑。html

生產者消費者問題:https://zh.wikipedia.org/zh-h...node

小鬍子哥的文章:http://www.barretlee.com/blog...git

發現這篇文章也不錯:https://github.com/zhengweike...github

閱讀原文web

你們都以爲 Node.js streams 很難用,更難理解。那麼我有一個好消息告訴你,如今已經再也不這樣了。api

這些年來,開發者爲了讓 streams 更好用造了不少輪子。在這篇文章中,我把重點放在原生的 Node.js stream API數組

Streams 是 Node 裏最好的也是被誤解最深的主意。緩存

streams 究竟是啥?

streams 是相似於數組或者字符串那樣的數據集合。不一樣之處在於,streams 有些時候不會一次性拿出全部數據,所以也沒必要擔憂其大小超過內存。 當處理大量數據時,streams 真的很強大。服務器

streams 不只被用在大數據量的處理上,使用 streams 還能讓咱們的代碼更具組合型。就像在 Linux 上能夠經過管道符將其餘較小的命令組成強大的命令同樣,咱們能夠在 Node 中使用 streams 實現同樣的效果。
image.png!thumbnail異步

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

Node.js 中許多內置模塊都實現了 streams 接口。

image.png!thumbnail

上面的列表中羅列了一些原生 Node.js 對象,這些對象也是可讀或者可寫的 streams。在這些對象中,有些既是可讀流,也是可寫流,好比 TCP sockets,zlib 和 crypto 。

這些對象是緊密相關的。雖然 HTTP 響應流在客戶端上是可讀的,但它在服務器上是可寫的。這是由於在 HTTP 狀況下,咱們是從一個對象(http.IncomingMessage)讀取,從另外一個對象寫入(http.ServerResponse)。

還要注意,父進程中 stdin 是可讀流,stdout 和 stderr 是可寫流。而在子進程中,正好與父進程相反,stdin 是可寫流,而 stdout 和 stderr 是可讀流。這方便了咱們管理來自主進程的 stdio 流。(譯者注:個人理解是,子進程的 stdin 通常須要接收來自父進程的輸入,因此應當設計成可寫的,子進程的輸出,通常須要導回到父進程中,因此應當設計成可讀的)

streams 的實際使用示例

理論都是美好的,但其實每每不具備足夠的說服力。咱們來看一個可以彰顯 streams 在內存消耗上的不一樣之處的例子。

咱們先要製造一個大文件來充當測試用例:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

我用了可寫流來製造這個大文件。

fs 模塊實現了 streams 的接口,使其既能夠被用來讀取文件,又能夠用來執行寫入的操做。在上面的例子中,咱們經過循環的調用可寫流,在該文件中寫入了 100 萬行。

跑一下上面的腳本將會生成一個大小 400 MB 左右的文件。

這是一個簡單的 Node web 服務器,專門爲 big.file 提供服務:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

當服務器得到請求時,它將使用異步方法 fs.readFile 讀取大文件並返回給客戶端。很簡單的幾行代碼,看上去其表現不會跟一大堆事件循環或者其餘複雜的代碼那樣。

那麼,讓咱們運行服務器,並在請求這個文件時監視內存,看看會發生什麼。

當我啓動服務時,它開始處於一個正常的內存量,8.7 MB:

image.png!thumbnail

而後我發起請求,注意看內存用量發生了什麼變化:

圖片描述

內存用量竟然漲到了 434.8 MB。

咱們基本上把整個 big.file 的內容都放在內存中了,而後再把它寫到響應對象中。這是很是低效的。

HTTP 響應對象(上面的代碼中的 res )也是可寫的流。這意味着若是咱們有一個表明 big.file 的內容的可讀流,咱們能夠直接讓這兩個對象經過 pipe 鏈接,不用耗費 400 MB 的內存就能實現相同的功能。

fs 模塊可使用 createReadStream 方法爲任何文件提供可讀的流。 咱們能夠將其傳遞給響應對象:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

如今,當你再次發出請求,會發生一個神奇的事情(看內存消耗):

圖片描述

發生了什麼?

當客戶端請求這個大文件時,咱們每次流式傳輸一個塊,這意味着咱們不會在內存中緩存該文件。內存的使用量大約增加了25 MB,就是這樣。

你能夠將此示例推到極限。 從新生成 500 萬行而不是 100 萬行的 big.file 文件,這將使文件佔用超過 2 GB,實際上超過了 Node 中的默認緩衝區限制。

默認狀況下(你能夠更改限制),你是沒辦法用 fs.readFile 來向客戶端提供該文件的。可是使用 fs.createReadStream,將2 GB的數據傳輸給請求者徹底沒問題,最重要的是,進程內存使用量也跟以前大體相同。

如今,你準備好學習 streams 了麼?

Streams 101

Node.js 中有四種基本流類型:Readable(可讀流), Writeable(可寫流), Duplex(雙工流), Transform(轉換流)

  • 可讀流是能夠從中消費數據的數據源的抽象。fs.createReadStream 方法就是個例子。
  • 可寫流是能夠被寫入數據的目標的抽象。fs.createWriteStream 方法就是個例子。
  • 雙工流既可讀又可寫。TCP socket 就是個例子。
  • 轉換流基本上是一個雙工流,能夠用於在寫入和讀取數據時修改或轉換數據。 一個例子是使用 gzip 壓縮數據的 zlib.createGzip 流。你能夠將轉換流視爲函數,其中輸入是可寫入流部分,輸出是可讀流部分。你也可能會聽轉換流被稱爲 」through streams」 。

全部流都是 EventEmitter 的實例。它們發出可用於讀取和寫入數據的事件。但其實咱們有更簡單的處理流數據的辦法 —— pipe。

pipe 方法

下面這個魔法公式你應該記住:

readableSrc.pipe(writableDest)

這行簡單的代碼中,咱們將可讀流的輸出(數據源)做爲可寫流的輸入。源必須是可讀流,目標必須是可寫流。固然,它們也能夠是雙工/轉換流。實際上,若是咱們正在處理一個雙工流,那咱們就能夠像在Linux中同樣將 pipe 鏈式調用:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWrtitableDest)

管道方法返回目標流,這使咱們可以在上面進行連續調用。 對於流 a(可讀),b和c(雙工)和d(可寫),咱們能夠這樣作:

a.pipe(b).pipe(c).pipe(d)

# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# Which, in Linux, is equivalent to:
$ a | b | c | d

pipe 方法是最簡單的消費流的方法。通常比較建議使用 pipe 方法或使用事件來消費流,但應該避免混合使用這兩種方式。一般當你使用 pipe 方法時,不須要使用事件,可是若是你須要以自定義的方式使用 streams,那麼使用事件可能有些必要。

Stream events

除了從可讀流源讀取並寫入可寫流目的地以外,pipe 方法還會自動管理一些事情。例如,錯誤處理,文件結尾以及一個流比另外一個流慢或快的狀況。

然而,流也能夠直接與事件一塊兒使用。如下是 pipe 方法主要用於讀取和寫入數據的簡化事件等效代碼:

# readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});

如下是能夠被用來處理可讀流和可寫流重要事件和函數列表:

image.png!thumbnail

事件和函數在某種程度上是相關的,由於它們一般被一塊兒使用。

可讀流中最重要的事件是:

  • data 事件:當流將一塊數據傳遞給消費者時,該事件被觸發
  • end 事件:當 streams 中沒有數據時,該事件被觸發
    可寫流中最重要的事件是:
  • drain 事件:可寫入流能夠接收更多數據的信號。
  • finish 事件:當全部數據寫入完畢時會觸發該事件。
    事件和函數能夠結合起來使用,以便定製和優化流。要使用可讀流,咱們可使用 pipe/unpipe ,或者 read/unshift/resume 方法。要使用可寫流,咱們能夠將其做爲 pipe/unpipe 的目標,或者用 write 方法寫入它,並在完成後調用 end 方法。

可讀流的暫停和流動模式

可讀流具備兩種主要模式會影響咱們消費數據的方式:

  • 可讀流可能處於暫停模式
  • 也可能處於流動模式
    這兩種模式也常被稱爲 pull 和 push 模式。

默認狀況下,全部可讀流可在暫停模式下啓動,在須要時也能夠輕鬆切換到流動狀態或者返回到暫停狀態。 有時這兩種模式會自動切換。

當可讀流處於暫停模式時,咱們可使用 read() 方法從流按需讀取,然而,對於流模式中的可讀流,數據持續流動,咱們必須監聽事件不斷消耗它。

在流動模式下,若是沒有消費者在處理它,數據實際上可能會丟失(譯者注:生產者一直向緩衝區導入數據,等到緩衝區滿了,卻一直沒有人來消費他,這塊空間可能會被釋放掉)。這就是爲何在流動模式下有可讀流時,須要一個 data 事件處理程序。事實上,只需添加一個 data 事件處理程序便可將暫停的流轉換爲流模式,而刪除 data 事件的處理程序將把流切換回暫停模式。這麼作的一部分緣由是爲了兼容舊的 Node Streams 接口。

要手動切換這兩種流模式,可使用 resume() 和 pause() 方法。

image.png!thumbnail

當使用管道方法消耗可讀流時,咱們沒必要擔憂管道自動管理這些模式。

實現 Streams

當咱們在 Node 中使用 streams 時,主要有兩種不一樣的任務:

  • 一個任務是實現 streams
  • 另外一個是消費 streams
    到目前爲止,咱們一直都在討論怎麼消耗流。接下來,讓咱們實現一些 streams!

stream 的實現者一般是須要依賴 stream 模塊。

實現一個可寫流

要實現可寫流,咱們須要使用 stream 模塊中的 Writable 構造函數。

const { Writable } = require('stream');

咱們能夠經過不少方式實現一個可寫流。 例如,咱們能夠繼承 Writable

class myWritableStream extends Writable {
}

然而,我喜歡更簡單的構造方法。咱們只是從 Writable 構造函數建立一個對象,並傳遞一些選項。 惟一必需的選項是 write 函數,它要處理寫入的數據塊。

const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

write 函數接受三個參數:

  • chunk 參數一般是 buffer ,固然也能夠經過配置修改參數的類型。
  • encoding 在某種狀況下是必須的,但一般咱們能夠忽略它。
  • callback 是處理完數據塊後咱們須要調用的函數。這是寫操做是否成功的信號。要發出失敗信號,只要把 error 對象傳入回調便可。

在 outStream 中,咱們只是將 chunk 做爲一個字符串進行 console.log,而後在沒有錯誤的狀況下調用回調來表示成功。這是一個很簡單的 echo 流,它會輸出任何收到的東西。

要消費這個流,咱們能夠直接使用 process.stdin,這是一個可讀流,因此咱們能夠將 process.stdin 經過 pipe 傳入 outstream。

當咱們運行上面的代碼時,咱們輸入到 process.stdin 的任何內容都將會使用 outStream 中的 console.log 來輸出。

這個例子其實沒啥用,實際上 node 已經內置了這個功能 —— process.stdout。咱們能夠將stdin 經過 pipe 輸入到 stdout 中,咱們能夠用一行代碼實現剛剛的 echo 功能:

process.stdin.pipe(process.stdout);

實現一個可讀流

爲了實現可讀流,咱們須要 Readable 接口並從中構造一個對象:

const { Readable } = require('stream');

const inStream = new Readable({});

有一種實現可讀流的簡單方法,咱們能夠直接 push 咱們想要消費者消費的數據。

const { Readable } = require('stream');

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);

當咱們向 inStream 中 push null 時,意味着沒有數據要繼續 push 了。

爲了消費這些數據,咱們用 pipe 把他們導入到了 process.stdout 中。

當咱們運行上面的代碼時,咱們將從 inStream 讀取全部數據,並將其輸出到標準輸出流中。 很簡單,但不高效。

咱們基本上將流中的全部數據推送到 process.stdout 。更好的方法是在消費者要求時按需推送數據。咱們能夠經過在可讀流配置中實現 read() 方法:

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

當讀取方法在可讀流上被調用時,該實現能夠將部分數據推送到隊列。例如,咱們能夠一次推一個字母,從字符代碼 65(表示A)開始,並在每次推送時遞增:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;

inStream.pipe(process.stdout);

當消費者正在消費可讀流時,read 方法將被持續觸發,每次觸發,又會有新的流進入緩衝區。 咱們要在某個地方中止這個循環,這就是爲何當 currentCharCode 大於90(表示Z)時,if 語句推送 null 。

此代碼的效果等同於咱們剛開始使用的那段代碼,不一樣之處在於,如今能夠按需推送數據。這麼作永遠不會錯~

實現雙工/轉換流

雙工流可讓一個對象既可讀又可寫。就好像這個對象繼承了兩個接口。

下面這個例子經過雙工流實現了上面那兩個可讀流和可寫流的功能:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);

經過組合這些方法,咱們可使用這個雙工流來讀取從 A 到 Z 的字母,咱們還能利用它可以輸出的特徵。咱們將可讀的 stdin 流經過 pipe 導入到這個雙工流中以使用 echo 功能,並將雙工流自己經過 pipe 導入到可寫入的 stdout 流中,以查看字母 A 到 Z 。

重要的是要了解雙工流的可讀和可寫是徹底獨立運行的。雙工流只是將兩個特徵組合到了一個對象上。

轉換流是更有趣的雙工流,由於它的輸出是從其輸入計算的。

對於一個轉換流,咱們不須要實現 read 或者 write 方法,只須要實現一個結合兩者的 transform 方法。咱們既能夠經過 write 方法寫入,也可使用它來 push 數據。

下面這個例子是一個簡單的轉換流,它將你的輸入轉換爲大寫格式後再輸出:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在這個轉換流中,咱們正在消費的數據與以前的雙工流示例的數據同樣,咱們只實現了一個transform() 方法。 在該方法中,咱們將 chunk 轉換爲大寫版本,而後將該版本推送爲可讀部分。

流對象模式

默認狀況下,流指望處理的值是 Buffer/String 類型。 有一個 objectMode 標誌,咱們能夠設置爲讓流接受任何 JavaScript 對象。

能夠用一個簡單的例子來演示一下。如下轉換流的組合使得將逗號分隔值的字符串映射爲 JavaScript 對象的功能。 因此 「a,b,c,d」 成爲 {a:b,c:d} 。

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,  
  
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,  
  
  transform(chunk, encoding, callback) {
    const obj = {};
    for (let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

咱們經過 commaSplitter 傳遞輸入字符串(例如,「a,b,c,d」),它將輸入轉換爲數組並將其做爲可讀數據([「a」,「b」,「c」,「d」])。readableObjectMode 這個屬性的配置必不可少,由於咱們是要把一個 JavaScript 對象 push 進來。

而後咱們把數組用 pipe 導入到 arrayToObject 數據流中。咱們須要配置一個 writableObjectMode 屬性來使該流接受一個對象。咱們還須要把最後的結果 push 到可讀流中,所以 readableObjectMode 這個屬性也是必要的。最後一個 objectToString 流接受一個對象可是導出的結果是一個字符串,因此咱們須要 writableObjectMode 屬性來處理輸入的對象,可是由於輸出到可讀流的是字符串,因此不須要配置 readableObjectMode 屬性。(譯者注:因此,若是是要讓可讀流支持 js 對象,要設置 readableObjectMode,要讓可寫流支持 js 對象,要設置 writeableObjectMode)

image.png!thumbnail

Node 內置 transform streams

Node 內置了一些很是有用的轉換流。即 zlib 和 crypto。

下面是一個使用 zlib.createGzip() 流結合 fs 可讀/可寫流建立文件壓縮腳本的示例:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

你能夠用這個腳本指定的文件進行 gzip 壓縮。咱們將該文件處理成一個可讀流並使用 pipe 輸入到 zlib 內置的轉換流中,而後將新的 gzip 壓縮文件轉換可寫流寫入文件。

使用 pipe 很酷的地方在於,只要咱們須要,就能夠把它們跟事件結合起來。例如,我但願用戶在腳本運行時可以看到一個進度條,而且在腳本工做完成時可以顯示 「Done」 消息。因爲 pipe 方法剛好可以返回目標流,咱們能夠鏈式調用事件處理程序的註冊程序:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Done'));

使用 pipe 方法,咱們能夠很輕鬆地消費流,咱們還可以使用須要的事件進一步定製與這些流的交互。

pipe 方法可以把咱們的程序解構成一步一步的方式,讓咱們的程序更具可讀性。例如,咱們能夠建立一個簡單的轉換流來報告進度,而不用像上面那樣監聽 data 事件,能夠直接拿 pipe() 替換 on() :

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Done'));

reportProgress 流是一個簡單的直通流,同時它也將進度標準化。注意我如何在回調函數中使用第二個參數來推送 transform() 方法中的數據。 這至關於 push 數據。

把各類流組合起來能夠完成不少事,例如,若是咱們須要在 gzip 以前或以後加密文件,須要作的就是按照需求的順序來管理另外一個轉換流。咱們可使用 Node 的 crypto 模塊:

const crypto = require('crypto');
// ...

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Done'));

上面的腳本壓縮而後加密傳遞的文件,只有知道祕鑰的人才可使用輸出的文件。咱們沒法使用正常的解壓工具來解壓這個文件,由於它已被加密。

爲了可以解壓以上腳本壓縮的文件,咱們須要以相反的順序使用 crypto 的解密功能和 zlib 的解壓縮功能:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

假設傳遞的文件是壓縮版本,上面的代碼先建立了一個可讀流,將該文件傳輸到 crypto.createDecipher() 流中(使用相同的祕鑰),將結果經過 pipe 輸入到 zlib.createGunzip() 流中, 而後將最終結果寫到文件中。

感謝閱讀~

相關文章
相關標籤/搜索