文章翻譯自:Node.js Streams: Everything you need to knownode
在開發者中廣泛認爲Node.js流不但難以應用,並且難以理解。如今有一個好消息,Node.js流將不在難以處理。過去幾年,爲了方便操做Node.js流,開發者開發了許多第三方Node.js包。可是在這篇文章中,我將集中在Node.js原生的流接口應用的介紹。linux
「Streams are Node’s best and most misunderstood idea.」數組
— Dominic Tarr緩存
流就是數據集合----諸如數組或是字符串。不一樣之處在於流沒必要一次所有使用,它們也沒必要適應內存。這兩個特色使流在處理大量數據或一次向外部返回一大塊數據時很是高效。bash
流代碼的組合性,爲流處理大量數據,提供了新的力量。就像把微小linux命令組合成功能豐富的組合命令同樣,Node.js流經過一樣的方式實現數據通道的功能。服務器
const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)
複製代碼
許多Node.js內置模塊都實現流接口:異步
上面展現的API中,一部分原生Node.js對象既是可讀又是可寫流,諸如TCP Sockets,Zlib和Crypto流。socket
值得注意的是,對象的部分行爲是密切相關的。例如:在客戶端HTTP對象是可讀流,在服務端HTTP對象是可寫流。這是由於在HTTP上,程序從一個對象上讀取數據(http.IncomingMessage),而後將讀取的數據寫到另一個對象上(http.ServerResponse)。ide
理論聽起來美妙,但並不能徹底傳遞流的精妙。讓咱們看一個例子,經過這個例子,能夠看出是否使用流對於內存佔用的不一樣影響。函數
讓咱們先建立一個大的文件:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
複製代碼
在上面的示例代碼中,fs模塊能夠經過流接口實現對文件的讀和寫。經過循環一百萬次可寫流,將數據寫入到big.file文件中。
執行相應的代碼,生成大約400兆的文件。
下面是一個專門用來操做這個大文件的Node服務器代碼:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
複製代碼
當服務接收請求,程序將會經過異步fs.readFile函數向請求者發送數據報文。表面上看這樣的代碼並不會阻塞程序的事件循環,真的是這樣嗎?
好,當咱們啓動這段服務,而後請求這個服務後,讓咱們看看內存佔用將會有怎樣的變化。
當啓動服務時,服務佔用的內存量是8.7兆。
而後請求這個服務,注意內存佔用的狀況:
哇 ---- 內存佔用忽然間跳到434.8兆。
本質上講,程序在將大數據文件寫入到http響應對象前,會將全部的數據寫入內存中。這種代碼的效率是很是低效的。
HTTP響應對象也是一個可寫流,若是咱們將表明big.file內容可讀流與HTTP相應對象的可寫流在管道中鏈接,程序就能夠經過兩個流管道,在不產生近400兆內存佔用的狀況下,達到相同的結果。
Node.js中的fs模塊經過createReadStream方法,生成讀取文件的可讀流。而後程序能夠經過管道將可讀流傳到http響應對象中:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
複製代碼
當再次請求服務時,一個神奇的事情發生了(注意內存佔用):
當客戶端請求大文件時,程序一次將一塊數據生成流文件,這就意味着咱們不須要將數據緩存到內存中。內存的佔用也僅僅上升了25兆。
咱們能夠將這個測試用例推到極限。從新生成五百萬行而不是一百萬行的big.file文件,從新生成的文件將會達到2GB,這將大於Node.js默認的緩存量。
使用fs.readFile實現大內存文件的讀取,最好不要修改程序默認的緩存空間。可是若是使用fs.createReadStream,即使請求2GB的數據流也不會有問題。使用第二種方式做爲服務程序的內存佔用幾乎不發生變化。
流在Node.js中有四種:Readable、Writable、Duplex和Transform。
全部流都是EventEmitter模塊的實例,觸發可讀和可寫數據的事件。可是,程序可使用pipe函數消費流數據。
下面是一段你值得記憶的魔法代碼:
readableSrc.pipe(writableDest)
在這一簡單的代碼中,將可讀流的輸出 (數據源) 做爲可寫流的輸入 (目標) 進行管道化。源數據必須是可讀流,目標必須是可寫流。它們也能夠同時是雙工流或者轉換流。事實上, 若是開發者將雙工流傳入管道中, 咱們就能夠像Linux那樣連接到管道調用:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWrtitableDest)
複製代碼
管道函數返回的是目標流,它能夠容許程序作上面的鏈式調用。下面的代碼: 流a是可讀流、流b與c是雙工流、流c是可寫流。
a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d
複製代碼
管道(pipe)方法是實現流消費的最簡單方式。一般建議使用管道函數(pipe)或者事件消費流,可是避免將它們混合使用。一般當你使用管道(pipe)函數時,你就不須要使用事件。可是若是程序須要定製流的消費,事件能夠是一個不錯的選擇。
除了讀取可讀流源,並把讀取的數據寫入到可寫的目的地上。管道(pipe)還能夠自動管理一些事情。例如:它能夠處理異常,當一個流比其它流更快或更慢時結束文件。
可是,流能夠經過事件被直接消費。下面是一段等效於管道(pipe)方法的程序,它經過簡化的、與事件等效的代碼實現數據的讀取或寫入。
# readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
複製代碼
這裏有一系列可用於可讀、可寫流的事件或函數。
這些事件或函數一般以某種方式相關聯,關於可讀流的事件有:
關於可寫流的重要事件有: -drain事件,可寫流接受數據時的信號 -finish事件,全部的數據已經刷新到系統底層時觸發
經過事件和函數能夠組合在一塊兒後自定義流或流的優化。爲了消費可讀流,程序可使用pipe/unpipe方法,或是read/unshift/resume方法。爲了消費可寫流,程序能夠將它做爲pipe/unpipe的目的地,或是經過write方法寫入數據,在寫入完成後調用end方法。
可讀流中存在兩種模式影響程序對可讀流的使用:
這些模式又是被認爲是拉和推模式。
全部的可讀流在默認狀況下都是從暫停模式開始,在程序須要時,轉換成流動模式或者暫停模式。有時這種轉換是自發的。
當可讀流在暫停(paused)模式時,咱們可使用read方法按需讀取流數據。可是,對於處在流動(flowing)模式下的可讀流,咱們必須經過監聽事件來消費數據。
在流動(flowing)模式下,若是數據沒有被消費,數據可能會丟失。這就是當程序中有流動的可讀流時,須要data事件處理數據的緣由。事實上,只須要添加data事件就能夠將流從暫停轉換爲流動模式和解除程序與事件監聽器的綁定、將流從流動模式轉換爲暫停模式。其中的一些是爲了向後兼容老版本Node流的接口。
開發者可使用resume方法和pause方法,手動實現兩種流模式的轉換。
當程序使用管道(pipe)方法消費可讀流時,開發這就沒必要關心流模式的轉換了,由於管道(pipe)會自動實現。
##實現流
當咱們Node.js中的流時,有兩種不一樣的任務:
到目前爲止,咱們僅僅討論着消費流。讓咱們實現一些例子吧!
實現流須要咱們在程序中引入流模塊
開發者可使用流模塊中的Writeable構造器,實現可寫流。
const { Writable } = require('stream');
開發者實現可寫流有不少種方式。例如:經過繼承writable構造器
class myWritableStream extends Writable {
}
複製代碼
可是,我更喜歡使用構造器的實現方式。僅僅經過writable接口建立對象並傳遞一些選項:一個必須函數選項是write函數,傳入要寫入的數據塊。
const { Writable } = require('stream');
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(outStream);
複製代碼
write函數有三個參數:
在outStream類中,程序僅僅將數據轉換爲字符串類型打印出來,並在沒有出現異常時調用回調函數,以此來標誌程序的成功執行。這是一個簡單但不是特別有效的回聲流,程序會輸出任何輸入的數據。
要使用這個流,咱們能夠將它與process.stdin一塊兒使用,這是一個可讀的流,將process.stdin傳輸到outStream。
當程序執行時,任何經過process.stdin輸入的數據都會被outStream中的console.log函數打印出來。
可是這個功能能夠經過Node.js內置模塊實現,所以這並非一個很是實用的流。它與process.stdout的功能很是相似,咱們使用下面的代碼能夠實現相同的功能:
process.stdin.pipe(process.stdout);
爲了實現一個可讀流,開發者須要引入Readable的接口後經過這個接口構建對象:
const { Readable } = require('stream');
const inStream = new Readable({});
複製代碼
這是實現可讀流的最簡單方式,開發者能夠直接推送數據以供消費使用。
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
複製代碼
當程序中推送一個空對象時,這就意味着再也不有數據供給可讀流。
開發者能夠將可讀流經過管道傳給process.stdout的方式,供其消費可讀流。
執行這段代碼,程序能夠讀取來自可讀流的數據,並將數據打印出來。很是簡單,可是並不高效。
上段代碼的本質是:把數據推送給流,而後將流經過管道傳給process.stdout消費。其實程序能夠在消費者請求流時,按需推送數據,這種方式比上一種更高效。經過實現readable流中的read函數實現:
const inStream = new Readable({
read(size) {
// there is a demand on the data... Someone wants to read it.
}
});
複製代碼
在readable流中調用read函數,程序能夠將部分數據傳輸到隊列上。例如:每次向隊列中推送一個字母,字母的的序號從65(表明A)開始,每次推送的字母序號都自增1:
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);
複製代碼
當消費者正在消費可讀流時,read函數就會被激活,程序就會推送更多的字母。經過向隊列些推送空對象,終止循環。如上代碼中,當字母的序號超過90時,終止循環。
這段代碼的功能與以前實現的代碼是等效的,可是當消費者要讀流時,程序能夠按需推送數據的效率更優。所以建議使用這種實現方式。
雙工流:在同一對象上分別實現可讀流和可寫流,就像對象繼承了兩個可讀流和可寫流接口。
下面是一個雙工流,它結合了上面已經實現的可讀流、可寫流的例子:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
複製代碼
經過實現雙工流的對象,程序能夠讀取A-Z的字母,而後按順序打印出來。開發者將stdin可讀流傳輸到雙工流,而後將雙工流傳輸到stdout可寫流中,打印出A-Z字母。
在雙工流中的可讀流與可寫流是徹底獨立的,雙工流僅僅是一個對象同時具備可讀流和可寫流的功能。理解這一點相當重要。
轉換流比雙工流更有趣,由於它的結果是根據輸入流計算出來的。
對於雙工流,並不須要實現read和write函數,開發者僅僅須要實現transform函數,由於transform函數已經實現了read函數和write函數。
下面是將輸入的字母轉換爲大寫格式後,而後把轉換後的數據傳給可寫流:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
複製代碼
在這個例子中,開發者僅僅經過transform函數,就實現了向上面雙工流的功能。在transform函數中,程序將數據轉換爲大寫後推送到可寫流中。
默認狀況下,流只接受Buffer和String的數據。可是開發者能夠經過設置objectMode標識的值,可使流接受任何Javascript數據。
下面的例子能夠證實這一點。經過一組流將以逗號分隔的字符串轉換爲Javscript對象,因而"a,b,c,d"轉換成{a: b, c : d}。
const { Transform } = require('stream');
const commaSplitter = new Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(chunk.toString().trim().split(','));
callback();
}
});
const arrayToObject = new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
const obj = {};
for(let i=0; i < chunk.length; i+=2) {
obj[chunk[i]] = chunk[i+1];
}
this.push(obj);
callback();
}
});
const objectToString = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.stringify(chunk) + '\n');
callback();
}
});
process.stdin
.pipe(commaSplitter)
.pipe(arrayToObject)
.pipe(objectToString)
.pipe(process.stdout)
複製代碼
commaSplitter轉換流將輸入的字符串(例如:「a,b,c,d」)轉換爲數組([「a」, 「b」, 「c」, 「d」])。設置writeObjectMode標識,由於在transform函數中推的數據是對象而不是字符串。
而後將commaSplitter輸出的可讀流傳輸到轉換流arrayToObject中。因爲接收的是對象,一樣須要在arrayToObject中須要設置writableObjectMode標識。因爲須要在程序中推送對象(將傳入的數組轉換爲對象),這也是程序中設置readableObjectMode標識的緣由。最後,轉換流objectToString接收對象,可是輸出字符串。這就是程序中只設置writableObjectModel標識的緣由。輸出的可讀流時正常的字符串(序列化後的數組)。
Node的內置轉換流
Node有許多內置轉換流,如:lib和crypto流。
下面的代碼是使用zlib.createGzip()流與fs模塊的可讀和可寫流相結合,實現壓縮文件的代碼:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
複製代碼
程序將讀取文件的可讀流傳輸進Node內置的轉換流zlib中,最後傳輸到建立壓縮文件的可寫流中。所以開發者只要將須要壓縮的文件路徑做爲參數傳進程序中,就能夠實現任何文件的壓縮。
開發者能夠將管道函數與事件結合使用,這是選擇管道函數另外一個緣由。例如:開發者讓程序經過打印出標記符顯示腳本正在執行,並在腳本執行完畢後打印出"Done"信息。pipe函數返回的是目標流,程序能夠在獲取目標流後註冊事件鏈:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
複製代碼
開發者經過pipe函數能夠很容易操做流,甚至在須要時,經過事件對通過pipe函數處理後的目標流作一些定製交互。
管道函數的強大之處在於,使用易理解的方式將多個管道函數聯合在一塊兒。例如:不一樣於上個示例,開發者能夠經過傳入一個轉換流,標識腳本正在執行。
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
const { Transform } = require('stream');
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write('.');
callback(null, chunk);
}
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
複製代碼
reportProgress只是一個轉換流,在這個流中標識腳本正在執行。值得注意的是,代碼中使用callback函數推送transform中的數據。這與先前示例中this.push()的功能是等效的。
組合流的應用場景還有不少。例如:開發者要先加密文件,而後壓縮文件或是先壓縮後加密。若是要完成這個功能,程序只要將文件按照順序傳入流中,使用crypto模塊實現:
const crypto = require('crypto');
// ...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
複製代碼
上面的代碼實現了壓縮、加密文件,只有知道密碼的用戶纔可使用加密後的文件。由於開發者不能按照普通解壓的工具對加密後壓縮文件,進行解壓。
對於任何經過上面代碼壓縮的文件,開發者只須要以相反的順序使用crypto和zlib流,代碼以下:
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'));
複製代碼
假設傳輸進去的文件是壓縮後的文件,上面的程序首先會生成可讀流,而後傳輸到crypto的createDecipher()流中,接着將輸出的流文件傳輸到zlib的createGunzip()流中,最後寫入到文件中。
上面就是我對這個主題的總結,感謝您的閱讀,期待下次與你相遇。