本系列文章爲《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版連接。javascript
歡迎關注個人專欄,以後的博文將在專欄同步:html
Streams
是Node.js
最重要的組件和模式之一。 社區中有一句格言「Stream all the things(Steam就是全部的)」,僅此一點就足以描述流在Node.js
中的地位。 Dominic Tarr
做爲Node.js
社區的最大貢獻者,它將流定義爲Node.js
最好,也是最難以理解的概念。前端
使Node.js
的Streams
如此吸引人還有其它緣由; 此外,Streams
不只與性能或效率等技術特性有關,更重要的是它們的優雅性以及它們與Node.js
的設計理念完美契合的方式。java
在本章中,將會學到如下內容:node
Streams
對於Node.js
的重要性。Streams
。Streams
做爲編程範式,不僅是對於I/O
而言,在多種應用場景下它的應用和強大的功能。Streams
。在基於事件的平臺(如Node.js
)中,處理I / O
的最有效的方法是實時處理,一旦有輸入的信息,立馬進行處理,一旦有須要輸出的結果,也立馬輸出反饋。git
在本節中,咱們將首先介紹Node.js
的Streams
和它的優勢。 請記住,這只是一個概述,由於本章後面將會詳細介紹如何使用和組合Streams
。github
咱們在本書中幾乎全部看到過的異步API都是使用的Buffer
模式。 對於輸入操做,Buffer
模式會未來自資源的全部數據收集到Buffer
區中; 一旦讀取完整個資源,就會把結果傳遞給回調函數。 下圖顯示了這個範例的一個真實的例子:算法
從上圖咱們能夠看到,在t1時刻,一些數據從資源接收並保存到緩衝區。 在t2時刻,最後一段數據被接收到另外一個數據塊,完成讀取操做,這時,把整個緩衝區的內容發送給消費者。npm
另外一方面,Streams
容許你在數據到達時當即處理數據。 以下圖所示:編程
這一張圖顯示了Streams
如何從資源接收每一個新的數據塊,並當即提供給消費者,消費者如今沒必要等待緩衝區中收集全部數據再處理每一個數據塊。
可是這兩種方法有什麼區別呢? 咱們能夠將它們歸納爲兩點:
此外,Node.js
的Streams
具備另外一個重要的優勢:可組合性(composability)。 如今讓咱們看看這些屬性對咱們設計和編寫應用程序的方式會產生什麼影響。
首先,Streams
容許咱們作一些看起來不可能的事情,經過緩衝數據並一次性處理。 例如,考慮一下咱們必須讀取一個很是大的文件,好比說數百MB
甚至千MB
。 顯然,等待徹底讀取文件時返回大Buffer
的API
不是一個好主意。 想象一下,若是併發讀取一些大文件, 咱們的應用程序很容易耗盡內存。 除此以外,V8
中的Buffer
不能大於0x3FFFFFFF
字節(小於1GB
)。 因此,在耗盡物理內存以前,咱們可能會碰壁。
舉一個具體的例子,讓咱們考慮一個簡單的命令行接口(CLI
)的應用程序,它使用Gzip
格式壓縮文件。 使用Buffered
的API
,這樣的應用程序在Node.js
中大概這麼編寫(爲簡潔起見,省略了異常處理):
const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; fs.readFile(file, (err, buffer) => { zlib.gzip(buffer, (err, buffer) => { fs.writeFile(file + '.gz', buffer, err => { console.log('File successfully compressed'); }); }); });
如今,咱們能夠嘗試將前面的代碼放在一個叫作gzip.js
的文件中,而後執行下面的命令:
node gzip <path to file>
若是咱們選擇一個足夠大的文件,好比說大於1GB
的文件,咱們會收到一個錯誤信息,說明咱們要讀取的文件大於最大容許的緩衝區大小,以下所示:
RangeError: File size is greater than possible Buffer:0x3FFFFFFF
上面的例子中,沒找到一個大文件,但確實對於大文件的讀取速率慢了許多。
正如咱們所預料到的那樣,使用Buffer
來進行大文件的讀取顯然是錯誤的。
咱們必須修復咱們的Gzip
應用程序,並使其處理大文件的最簡單方法是使用Streams
的API
。 讓咱們看看如何實現這一點。 讓咱們用下面的代碼替換剛建立的模塊的內容:
const fs = require('fs'); const zlib = require('zlib'); const file = process.argv[2]; fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + '.gz')) .on('finish', () => console.log('File successfully compressed'));
「是嗎?」你可能會問。是的;正如咱們所說的,因爲Streams
的接口和可組合性,所以咱們還能寫出這樣的更加簡潔,優雅和精煉的代碼。 咱們稍後會詳細地看到這一點,可是如今須要認識到的重要一點是,程序能夠順暢地運行在任何大小的文件上,理想狀況是內存利用率不變。 嘗試一下(但考慮壓縮一個大文件可能須要一段時間)。
如今讓咱們考慮一個壓縮文件並將其上傳到遠程HTTP
服務器的應用程序的例子,該遠程HTTP
服務器進而將其解壓縮並保存到文件系統中。若是咱們的客戶端是使用Buffered
的API
實現的,那麼只有當整個文件被讀取和壓縮時,上傳纔會開始。 另外一方面,只有在接收到全部數據的狀況下,解壓縮纔會在服務器上啓動。 實現相同結果的更好的解決方案涉及使用Streams
。 在客戶端機器上,Streams
只要從文件系統中讀取就能夠壓縮和發送數據塊,而在服務器上,只要從遠程對端接收到數據塊,就能夠解壓每一個數據塊。 咱們經過構建前面提到的應用程序來展現這一點,從服務器端開始。
咱們建立一個叫作gzipReceive.js
的模塊,代碼以下:
const http = require('http'); const fs = require('fs'); const zlib = require('zlib'); const server = http.createServer((req, res) => { const filename = req.headers.filename; console.log('File request received: ' + filename); req .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream(filename)) .on('finish', () => { res.writeHead(201, { 'Content-Type': 'text/plain' }); res.end('That\'s it\n'); console.log(`File saved: ${filename}`); }); }); server.listen(3000, () => console.log('Listening'));
服務器從網絡接收數據塊,將其解壓縮,並在接收到數據塊後當即保存,這要歸功於Node.js
的Streams
。
咱們的應用程序的客戶端將進入一個名爲gzipSend.js
的模塊,以下所示:
在前面的代碼中,咱們再次使用Streams
從文件中讀取數據,而後在從文件系統中讀取的同時壓縮併發送每一個數據塊。
如今,運行這個應用程序,咱們首先使用如下命令啓動服務器:
node gzipReceive
而後,咱們能夠經過指定要發送的文件和服務器的地址(例如localhost
)來啓動客戶端:
node gzipSend <path to file> localhost
若是咱們選擇一個足夠大的文件,咱們將更容易地看到數據如何從客戶端流向服務器,但爲何這種模式下,咱們使用Streams
,比使用Buffered
的API
更有效率? 下圖應該給咱們一個提示:
一個文件被處理的過程,它通過如下階段:
爲了完成處理,咱們必須按照流水線順序那樣通過每一個階段,直到最後。在上圖中,咱們能夠看到,使用Buffered
的API
,這個過程徹底是順序的。爲了壓縮數據,咱們首先必須等待整個文件被讀取完畢,而後,發送數據,咱們必須等待整個文件被讀取和壓縮,依此類推。當咱們使用Streams
時,只要咱們收到第一個數據塊,流水線就會被啓動,而不須要等待整個文件的讀取。但更使人驚訝的是,當下一塊數據可用時,不須要等待上一組任務完成;相反,另外一條裝配線是並行啓動的。由於咱們執行的每一個任務都是異步的,這樣顯得很完美,因此能夠經過Node.js
來並行執行Streams
的相關操做;惟一的限制就是每一個階段都必須保證數據塊的到達順序。
從前面的圖能夠看出,使用Streams
的結果是整個過程花費的時間更少,由於咱們不用等待全部數據被所有讀取完畢和處理。
到目前爲止,咱們已經看到的代碼已經告訴咱們如何使用pipe()
方法來組裝Streams
的數據塊,Streams
容許咱們鏈接不一樣的處理單元,每一個處理單元負責單一的職責(這是符合Node.js
風格的)。這是可能的,由於Streams
具備統一的接口,而且就API
而言,不一樣Streams
也能夠很好的進行交互。惟一的先決條件是管道的下一個Streams
必須支持上一個Streams
生成的數據類型,能夠是二進制,文本甚至是對象,咱們將在後面的章節中看到。
爲了證實Streams
組合性的優點,咱們能夠嘗試在咱們先前構建的gzipReceive / gzipSend
應用程序中添加加密功能。
爲此,咱們只須要經過向流水線添加另外一個Streams
來更新客戶端。 確切地說,由crypto.createChipher()
返回的流。 由此產生的代碼應以下所示:
const fs = require('fs'); const zlib = require('zlib'); const crypto = require('crypto'); const http = require('http'); const path = require('path'); const file = process.argv[2]; const server = process.argv[3]; const options = { hostname: server, port: 3000, path: '/', method: 'PUT', headers: { filename: path.basename(file), 'Content-Type': 'application/octet-stream', 'Content-Encoding': 'gzip' } }; const req = http.request(options, res => { console.log('Server response: ' + res.statusCode); }); fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(crypto.createCipher('aes192', 'a_shared_secret')) .pipe(req) .on('finish', () => { console.log('File successfully sent'); });
使用相同的方式,咱們更新服務端的代碼,使得它能夠在數據塊進行解壓以前先解密:
const http = require('http'); const fs = require('fs'); const zlib = require('zlib'); const crypto = require('crypto'); const server = http.createServer((req, res) => { const filename = req.headers.filename; console.log('File request received: ' + filename); req .pipe(crypto.createDecipher('aes192', 'a_shared_secret')) .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream(filename)) .on('finish', () => { res.writeHead(201, { 'Content-Type': 'text/plain' }); res.end('That\'s it\n'); console.log(`File saved: ${filename}`); }); }); server.listen(3000, () => console.log('Listening'));
crypto是Node.js的核心模塊之一,提供了一系列加密算法。
只需幾行代碼,咱們就在應用程序中添加了一個加密層。 咱們只須要簡單地經過把已經存在的Streams
模塊和加密層組合到一塊兒,就能夠。相似的,咱們能夠添加和合並其餘Streams
,如同在玩樂高積木同樣。
顯然,這種方法的主要優勢是可重用性,但正如咱們從目前爲止所介紹的代碼中能夠看到的那樣,Streams
也能夠實現更清晰,更模塊化,更加簡潔的代碼。 出於這些緣由,流一般不只僅用於處理純粹的I / O
,並且它仍是簡化和模塊化代碼的手段。
在前面的章節中,咱們瞭解了爲何Streams
如此強大,並且它在Node.js
中無處不在,甚至在Node.js
的核心模塊中也有其身影。 例如,咱們已經看到,fs
模塊具備用於從文件讀取的createReadStream()
和用於寫入文件的createWriteStream()
,HTTP
請求和響應對象本質上是Streams
,而且zlib
模塊容許咱們使用Streams
式API
壓縮和解壓縮數據塊。
如今咱們知道爲何Streams
是如此重要,讓咱們退後一步,開始更詳細地探索它。
Node.js
中的每一個Streams
都是Streams
核心模塊中可用的四個基本抽象類之一的實現:
stream.Readable
stream.Writable
stream.Duplex
stream.Transform
每一個stream
類也是EventEmitter
的一個實例。實際上,Streams
能夠產生幾種類型的事件,好比end
事件會在一個可讀的Streams
完成讀取,或者錯誤讀取,或其過程當中產生異常時觸發。
請注意,爲簡潔起見,在本章介紹的例子中,咱們常常會忽略適當的錯誤處理。可是,在生產環境下中,老是建議爲全部Stream註冊錯誤事件偵聽器。
Streams
之因此如此靈活的緣由之一是它不只可以處理二進制數據,並且幾乎能夠處理任何JavaScript
值。實際上,Streams
能夠支持兩種操做模式:
buffers
或strings
)流式傳輸數據JavaScript
值)這兩種操做模式使咱們不只可使用I / O
流,並且還能夠做爲一種工具,以函數式的風格優雅地組合處理單元,咱們將在本章後面看到。
在本章中,咱們將主要使用在Node.js 0.11中引入的Node.js流接口,也稱爲版本3。 有關與舊接口差別的更多詳細信息,請參閱StrongLoop在 https://strongloop.com/strong...。
一個可讀的Streams
表示一個數據源,在Node.js
中,它使用stream
模塊中的Readableabstract
類實現。
從可讀Streams
接收數據有兩種方式:non-flowing
模式和flowing
模式。 咱們來更詳細地分析這些模式。
從可讀的Streams
中讀取數據的默認模式是爲其附加一個可讀事件偵聽器,用於指示要讀取的新數據的可用性。而後,在一個循環中,咱們讀取全部的數據,直到內部buffer
被清空。這可使用read()
方法完成,該方法同步從內部緩衝區中讀取數據,並返回表示數據塊的Buffer
或String
對象。read()
方法以以下使用模式:
readable.read([size]);
使用這種方法,數據隨時能夠直接從Streams
中按需提取。
爲了說明這是如何工做的,咱們建立一個名爲readStdin.js
的新模塊,它實現了一個簡單的程序,它從標準輸入(一個可讀流)中讀取數據,並將全部數據回送到標準輸出:
process.stdin .on('readable', () => { let chunk; console.log('New data available'); while ((chunk = process.stdin.read()) !== null) { console.log( `Chunk read: (${chunk.length}) "${chunk.toString()}"` ); } }) .on('end', () => process.stdout.write('End of stream'));
read()
方法是一個同步操做,它從可讀Streams
的內部Buffers
區中提取數據塊。若是Streams
在二進制模式下工做,返回的數據塊默認爲一個Buffer
對象。
在以二進制模式工做的可讀的Stream中,咱們能夠經過在Stream上調用setEncoding(encoding)來讀取字符串而不是Buffer對象,並提供有效的編碼格式(例如utf8)。
數據是從可讀的偵聽器中讀取的,只要有新的數據,就會調用這個偵聽器。當內部緩衝區中沒有更多數據可用時,read()
方法返回null
;在這種狀況下,咱們不得不等待另外一個可讀的事件被觸發,告訴咱們能夠再次讀取或者等待表示Streams
讀取過程結束的end
事件觸發。當一個流以二進制模式工做時,咱們也能夠經過向read()
方法傳遞一個size
參數來指定咱們想要讀取的數據大小。這在實現網絡協議或解析特定數據格式時特別有用。
如今,咱們準備運行readStdin
模塊並進行實驗。讓咱們在控制檯中鍵入一些字符,而後按Enter
鍵查看回顯到標準輸出中的數據。要終止流並所以生成一個正常的結束事件,咱們須要插入一個EOF
(文件結束)字符(在Windows
上使用Ctrl + Z
或在Linux
上使用Ctrl + D
)。
咱們也能夠嘗試將咱們的程序與其餘程序鏈接起來;這可使用管道運算符(|
),它將程序的標準輸出重定向到另外一個程序的標準輸入。例如,咱們能夠運行以下命令:
cat <path to a file> | node readStdin
這是流式範例是一個通用接口的一個很好的例子,它使得咱們的程序可以進行通訊,而無論它們是用什麼語言寫的。
從Streams
中讀取的另外一種方法是將偵聽器附加到data
事件;這會將Streams
切換爲flowing
模式,其中數據不是使用read()
函數來提取的,而是一旦有數據到達data
監聽器就被推送到監聽器內。例如,咱們以前建立的readStdin
應用程序將使用流動模式:
process.stdin .on('data', chunk => { console.log('New data available'); console.log( `Chunk read: (${chunk.length}) "${chunk.toString()}"` ); }) .on('end', () => process.stdout.write('End of stream'));
flowing
模式是舊版Streams
接口(也稱爲Streams1
)的繼承,其靈活性較低,API
較少。隨着Streams2
接口的引入,flowing
模式不是默認的工做模式,要啓用它,須要將偵聽器附加到data
事件或顯式調用resume()
方法。 要暫時中斷Streams
觸發data
事件,咱們能夠調用pause()
方法,致使任何傳入數據緩存在內部buffer
中。
調用pause()不會致使Streams切換回non-flowing模式。
如今咱們知道如何從Streams
中讀取數據,下一步是學習如何實現一個新的Readable
數據流。爲此,有必要經過繼承stream.Readable
的原型來建立一個新的類。 具體流必須提供_read()
方法的實現:
readable._read(size)
Readable
類的內部將調用_read()
方法,而該方法又將啓動
使用push()
填充內部緩衝區:
請注意,read()是Stream消費者調用的方法,而_read()是一個由Stream子類實現的方法,不能直接調用。下劃線一般表示該方法爲私有方法,不該該直接調用。
爲了演示如何實現新的可讀Streams
,咱們能夠嘗試實現一個生成隨機字符串的Streams
。 咱們來建立一個名爲randomStream.js
的新模塊,它將包含咱們的字符串的generator
的代碼:
const stream = require('stream'); const Chance = require('chance'); const chance = new Chance(); class RandomStream extends stream.Readable { constructor(options) { super(options); } _read(size) { const chunk = chance.string(); //[1] console.log(`Pushing chunk of size: ${chunk.length}`); this.push(chunk, 'utf8'); //[2] if (chance.bool({ likelihood: 5 })) { //[3] this.push(null); } } } module.exports = RandomStream;
在文件頂部,咱們將加載咱們的依賴關係。除了咱們正在加載一個chance的npm模塊以外,沒有什麼特別之處,它是一個用於生成各類隨機值的庫,從數字到字符串到整個句子都能生成隨機值。
下一步是建立一個名爲RandomStream
的新類,並指定stream.Readable
做爲其父類。 在前面的代碼中,咱們調用父類的構造函數來初始化其內部狀態,並將收到的options
參數做爲輸入。經過options
對象傳遞的可能參數包括如下內容:
Buffers
轉換爲Strings
的encoding
參數(默認值爲null
)objectMode
默認爲false
)buffer
區中的數據的上限,一旦超過這個上限,則暫停從data source
讀取(highWaterMark
默認爲16KB
)好的,如今讓咱們來解釋一下咱們重寫的stream.Readable
類的_read()
方法:
chance
生成隨機字符串。push
內部buffer
。 請注意,因爲咱們push
的是String
,此外咱們還指定了編碼爲utf8
(若是數據塊只是一個二進制Buffer
,則不須要)。5%
的機率隨機中斷stream
的隨機字符串產生,經過push
null
到內部Buffer
來表示EOF
,即stream
的結束。咱們還能夠看到在_read()
函數的輸入中給出的size
參數被忽略了,由於它是一個建議的參數。 咱們能夠簡單地把全部可用的數據都push
到內部的buffer
中,可是若是在同一個調用中有多個推送,那麼咱們應該檢查push()
是否返回false
,由於這意味着內部buffer
已經達到了highWaterMark
限制,咱們應該中止添加更多的數據。
以上就是RandomStream
模塊,咱們如今準備好使用它。咱們來建立一個名爲generateRandom.js
的新模塊,在這個模塊中咱們實例化一個新的RandomStream
對象並從中提取一些數據:
const RandomStream = require('./randomStream'); const randomStream = new RandomStream(); randomStream.on('readable', () => { let chunk; while ((chunk = randomStream.read()) !== null) { console.log(`Chunk received: ${chunk.toString()}`); } });
如今,一切都準備好了,咱們嘗試新的自定義的stream
。 像往常同樣簡單地執行generateRandom
模塊,觀察隨機的字符串在屏幕上流動。
一個可寫的stream
表示一個數據終點,在Node.js
中,它使用stream
模塊中的Writable
抽象類來實現。
把一些數據放在可寫入的stream
中是一件簡單的事情, 咱們所要作的就是使用write()
方法,它具備如下格式:
writable.write(chunk, [encoding], [callback])
encoding
參數是可選的,其在chunk
是String
類型時指定(默認爲utf8
,若是chunk
是Buffer
,則忽略);當數據塊被刷新到底層資源中時,callback
就會被調用,callback
參數也是可選的。
爲了表示沒有更多的數據將被寫入stream
中,咱們必須使用end()
方法:
writable.end([chunk], [encoding], [callback])
咱們能夠經過end()
方法提供最後一塊數據。在這種狀況下,callbak
函數至關於爲finish
事件註冊一個監聽器,當數據塊所有被寫入stream
中時,會觸發該事件。
如今,讓咱們經過建立一個輸出隨機字符串序列的小型HTTP
服務器來演示這是如何工做的:
const Chance = require('chance'); const chance = new Chance(); require('http').createServer((req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); //[1] while (chance.bool({ likelihood: 95 })) { //[2] res.write(chance.string() + '\n'); //[3] } res.end('\nThe end...\n'); //[4] res.on('finish', () => console.log('All data was sent')); //[5] }).listen(8080, () => console.log('Listening on http://localhost:8080'));
咱們建立了一個HTTP服務器
,並把數據寫入res
對象,res
對象是http.ServerResponse
的一個實例,也是一個可寫入的stream
。下面來解釋上述代碼發生了什麼:
HTTP response
的頭部。請注意,writeHead()
不是Writable
接口的一部分,實際上,這個方法是http.ServerResponse
類公開的輔助方法。5%
的機率終止的循環(進入循環體的機率爲chance.bool()
產生,其爲95%
)。stream
。stream
的end()
,表示沒有更多數據塊將被寫入。另外,咱們在結束以前提供一個最終的字符串寫入流中。
finish
事件的監聽器,當全部的數據塊都被刷新到底層socket
中時,這個事件將被觸發。咱們能夠調用這個小模塊稱爲entropyServer.js
,而後執行它。要測試這個服務器,咱們能夠在地址http:// localhost:8080
打開一個瀏覽器,或者從終端使用curl
命令,以下所示:
curl localhost:8080
此時,服務器應該開始向您選擇的HTTP客戶端
發送隨機字符串(請注意,某些瀏覽器可能會緩衝數據,而且流式傳輸行爲可能不明顯)。
相似於在真實管道系統中流動的液體,Node.js
的stream
也可能遭受瓶頸,數據寫入速度可能快於stream
的消耗。 解決這個問題的機制包括緩衝輸入數據;然而,若是數據stream
沒有給生產者任何反饋,咱們可能會產生愈來愈多的數據被累積到內部緩衝區的狀況,致使內存泄露的發生。
爲了防止這種狀況的發生,當內部buffer
超過highWaterMark
限制時,writable.write()
將返回false
。 可寫入的stream
具備highWaterMark
屬性,這是write()
方法開始返回false
的內部Buffer
區大小的限制,一旦Buffer
區的大小超過這個限制,表示應用程序應該中止寫入。 當緩衝器被清空時,會觸發一個叫作drain
的事件,通知再次開始寫入是安全的。 這種機制被稱爲back-pressure
。
本節介紹的機制一樣適用於可讀的stream。事實上,在可讀stream中也存在back-pressure,而且在_read()內調用的push()方法返回false時觸發。 可是,這對於stream實現者來講是一個特定的問題,因此咱們將不常常處理它。
咱們能夠經過修改以前建立的entropyServer
模塊來演示可寫入的stream
的back-pressure
:
const Chance = require('chance'); const chance = new Chance(); require('http').createServer((req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); function generateMore() { //[1] while (chance.bool({ likelihood: 95 })) { const shouldContinue = res.write( chance.string({ length: (16 * 1024) - 1 }) //[2] ); if (!shouldContinue) { //[3] console.log('Backpressure'); return res.once('drain', generateMore); } } res.end('\nThe end...\n', () => console.log('All data was sent')); } generateMore(); }).listen(8080, () => console.log('Listening on http://localhost:8080'));
前面代碼中最重要的步驟能夠歸納以下:
generateMore()
的函數中。back-pressure
的機會,咱們將數據塊的大小增長到16KB-1Byte
,這很是接近默認的highWaterMark
限制。res.write()
的返回值。 若是它返回false
,這意味着內部buffer
已滿,咱們應該中止發送更多的數據。在這種狀況下,咱們從函數中退出,而後新註冊一個寫入事件的發佈者,當drain
事件觸發時調用generateMore
。若是咱們如今嘗試再次運行服務器,而後使用curl
生成客戶端請求,則極可能會有一些back-pressure
,由於服務器以很是高的速度生成數據,速度甚至會比底層socket
更快。
咱們能夠經過繼承stream.Writable
類來實現一個新的可寫入的流,併爲_write()
方法提供一個實現。實現一個咱們自定義的可寫入的Streams
類。
讓咱們構建一個可寫入的stream
,它接收對象的格式以下:
{ path: <path to a file> content: <string or buffer> }
這個類的做用是這樣的:對於每個對象,咱們的stream
必須將content
部分保存到在給定路徑中建立的文件中。 咱們能夠當即看到,咱們stream
的輸入是對象,而不是Strings
或Buffers
,這意味着咱們的stream
必須以對象模式工做。
調用模塊toFileStream.js
:
const stream = require('stream'); const fs = require('fs'); const path = require('path'); const mkdirp = require('mkdirp'); class ToFileStream extends stream.Writable { constructor() { super({ objectMode: true }); } _write(chunk, encoding, callback) { mkdirp(path.dirname(chunk.path), err => { if (err) { return callback(err); } fs.writeFile(chunk.path, chunk.content, callback); }); } } module.exports = ToFileStream;
做爲第一步,咱們加載全部咱們所須要的依賴包。注意,咱們須要模塊mkdirp
,正如你應該從前幾章中所知道的,它應該使用npm
安裝。
咱們建立了一個新類,它從stream.Writable
擴展而來。
咱們不得不調用父構造函數來初始化其內部狀態;咱們還提供了一個option
對象做爲參數,用於指定流在對象模式下工做(objectMode:true
)。stream.Writable
接受的其餘選項以下:
highWaterMark
(默認值是16KB
):控制back-pressure
的上限。decodeStrings
(默認爲true
):在字符串傳遞給_write()
方法以前,將字符串自動解碼爲二進制buffer
區。 在對象模式下這個參數被忽略。最後,咱們爲_write()
方法提供了一個實現。正如你所看到的,這個方法接受一個數據塊,一個編碼方式(只有在二進制模式下,stream
選項decodeStrings
設置爲false
時纔有意義)。
另外,該方法接受一個回調函數,該函數在操做完成時須要調用;而沒必要要傳遞操做的結果,可是若是須要的話,咱們仍然能夠傳遞一個error
對象,這將致使stream
觸發error
事件。
如今,爲了嘗試咱們剛剛構建的stream
,咱們能夠建立一個名爲writeToFile.js
的新模塊,並對該流執行一些寫操做:
const ToFileStream = require('./toFileStream.js'); const tfs = new ToFileStream(); tfs.write({path: "file1.txt", content: "Hello"}); tfs.write({path: "file2.txt", content: "Node.js"}); tfs.write({path: "file3.txt", content: "Streams"}); tfs.end(() => console.log("All files created"));
有了這個,咱們建立並使用了咱們的第一個自定義的可寫入流。 像往常同樣運行新模塊來檢查其輸出;你會看到執行後會建立三個新文件。
雙重的stream
既是可讀的,也可寫的。 當咱們想描述一個既是數據源又是數據終點的實體時(例如socket
),這就顯得十分有用了。 雙工流繼承stream.Readable
和stream.Writable
的方法,因此它對咱們來講並不新鮮。這意味着咱們能夠read()
或write()
數據,或者能夠監聽readable
和drain
事件。
要建立一個自定義的雙重stream
,咱們必須爲_read()
和_write()
提供一個實現。傳遞給Duplex()
構造函數的options
對象在內部被轉發給Readable
和Writable
的構造函數。options
參數的內容與前面討論的相同,options
增長了一個名爲allowHalfOpen
值(默認爲true
),若是設置爲false
,則會致使只要stream
的一方(Readable
和Writable
)結束,stream
就結束了。
爲了使雙重的stream在一方以對象模式工做,而在另外一方以二進制模式工做,咱們須要在流構造器中手動設置如下屬性:
this._writableState.objectMode this._readableState.objectMode
轉換的Streams
是專門設計用於處理數據轉換的一種特殊類型的雙重Streams
。
在一個簡單的雙重Streams
中,從stream
中讀取的數據和寫入到其中的數據之間沒有直接的關係(至少stream
是不可知的)。 想一想一個TCP socket
,它只是向遠程節點發送數據和從遠程節點接收數據。TCP socket
自身沒有意識到輸入和輸出之間有任何關係。
下圖說明了雙重Streams
中的數據流:
另外一方面,轉換的Streams
對從可寫入端接收到的每一個數據塊應用某種轉換,而後在其可讀端使轉換的數據可用。
下圖顯示了數據如何在轉換的Streams
中流動:
從外面看,轉換的Streams
的接口與雙重Streams
的接口徹底相同。可是,當咱們想要構建一個新的雙重Streams
時,咱們必須提供_read()
和_write()
方法,而爲了實現一個新的變換流,咱們必須填寫另外一對方法:_transform()
和_flush()
)。
咱們來演示如何用一個例子來建立一個新的轉換的Streams
。
咱們來實現一個轉換的Streams
,它將替換給定全部出現的字符串。 要作到這一點,咱們必須建立一個名爲replaceStream.js
的新模塊。 讓咱們直接看怎麼實現它:
const stream = require('stream'); const util = require('util'); class ReplaceStream extends stream.Transform { constructor(searchString, replaceString) { super(); this.searchString = searchString; this.replaceString = replaceString; this.tailPiece = ''; } _transform(chunk, encoding, callback) { const pieces = (this.tailPiece + chunk) //[1] .split(this.searchString); const lastPiece = pieces[pieces.length - 1]; const tailPieceLen = this.searchString.length - 1; this.tailPiece = lastPiece.slice(-tailPieceLen); //[2] pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen); this.push(pieces.join(this.replaceString)); //[3] callback(); } _flush(callback) { this.push(this.tailPiece); callback(); } } module.exports = ReplaceStream;
與往常同樣,咱們將從其依賴項開始構建模塊。此次咱們沒有使用第三方模塊。
而後咱們建立了一個從stream.Transform
基類繼承的新類。該類的構造函數接受兩個參數:searchString
和replaceString
。 正如你所想象的那樣,它們容許咱們定義要匹配的文本以及用做替換的字符串。 咱們還初始化一個將由_transform()
方法使用的tailPiece
內部變量。
如今,咱們來分析一下_transform()
方法,它是咱們新類的核心。_transform()
方法與可寫入的stream
的_write()
方法具備幾乎相同的格式,但不是將數據寫入底層資源,而是使用this.push()
將其推入內部buffer
,這與咱們會在可讀流的_read()
方法中執行。這顯示了轉換的Streams
的雙方如何實際鏈接。
ReplaceStream
的_transform()
方法實現了咱們這個新類的核心。正常狀況下,搜索和替換buffer
區中的字符串是一件容易的事情;可是,當數據流式傳輸時,狀況則徹底不一樣,可能的匹配可能分佈在多個數據塊中。代碼後面的程序能夠解釋以下:
searchString
函數做爲分隔符來分割塊。lastPiece
,並提取其最後一個字符searchString.length - 1
。結果被保存到tailPiece
變量中,它將會被做爲下一個數據塊的前綴。split()
獲得的片斷用replaceString
做爲分隔符鏈接在一塊兒,並推入內部buffer
區。當stream
結束時,咱們可能仍然有最後一個tailPiece
變量沒有被壓入內部緩衝區。這正是_flush()
方法的用途;它在stream
結束以前被調用,而且這是咱們最終有機會完成流或者在徹底結束流以前推送任何剩餘數據的地方。
_flush()
方法只須要一個回調函數做爲參數,當全部的操做完成後,咱們必須確保調用這個回調函數。完成了這個,咱們已經完成了咱們的ReplaceStream
類。
如今,是時候嘗試新的stream
。咱們能夠建立另外一個名爲replaceStreamTest.js
的模塊來寫入一些數據,而後讀取轉換的結果:
const ReplaceStream = require('./replaceStream'); const rs = new ReplaceStream('World', 'Node.js'); rs.on('data', chunk => console.log(chunk.toString())); rs.write('Hello W'); rs.write('orld!'); rs.end();
爲了使得這個例子更復雜一些,咱們把搜索詞分佈在兩個不一樣的數據塊上;而後,使用flowing
模式,咱們從同一個stream
中讀取數據,記錄每一個已轉換的塊。運行前面的程序應該產生如下輸出:
Hel lo Node.js !
有一個值得說起是,第五種類型的stream:stream.PassThrough。 與咱們介紹的其餘流類不一樣,PassThrough不是抽象的,能夠直接實例化,而不須要實現任何方法。實際上,這是一個可轉換的stream,它能夠輸出每一個數據塊,而不須要進行任何轉換。
Unix
管道的概念是由Douglas Mcllroy
發明的;這使程序的輸出可以鏈接到下一個的輸入。看看下面的命令:
echo Hello World! | sed s/World/Node.js/g
在前面的命令中,echo
會將Hello World!
寫入標準輸出,而後被重定向到sed
命令的標準輸入(由於有管道操做符 |
)。 而後sed
用Node.js
替換任何World
,並將結果打印到它的標準輸出(此次是控制檯)。
以相似的方式,可使用可讀的Streams
的pipe()
方法將Node.js
的Streams
鏈接在一塊兒,它具備如下接口:
readable.pipe(writable, [options])
很是直觀地,pipe()
方法將從可讀的Streams
中發出的數據抽取到所提供的可寫入的Streams
中。 另外,當可讀的Streams
發出end
事件(除非咱們指定{end:false}
做爲options
)時,可寫入的Streams
將自動結束。 pipe()
方法返回做爲參數傳遞的可寫入的Streams
,若是這樣的stream
也是可讀的(例如雙重或可轉換的Streams
),則容許咱們建立鏈式調用。
將兩個Streams
鏈接到一塊兒時,則容許數據自動流向可寫入的Streams
,因此不須要調用read()
或write()
方法;但最重要的是不須要控制back-pressure
,由於它會自動處理。
舉個簡單的例子(將會有大量的例子),咱們能夠建立一個名爲replace.js
的新模塊,它接受來自標準輸入的文本流,應用替換轉換,而後將數據返回到標準輸出:
const ReplaceStream = require('./replaceStream'); process.stdin .pipe(new ReplaceStream(process.argv[2], process.argv[3])) .pipe(process.stdout);
上述程序未來自標準輸入的數據傳送到ReplaceStream
,而後返回到標準輸出。 如今,爲了實踐這個小應用程序,咱們能夠利用Unix
管道將一些數據重定向到它的標準輸入,以下所示:
echo Hello World! | node replace World Node.js
運行上述程序,會輸出以下結果:
Hello Node.js
這個簡單的例子演示了Streams
(特別是文本Streams
)是一個通用接口,管道幾乎是構成和鏈接全部這些接口的通用方式。
error
事件不會經過管道自動傳播。舉個例子,看以下代碼片斷:
stream1 .pipe(stream2) .on('error', function() {});
在前面的鏈式調用中,咱們將只捕獲來自stream2
的錯誤,這是因爲咱們給其添加了erorr
事件偵聽器。這意味着,若是咱們想捕獲從stream1
生成的任何錯誤,咱們必須直接附加另外一個錯誤偵聽器。 稍後咱們將看到一種能夠實現共同錯誤捕獲的另外一種模式(合併Streams
)。 此外,咱們應該注意到,若是目標Streams
(讀取的Streams
)發出錯誤,它將會對源Streams
通知一個error
,以後致使管道的中斷。
到目前爲止,咱們建立自定義Streams
的方式並不徹底遵循Node
定義的模式;實際上,從stream
基類繼承是違反small surface area
的,並須要一些示例代碼。 這並不意味着Streams
設計得很差,實際上,咱們不該該忘記,由於Streams
是Node.js
核心的一部分,因此它們必須儘量地靈活,普遍拓展Streams
以至於用戶級模塊可以將它們充分運用。
然而,大多數狀況下,咱們並不須要原型繼承能夠給予的全部權力和可擴展性,但一般咱們想要的僅僅是定義新Streams
的一種快速開發的模式。Node.js
社區固然也爲此建立了一個解決方案。 一個完美的例子是through2,一個使得咱們能夠簡單地建立轉換的Streams
的小型庫。 經過through2
,咱們能夠經過調用一個簡單的函數來建立一個新的可轉換的Streams
:
const transform = through2([options], [_transform], [_flush]);
相似的,from2也容許咱們像下面這樣建立一個可讀的Streams
:
const readable = from2([options], _read);
接下來,咱們將在本章其他部分展現它們的用法,那時,咱們會清楚使用這些小型庫的好處。
through和
from是基於
Stream1
規範的頂層庫。
經過咱們已經介紹的例子,應該清楚的是,Streams
不只能夠用來處理I / O
,並且能夠用做處理任何類型數據的優雅編程模式。 但優勢並不止這些;還能夠利用Streams
來實現異步控制流,在本節將會看到。
默認狀況下,Streams
將按順序處理數據;例如,轉換的Streams
的_transform()
函數在前一個數據塊執行callback()
以後纔會進行下一塊數據塊的調用。這是Streams
的一個重要屬性,按正確順序處理每一個數據塊相當重要,可是也能夠利用這一屬性將Streams
實現優雅的傳統控制流模式。
代碼老是比太多的解釋要好得多,因此讓咱們來演示一下如何使用流來按順序執行異步任務的例子。讓咱們建立一個函數來鏈接一組接收到的文件做爲輸入,確保遵照提供的順序。咱們建立一個名爲concatFiles.js
的新模塊,並從其依賴開始:
const fromArray = require('from2-array'); const through = require('through2'); const fs = require('fs');
咱們將使用through2
來簡化轉換的Streams
的建立,並使用from2-array
從一個對象數組中建立可讀的Streams
。
接下來,咱們能夠定義concatFiles()
函數:
function concatFiles(destination, files, callback) { const destStream = fs.createWriteStream(destination); fromArray.obj(files) //[1] .pipe(through.obj((file, enc, done) => { //[2] const src = fs.createReadStream(file); src.pipe(destStream, {end: false}); src.on('end', done); //[3] })) .on('finish', () => { //[4] destStream.end(); callback(); }); } module.exports = concatFiles;
前面的函數經過將files
數組轉換爲Streams
來實現對files
數組的順序迭代。 該函數所遵循的程序解釋以下:
from2-array
從files
數組建立一個可讀的Streams
。through
來建立一個轉換的Streams
來處理序列中的每一個文件。對於每一個文件,咱們建立一個可讀的Streams
,並經過管道將其輸入到表示輸出文件的destStream
中。 在源文件完成讀取後,經過在pipe()
方法的第二個參數中指定{end:false}
,咱們確保不關閉destStream
。destStream
時,咱們調用through.obj
公開的done
函數來傳遞當前處理已經完成,在咱們的狀況下這是須要觸發處理下一個文件。finish
事件被觸發。咱們最後能夠結束destStream
並調用concatFiles()
的callback()
函數,這個函數表示整個操做的完成。咱們如今能夠嘗試使用咱們剛剛建立的小模塊。讓咱們建立一個名爲concat.js
的新文件來完成一個示例:
const concatFiles = require('./concatFiles'); concatFiles(process.argv[2], process.argv.slice(3), () => { console.log('Files concatenated successfully'); });
咱們如今能夠運行上述程序,將目標文件做爲第一個命令行參數,接着是要鏈接的文件列表,例如:
node concat allTogether.txt file1.txt file2.txt
執行這一條命令,會建立一個名爲allTogether.txt
的新文件,其中按順序保存file1.txt
和file2.txt
的內容。
使用concatFiles()
函數,咱們可以僅使用Streams
實現異步操做的順序執行。正如咱們在Chapter3 Asynchronous Control Flow Patters with Callbacks
中看到的那樣,若是使用純JavaScript
實現,或者使用async
等外部庫,則須要使用或實現迭代器。咱們如今提供了另一個能夠達到一樣效果的方法,正如咱們所看到的,它的實現方式很是優雅且可讀性高。
模式:使用Streams或Streams的組合,能夠輕鬆地按順序遍歷一組異步任務。
咱們剛剛看到Streams
按順序處理每一個數據塊,但有時這可能並不能這麼作,由於這樣並無充分利用Node.js
的併發性。若是咱們必須對每一個數據塊執行一個緩慢的異步操做,那麼並行化執行這一組異步任務徹底是有必要的。固然,只有在每一個數據塊之間沒有關係的狀況下才能應用這種模式,這些數據塊可能常常發生在對象模式的Streams
中,可是對於二進制模式的Streams
不多使用無序的並行執行。
注意:當處理數據的順序很重要時,不能使用無序並行執行的Streams。
爲了並行化一個可轉換的Streams
的執行,咱們能夠運用Chapter3 Asynchronous Control Flow Patters with Callbacks
所講到的無序並行執行的相同模式,而後作出一些改變使它們適用於Streams
。讓咱們看看這是如何更改的。
讓咱們用一個例子直接說明:咱們建立一個叫作parallelStream.js
的模塊,而後自定義一個普通的可轉換的Streams
,而後給出一系列可轉換流的方法:
const stream = require('stream'); class ParallelStream extends stream.Transform { constructor(userTransform) { super({objectMode: true}); this.userTransform = userTransform; this.running = 0; this.terminateCallback = null; } _transform(chunk, enc, done) { this.running++; this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this)); done(); } _flush(done) { if(this.running > 0) { this.terminateCallback = done; } else { done(); } } _onComplete(err) { this.running--; if(err) { return this.emit('error', err); } if(this.running === 0) { this.terminateCallback && this.terminateCallback(); } } } module.exports = ParallelStream;
咱們來分析一下這個新的自定義的類。正如你所看到的同樣,構造函數接受一個userTransform()
函數做爲參數,而後將其另存爲一個實例變量;咱們也調用父構造函數,而且咱們默認啓用對象模式。
接下來,來看_transform()
方法,在這個方法中,咱們執行userTransform()
函數,而後增長當前正在運行的任務個數; 最後,咱們經過調用done()
來通知當前轉換步驟已經完成。_transform()
方法展現瞭如何並行處理另外一項任務。咱們不用等待userTransform()
方法執行完畢再調用done()
。 相反,咱們當即執行done()
方法。另外一方面,咱們提供了一個特殊的回調函數給userTransform()
方法,這就是this._onComplete()
方法;以便咱們在userTransform()
完成的時候收到通知。
在Streams
終止以前,會調用_flush()
方法,因此若是仍有任務正在運行,咱們能夠經過不當即調用done()
回調函數來延遲finish
事件的觸發。相反,咱們將其分配給this.terminateCallback
變量。爲了理解Streams
如何正確終止,來看_onComplete()
方法。
在每組異步任務最終完成時,_onComplete()
方法會被調用。首先,它會檢查是否有任務正在運行,若是沒有,則調用this.terminateCallback()
函數,這將致使Streams
結束,觸發_flush()
方法的finish
事件。
利用剛剛構建的ParallelStream
類能夠輕鬆地建立一個無序並行執行的可轉換的Streams
實例,可是有個注意:它不會保留項目接收的順序。實際上,異步操做能夠在任什麼時候候都有可能完成並推送數據,而跟它們開始的時刻並無必然的聯繫。所以咱們知道,對於二進制模式的Streams
並不適用,由於二進制的Streams
對順序要求較高。
如今,讓咱們使用ParallelStream
模塊實現一個具體的例子。讓咱們想象如下咱們想要構建一個簡單的服務來監控一個大URL
列表的狀態,讓咱們想象如下,全部的這些URL
包含在一個單獨的文件中,而且每個URL
佔據一個空行。
Streams
可以爲這個場景提供一個高效且優雅的解決方案。特別是當咱們使用咱們剛剛寫的ParallelStream
類來無序地審覈這些URL
。
接下來,讓咱們建立一個簡單的放在checkUrls.js
模塊的應用程序。
const fs = require('fs'); const split = require('split'); const request = require('request'); const ParallelStream = require('./parallelStream'); fs.createReadStream(process.argv[2]) //[1] .pipe(split()) //[2] .pipe(new ParallelStream((url, enc, done, push) => { //[3] if(!url) return done(); request.head(url, (err, response) => { push(url + ' is ' + (err ? 'down' : 'up') + '\n'); done(); }); })) .pipe(fs.createWriteStream('results.txt')) //[4] .on('finish', () => console.log('All urls were checked')) ;
正如咱們所看到的,經過流,咱們的代碼看起來很是優雅,直觀。 讓咱們看看它是如何工做的:
Streams
,便於接下來讀取文件。Streams
的內容輸出一個可轉換的Streams
到管道中,而且分隔了數據塊的每一行。ParallelStream
來檢查URL
了,咱們發送一個HEAD
請求而後等待請求的response
。當請求返回時,咱們把請求的結果push
到stream
中。results.txt
文件中。node checkUrls urlList.txt
這裏的文件urlList.txt
包含一組URL
,例如:
http://www.mariocasciaro.me/
http://loige.co/
http://thiswillbedownforsure.com/
當應用執行完成後,咱們能夠看到一個文件results.txt
被建立,裏面包含有操做的結果,例如:
http://thiswillbedownforsure.com is down
http://loige.co is up
http://www.mariocasciaro.me is up
輸出的結果的順序頗有可能與輸入文件中指定URL
的順序不一樣。這是Streams
無序並行執行任務的明顯特徵。
出於好奇,咱們可能想嘗試用一個正常的through2流替換ParallelStream,並比較二者的行爲和性能(你可能想這樣作的一個練習)。咱們將會看到,使用through2的方式會比較慢,由於每一個URL都將按順序進行檢查,並且文件results.txt中結果的順序也會被保留。
若是運行包含數千或數百萬個URL的文件的checkUrls
應用程序,咱們確定會遇到麻煩。咱們的應用程序將同時建立不受控制的鏈接數量,並行發送大量數據,並可能破壞應用程序的穩定性和整個系統的可用性。咱們已經知道,控制負載的無序限制並行執行是一個極好的解決方案。
讓咱們經過建立一個limitedParallelStream.js
模塊來看看它是如何工做的,這個模塊是改編自上一節中建立的parallelStream.js
模塊。
讓咱們看看它的構造函數:
class LimitedParallelStream extends stream.Transform { constructor(concurrency, userTransform) { super({objectMode: true}); this.concurrency = concurrency; this.userTransform = userTransform; this.running = 0; this.terminateCallback = null; this.continueCallback = null; } // ... }
咱們須要一個concurrency
變量做爲輸入來限制併發量,此次咱們要保存兩個回調函數,continueCallback
用於任何掛起的_transform
方法,terminateCallback
用於_flush方法的回調。
接下來看_transform()
方法:
_transform(chunk, enc, done) { this.running++; this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this)); if(this.running < this.concurrency) { done(); } else { this.continueCallback = done; } }
此次在_transform()
方法中,咱們必須在調用done()
以前檢查是否達到了最大並行數量的限制,若是沒有達到了限制,才能觸發下一個項目的處理。若是咱們已經達到最大並行數量的限制,咱們能夠簡單地將done()
回調保存到continueCallback
變量中,以便在任務完成後當即調用它。
_flush()
方法與ParallelStream
類保持徹底同樣,因此咱們直接轉到實現_onComplete()
方法:
_onComplete(err) { this.running--; if(err) { return this.emit('error', err); } const tmpCallback = this.continueCallback; this.continueCallback = null; tmpCallback && tmpCallback(); if(this.running === 0) { this.terminateCallback && this.terminateCallback(); } }
每當任務完成,咱們調用任何已保存的continueCallback()
將致使stream
解鎖,觸發下一個項目的處理。
這就是limitedParallelStream
模塊。 咱們如今能夠在checkUrls
模塊中使用它來代替parallelStream
,而且將咱們的任務的併發限制在咱們設置的值上。
咱們之前建立的並行Streams
可能會使得數據的順序混亂,可是在某些狀況下這是不可接受的。有時,實際上,有那種須要每一個數據塊都以接收到的相同順序發出的業務場景。咱們仍然能夠並行運行transform
函數。咱們所要作的就是對每一個任務發出的數據進行排序,使其遵循與接收數據相同的順序。
這種技術涉及使用buffer
,在每一個正在運行的任務發出時從新排序塊。爲簡潔起見,咱們不打算提供這樣一個stream
的實現,由於這本書的範圍是至關冗長的;咱們要作的就是重用爲了這個特定目的而構建的npm
上的一個可用包,例如through2-parallel。
咱們能夠經過修改現有的checkUrls
模塊來快速檢查一個有序的並行執行的行爲。 假設咱們但願咱們的結果按照與輸入文件中的URL
相同的順序編寫。 咱們可使用經過through2-parallel
來實現:
const fs = require('fs'); const split = require('split'); const request = require('request'); const throughParallel = require('through2-parallel'); fs.createReadStream(process.argv[2]) .pipe(split()) .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) { if(!url) return done(); request.head(url, (err, response) => { this.push(url + ' is ' + (err ? 'down' : 'up') + '\n'); done(); }); })) .pipe(fs.createWriteStream('results.txt')) .on('finish', () => console.log('All urls were checked')) ;
正如咱們所看到的,through2-parallel
的接口與through2
的接口很是類似;惟一的不一樣是在through2-parallel
還能夠爲咱們提供的transform
函數指定一個併發限制。若是咱們嘗試運行這個新版本的checkUrls
,咱們會看到results.txt
文件列出結果的順序與輸入文件中
URLs的出現順序是同樣的。
經過這個,咱們總結了使用Streams
實現異步控制流的分析;接下來,咱們研究管道模式。
就像在現實生活中同樣,Node.js
的Streams
也能夠按照不一樣的模式進行管道鏈接。事實上,咱們能夠將兩個不一樣的Streams
合併成一個Streams
,將一個Streams
分紅兩個或更多的管道,或者根據條件重定向流。 在本節中,咱們將探討可應用於Node.js
的Streams
最重要的管道技術。
在本章中,咱們強調Streams
提供了一個簡單的基礎結構來模塊化和重用咱們的代碼,可是卻漏掉了一個重要的部分:若是咱們想要模塊化和重用整個流水線?若是咱們想要合併多個Streams
,使它們看起來像外部的Streams
,那該怎麼辦?下圖顯示了這是什麼意思:
從上圖中,咱們看到了如何組合幾個流的了:
Streams
的時候,實際上咱們是寫入組合的Streams
的第一個單元,即StreamA
。Streams
中讀取信息時,實際上咱們從組合的Streams
的最後一個單元中讀取。一個組合的Streams
一般是一個多重的Streams
,經過鏈接第一個單元的寫入端和鏈接最後一個單元的讀取端。
要從兩個不一樣的Streams(一個可讀的Streams和一個可寫入的Streams)中建立一個多重的Streams,咱們可使用一個npm模塊,例如 duplexer2。
但上述這麼作並不完整。實際上,組合的Streams
還應該作到捕獲到管道中任意一段Streams
單元產生的錯誤。咱們已經說過,任何錯誤都不會自動傳播到管道中。 因此,咱們必須有適當的錯誤管理,咱們將不得不顯式附加一個錯誤監聽器到每一個Streams
。可是,組合的Streams
其實是一個黑盒,這意味着咱們沒法訪問管道中間的任何單元,因此對於管道中任意單元的異常捕獲,組合的Streams
也充當聚合器的角色。
總而言之,組合的Streams
具備兩個主要優勢:
Streams
自身附加上就能夠了。組合的Streams
是一個很是通用和廣泛的作法,因此若是咱們沒有任何特殊的須要,咱們可能只想重用現有的解決方案,如multipipe或combine-stream。
爲了說明一個簡單的例子,咱們來考慮下面兩個組合的Streams
的狀況:
使用諸如multipipe
之類的庫,咱們能夠經過組合一些核心庫中已有的Streams
(文件combinedStreams.js
)來輕鬆地構建組合的Streams
:
const zlib = require('zlib'); const crypto = require('crypto'); const combine = require('multipipe'); module.exports.compressAndEncrypt = password => { return combine( zlib.createGzip(), crypto.createCipher('aes192', password) ); }; module.exports.decryptAndDecompress = password => { return combine( crypto.createDecipher('aes192', password), zlib.createGunzip() ); };
例如,咱們如今可使用這些組合的數據流,如同黑盒,這些對咱們均是不可見的,能夠建立一個小型應用程序,經過壓縮和加密來歸檔文件。 讓咱們在一個名爲archive.js
的新模塊中作這件事:
const fs = require('fs'); const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt; fs.createReadStream(process.argv[3]) .pipe(compressAndEncryptStream(process.argv[2])) .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));
咱們能夠經過從咱們建立的流水線中構建一個組合的Stream
來進一步改進前面的代碼,但此次並不僅是爲了得到對外不可見的黑盒,而是爲了進行異常捕獲。 實際上,正如咱們已經提到過的那樣,寫下以下的代碼只會捕獲最後一個Stream
單元發出的錯誤:
fs.createReadStream(process.argv[3]) .pipe(compressAndEncryptStream(process.argv[2])) .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc")) .on('error', function(err) { // 只會捕獲最後一個單元的錯誤 console.log(err); });
可是,經過把全部的Streams
結合在一塊兒,咱們能夠優雅地解決這個問題。重構後的archive.js
以下:
const combine = require('multipipe'); const fs = require('fs'); const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt; combine( fs.createReadStream(process.argv[3]) .pipe(compressAndEncryptStream(process.argv[2])) .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc")) ).on('error', err => { // 使用組合的Stream能夠捕獲任意位置的錯誤 console.log(err); });
正如咱們所看到的,咱們如今能夠將一個錯誤偵聽器直接附加到組合的Streams
,它將接收任何內部流發出的任何error
事件。
如今,要運行archive
模塊,只需在命令行參數中指定password
和file
參數,即壓縮模塊的參數:
node archive mypassword /path/to/a/file.text
經過這個例子,咱們已經清楚地證實了組合的Stream
是多麼重要; 從一個方面來講,它容許咱們建立流的可重用組合,從另外一方面來講,它簡化了管道的錯誤管理。
Streams
咱們能夠經過將單個可讀的Stream
管道化爲多個可寫入的Stream
來執行Stream
的分支。當咱們想要將相同的數據發送到不一樣的目的地時,這便體現其做用了,例如,兩個不一樣的套接字或兩個不一樣的文件。當咱們想要對相同的數據執行不一樣的轉換時,或者當咱們想要根據一些標準拆分數據時,也可使用它。如圖所示:
在Node.js
中分開的Stream
是一件小事。舉例說明。
讓咱們建立一個輸出給定文件的sha1
和md5
散列的小工具。咱們來調用這個新模塊generateHashes.js
,看以下的代碼:
const fs = require('fs'); const crypto = require('crypto'); const sha1Stream = crypto.createHash('sha1'); sha1Stream.setEncoding('base64'); const md5Stream = crypto.createHash('md5'); md5Stream.setEncoding('base64');
目前爲止沒什麼特別的 該模塊的下一個部分其實是咱們將從文件建立一個可讀的Stream
,並將其分叉到兩個不一樣的流,以得到另外兩個文件,其中一個包含sha1
散列,另外一個包含md5
校驗和:
const inputFile = process.argv[2]; const inputStream = fs.createReadStream(inputFile); inputStream .pipe(sha1Stream) .pipe(fs.createWriteStream(inputFile + '.sha1')); inputStream .pipe(md5Stream) .pipe(fs.createWriteStream(inputFile + '.md5'));
這很簡單:inputStream
變量經過管道一邊輸入到sha1Stream
,另外一邊輸入到md5Stream
。可是要注意:
inputStream
結束時,md5Stream
和sha1Stream
會自動結束,除非當調用pipe()
時指定了end
選項爲false
。Stream
的兩個分支會接受相同的數據塊,所以當對數據執行一些反作用的操做時咱們必須很是謹慎,由於那樣會影響另一個分支。inputStream
的數據流的流速會根據接收最慢的分支的流速做出調整。Streams
合併與分開相對,經過把一組可讀的Streams
合併到一個單獨的可寫的Stream
裏,如圖所示:
將多個Streams
合併爲一個一般是一個簡單的操做; 然而,咱們必須注意咱們處理end
事件的方式,由於使用自動結束選項的管道系統會在一個源結束時當即結束目標流。 這一般會致使錯誤,由於其餘還未結束的源將繼續寫入已終止的Stream
。 解決此問題的方法是在將多個源傳輸到單個目標時使用選項{end:false}
,而且只有在全部源完成讀取後纔在目標Stream
上調用end()
。
舉一個簡單的例子,咱們來實現一個小程序,它根據兩個不一樣目錄的內容建立一個壓縮包。 爲此,咱們將介紹兩個新的npm
模塊:
咱們建立一個新模塊mergeTar.js
,以下開始初始化:
var tar = require('tar'); var fstream = require('fstream'); var path = require('path'); var destination = path.resolve(process.argv[2]); var sourceA = path.resolve(process.argv[3]); var sourceB = path.resolve(process.argv[4]);
在前面的代碼中,咱們只加載所有依賴包和初始化包含目標文件和兩個源目錄(sourceA
和sourceB
)的變量。
接下來,咱們建立tar
的Stream
並經過管道輸出到一個可寫入的Stream
:
const pack = tar.Pack(); pack.pipe(fstream.Writer(destination));
如今,咱們開始初始化源Stream
let endCount = 0; function onEnd() { if (++endCount === 2) { pack.end(); } } const sourceStreamA = fstream.Reader({ type: "Directory", path: sourceA }) .on('end', onEnd); const sourceStreamB = fstream.Reader({ type: "Directory", path: sourceB }) .on('end', onEnd);
在前面的代碼中,咱們建立了從兩個源目錄(sourceStreamA
和sourceStreamB
)中讀取的Stream
那麼對於每一個源Stream
,咱們附加一個end
事件訂閱者,只有當這兩個目錄被徹底讀取時,纔會觸發pack
的end
事件。
最後,合併兩個Stream
:
sourceStreamA.pipe(pack, {end: false}); sourceStreamB.pipe(pack, {end: false});
咱們將兩個源文件都壓縮到pack
這個Stream
中,並經過設定pipe()
的option
參數爲{end:false}
配置終點Stream
的自動觸發end
事件。
這樣,咱們已經完成了咱們簡單的TAR
程序。咱們能夠經過提供目標文件做爲第一個命令行參數,而後是兩個源目錄來嘗試運行這個實用程序:
node mergeTar dest.tar /path/to/sourceA /path/to/sourceB
在npm
中咱們能夠找到一些能夠簡化Stream
的合併的模塊:
要注意,流入目標Stream
的數據是隨機混合的,這是一個在某些類型的對象流中能夠接受的屬性(正如咱們在上一個例子中看到的那樣),可是在處理二進制Stream
時一般是一個不但願這樣。
然而,咱們能夠經過一種模式按順序合併Stream
; 它包含一個接一個地合併源Stream
,當前一個結束時,開始發送第二段數據塊(就像鏈接全部源Stream
的輸出同樣)。在npm
上,咱們能夠找到一些也處理這種狀況的軟件包。其中之一是multistream。
合併Stream
模式有一個特殊的模式,咱們並非真的只想將多個Stream
合併在一塊兒,而是使用一個共享通道來傳送一組數據Stream
。與以前的不同,由於源數據Stream
在共享通道內保持邏輯分離,這使得一旦數據到達共享通道的另外一端,咱們就能夠再次分離數據Stream
。如圖所示:
將多個Stream
組合在單個Stream
上傳輸的操做被稱爲多路複用,而相反的操做(即,從共享Stream
接收數據重構原始的Stream
)則被稱爲多路分用。執行這些操做的設備分別稱爲多路複用器和多路分解器(。 這是一個在計算機科學和電信領域普遍研究的話題,由於它是幾乎任何類型的通訊媒體,如電話,廣播,電視,固然還有互聯網自己的基礎之一。 對於本書的範圍,咱們不會過多解釋,由於這是一個很大的話題。
咱們想在本節中演示的是,如何使用共享的Node.js Streams
來傳送多個邏輯上分離的Stream
,而後在共享Stream
的另外一端再次分離,即實現一次多路複用和多路分解。
舉例說明,咱們但願有一個小程序來啓動子進程,並將其標準輸出和標準錯誤都重定向到遠程服務器,服務器接受它們而後保存爲兩個單獨的文件。所以,在這種狀況下,共享介質是TCP
鏈接,而要複用的兩個通道是子進程的stdout
和stderr
。 咱們將利用分組交換的技術,這種技術與IP
,TCP
或UDP
等協議所使用的技術相同,包括將數據封裝在數據包中,容許咱們指定各類源信息,這對多路複用,路由,控制 流程,檢查損壞的數據都十分有幫助。
如圖所示,這個例子的協議大概是這樣,數據被封裝成具備如下結構的數據包:
先說客戶端,建立一個名爲client.js
的模塊,這是咱們這個應用程序的一部分,它負責啓動一個子進程並實現Stream
多路複用。
開始定義模塊,首先加載依賴:
const child_process = require('child_process'); const net = require('net');
而後開始實現多路複用的函數:
function multiplexChannels(sources, destination) { let totalChannels = sources.length; for(let i = 0; i < sources.length; i++) { sources[i] .on('readable', function() { // [1] let chunk; while ((chunk = this.read()) !== null) { const outBuff = new Buffer(1 + 4 + chunk.length); // [2] outBuff.writeUInt8(i, 0); outBuff.writeUInt32BE(chunk.length, 1); chunk.copy(outBuff, 5); console.log('Sending packet to channel: ' + i); destination.write(outBuff); // [3] } }) .on('end', () => { //[4] if (--totalChannels === 0) { destination.end(); } }); } }
multiplexChannels()
函數接受要複用的源Stream
做爲輸入
和複用接口做爲參數,而後執行如下步驟:
Stream
,它會註冊一個readable
事件偵聽器,咱們使用non-flowing
模式從流中讀取數據。channel ID
爲1字節(UInt8
),數據包大小爲4字節(UInt32BE
),而後爲實際數據。Stream
。end
事件註冊一個監聽器,以便當全部源Stream
結束時,end
事件觸發,通知目標Stream
觸發end
事件。
注意,咱們的協議最多可以複用多達256個不一樣的源流,由於咱們只有1個字節來標識
channel
。
const socket = net.connect(3000, () => { // [1] const child = child_process.fork( // [2] process.argv[2], process.argv.slice(3), { silent: true } ); multiplexChannels([child.stdout, child.stderr], socket); // [3] });
在最後,咱們執行如下操做:
TCP
客戶端鏈接到地址localhost:3000
。process.argv
數組做爲子進程的參數。咱們指定選項{silent:true}
,以便子進程不會繼承父級的stdout
和stderr
。mutiplexChannels()
函數將stdout
和stderr
多路複用到socket
裏。如今來看服務端,建立server.js
模塊,在這裏咱們未來自遠程鏈接的Stream
多路分解,並將它們傳送到兩個不一樣的文件中。
首先建立一個名爲demultiplexChannel()
的函數:
function demultiplexChannel(source, destinations) { let currentChannel = null; let currentLength = null; source .on('readable', () => { //[1] let chunk; if(currentChannel === null) { //[2] chunk = source.read(1); currentChannel = chunk && chunk.readUInt8(0); } if(currentLength === null) { //[3] chunk = source.read(4); currentLength = chunk && chunk.readUInt32BE(0); if(currentLength === null) { return; } } chunk = source.read(currentLength); //[4] if(chunk === null) { return; } console.log('Received packet from: ' + currentChannel); destinations[currentChannel].write(chunk); //[5] currentChannel = null; currentLength = null; }) .on('end', () => { //[6] destinations.forEach(destination => destination.end()); console.log('Source channel closed'); }) ; }
上面的代碼可能看起來很複雜,仔細閱讀並不是如此;因爲Node.js
可讀的Stream
的拉動特性,咱們能夠很容易地實現咱們的小協議的多路分解,以下所示:
non-flowing
模式從流中讀取數據。channel ID
,咱們嘗試從流中讀取1個字節,而後將其轉換爲數字。Buffer
尚未足夠的數據,這將致使this.read()
調用返回null
。在這種狀況下,咱們只是中斷解析,而後重試下一個readable
事件。Buffer
中拉出多少數據,因此咱們嘗試讀取全部數據。currentChannel
和currentLength
變量(這些變量將被用來解析下一個數據包)。channel
結束時,必定不要忘記調用目標Stream
的end()
方法。既然咱們能夠多路分解源Stream
,進行以下調用:
net.createServer(socket => { const stdoutStream = fs.createWriteStream('stdout.log'); const stderrStream = fs.createWriteStream('stderr.log'); demultiplexChannel(socket, [stdoutStream, stderrStream]); }) .listen(3000, () => console.log('Server started')) ;
在上面的代碼中,咱們首先在3000
端口上啓動一個TCP
服務器,而後對於咱們接收到的每一個鏈接,咱們將建立兩個可寫入的Stream
,指向兩個不一樣的文件,一個用於標準輸出,另外一個用於標準錯誤; 這些是咱們的目標channel
。 最後,咱們使用demultiplexChannel()
將套接字流解複用爲stdoutStream
和stderrStream
。
如今,咱們準備嘗試運行咱們的新的多路複用/多路分解應用程序,但首先讓咱們建立一個小的Node.js
程序來產生一些示例輸出; 咱們把它叫作generateData.js
:
console.log("out1"); console.log("out2"); console.error("err1"); console.log("out3"); console.error("err2");
首先,讓咱們開始運行服務端:
node server
而後運行客戶端,須要提供做爲子進程的文件參數:
node client generateData.js
客戶端幾乎立馬運行,可是進程結束時,generateData
應用程序的標準輸入和標準輸出通過一個TCP
鏈接,而後在服務器端,被多路分解成兩個文件。
注意,當咱們使用child_process.fork()
時,咱們的客戶端可以啓動別的Node.js
模塊。
咱們剛剛展現的例子演示瞭如何複用和解複用二進制/文本Stream
,但值得一提的是,相同的規則也適用於對象Stream
。 最大的區別是,使用對象,咱們已經有了使用原子消息(對象)傳輸數據的方法,因此多路複用就像設置一個屬性channel ID
到每一個對象同樣簡單,而多路分解只須要讀·channel ID
屬性,並將每一個對象路由到正確的目標Stream
。
還有一種模式是取一個對象上的幾個屬性並分發到多個目的Stream
的模式 經過這種模式,咱們能夠實現複雜的流程,以下圖所示:
如上圖所示,取一個對象Stream
表示animals
,而後根據動物類型:reptiles
,amphibians
和mammals
,而後分發到正確的目標Stream
中。
在本章中,咱們已經對Node.js Streams
及其使用案例進行了闡述,但同時也應該爲編程範式打開一扇大門,幾乎具備無限的可能性。咱們瞭解了爲何Stream
被Node.js
社區讚譽,而且咱們掌握了它們的基本功能,使咱們可以利用它作更多有趣的事情。咱們分析了一些先進的模式,並開始瞭解如何將不一樣配置的Streams
鏈接在一塊兒,掌握這些特性,從而使流如此多才多藝,功能強大。
若是咱們遇到不能用一個Stream
來實現的功能,咱們能夠經過將其餘Streams
鏈接在一塊兒來實現,這是Node.js
的一個很好的特性;Streams
在處理二進制數據,字符串和對象都十分有用,並具備鮮明的特色。
在下一章中,咱們將重點介紹傳統的面向對象的設計模式。儘管JavaScript
在某種程度上是面向對象的語言,但在Node.js
中,函數式或混合方法一般是首選。在閱讀下一章便揭曉答案。