做者:Liz Parody翻譯:瘋狂的技術宅javascript
原文:https://nodesource.com/blog/u...html
未經容許嚴禁轉載前端
Node.js 中的流(Stream)是出了名的難用甚至是難以理解。java
用 Dominic Tarr 的話來講:「流是 Node 中最好的,也是最容易被誤解的想法。」即便是 Redux 的建立者和 React.js 的核心團隊成員 Dan Abramov 也懼怕 Node 流。node
本文將幫助你瞭解流以及如何使用。不要懼怕,你徹底能夠把它搞清楚!git
流是爲 Node.js 應用提供動力的基本概念之一。它們是數據處理方法,用於將輸入的數據順序讀取或把數據寫入輸出。程序員
流是一種以有效方式處理讀寫文件、網絡通訊或任何類型的端到端信息交換的方式。github
流的處理方式很是獨特,流不是像傳統方式那樣將文件一次所有讀取到存儲器中,而是逐段讀取數據塊並處理數據的內容,不將其所有保留在內存中。面試
這種方式使流在處理大量數據時很是強大,例如,文件的大小可能大於可用的內存空間,從而沒法將整個文件讀入內存進行處理。那是流的用武之地!算法
既能用流來處理較小的數據塊,也能夠讀取較大的文件。
以 YouTube 或 Netflix 之類的「流媒體」服務爲例:這些服務不會讓你你當即下載視頻和音頻文件。取而代之的是,你的瀏覽器以連續的塊流形式接收視頻,從而使接收者幾乎能夠當即開始觀看和收聽。
可是,流不只涉及處理媒體和大數據。它們還在代碼中賦予了咱們「可組合性」的力量。考慮可組合性的設計意味着可以以某種方式組合多個組件以產生相同類型的結果。在 Node.js 中,能夠經過流在其餘較小的代碼段中傳遞數據,從而組成功能強大的代碼段。
與其餘數據處理方法相比,流基本上具備兩個主要優勢:
fs.createWriteStream()
使咱們可使用流將數據寫入文件。fs.createReadStream()
讓咱們讀取文件的內容。net.Socket
若是你已經使用過 Node.js,則可能遇到過流。例如在基於 Node.js 的 HTTP 服務器中,request
是可讀流,而 response
是可寫流。你可能用過 fs
模塊,該模塊可以讓你用可讀和可寫文件流。每當使用 Express 時,你都在使用流與客戶端進行交互,並且因爲 TCP 套接字、TLS棧和其餘鏈接都基於 Node.js,因此在每一個可使用的數據庫鏈接驅動的程序中使用流。
首先須要可讀性流,而後將其初始化。
const Stream = require('stream') const readableStream = new Stream.Readable()
如今,流已初始化,能夠向其發送數據了:
readableStream.push('ping!') readableStream.push('pong!')
強烈建議在使用流時配合異步迭代器(async iterator)。根據 Axel Rauschmayer 博士的說法,異步迭代是一種用於異步檢索數據容器內容的協議(這意味着當前「任務」能夠在檢索項目以前被暫停)。另外必須說起的是,流異步迭代器實現使用內部的 readable
事件。
從可讀流中讀取時,可使用異步迭代器:
import * as fs from 'fs'; async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk); } } const readable = fs.createReadStream( 'tmp/test.txt', {encoding: 'utf8'}); logChunks(readable); // Output: // 'This is a test!\n'
也能夠用字符串收集可讀流的內容:
import {Readable} from 'stream'; async function readableToString2(readable) { let result = ''; for await (const chunk of readable) { result += chunk; } return result; } const readable = Readable.from('Good morning!', {encoding: 'utf8'}); assert.equal(await readableToString2(readable), 'Good morning!');
注意,在這種狀況下必須使用異步函數,由於咱們想返回 Promise。
請切記不要將異步功能與 EventEmitter
混合使用,由於當前在事件處理程序中發出拒絕時,沒法捕獲拒絕,從而致使難以跟蹤錯誤和內存泄漏。目前的最佳實踐是始終將異步函數的內容包裝在 try/catch 塊中並處理錯誤,但這很容易出錯。 這個 pull request 旨在解決一旦其落在 Node 核心上產生的問題。
要了解有關異步迭代的 Node.js 流的更多信息,請查看這篇很棒的文章。
stream.Readable.from(iterable, [options])
這是一種實用方法,用於從迭代器中建立可讀流,該迭代器保存可迭代對象中包含的數據。可迭代對象能夠是同步可迭代對象或異步可迭代對象。參數選項是可選的,除其餘做用外,還能夠用於指定文本編碼。
const { Readable } = require('stream'); async function * generate() { yield 'hello'; yield 'streams'; } const readable = Readable.from(generate()); readable.on('data', (chunk) => { console.log(chunk); });
根據 Streams API,可讀流有效地以兩種模式之一運行:flowing和paused。可讀流能夠處於對象模式,不管處於 flowing 模式仍是 paused 模式。
EventEmitter
接口使用事件將其儘快提供給程序。stream.read()
方法以從流中讀取數據塊。在 flowing 模式中,要從流中讀取數據,能夠監聽數據事件並附加回調。當有大量數據可用時,可讀流將發出一個數據事件,並執行你的回調。看下面的代碼片斷:
var fs = require("fs"); var data = ''; var readerStream = fs.createReadStream('file.txt'); //Create a readable stream readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. // Handle stream events --> data, end, and error readerStream.on('data', function(chunk) { data += chunk; }); readerStream.on('end',function() { console.log(data); }); readerStream.on('error', function(err) { console.log(err.stack); }); console.log("Program Ended");
函數調用 fs.createReadStream()
給你一個可讀流。最初流處於靜態狀態。一旦你偵聽數據事件並附加了回調,它就會開始流動。以後將讀取大塊數據並將其傳遞給你的回調。流實現者決定發送數據事件的頻率。例如,每當有幾 KB 的數據被讀取時,HTTP 請求就可能發出一個數據事件。當從文件中讀取數據時,你可能會決定讀取一行後就發出數據事件。
當沒有更多數據要讀取(結束)時,流將發出結束事件。在以上代碼段中,咱們監聽此事件以在結束時獲得通知。
另外,若是有錯誤,流將發出並通知錯誤。
在 paused 模式下,你只需在流實例上重複調用 read()
,直到讀完全部數據塊爲止,如如下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; var chunk; readableStream.on('readable', function() { while ((chunk=readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data) });
read()
函數從內部緩衝區讀取一些數據並將其返回。當沒有內容可讀取時返回 null
。因此在 while
循環中,咱們檢查是否爲 null
並終止循環。請注意,當能夠從流中讀取大量數據時,將會發出可讀事件。
全部 Readable
流均以 paused 模式開始,但能夠經過如下方式之一切換爲 flowing 模式:
stream.resume()
方法。stream.pipe()
方法將數據發送到可寫對象。Readable
可使如下方法之一切換回 paused 模式:
stream.pause()
方法。stream.unpipe()
方法來刪除多個管道目標。一個須要記住的重要概念是,除非提供了一種用於消耗或忽略該數據的機制,不然 Readable
將不會生成數據。若是使用機制被禁用或取消,則 Readable
將會試圖中止生成數據。添加 readable
事件處理會自動使流中止 flowing,並經過 read.read()
獲得數據。若是刪除了 readable
事件處理,那麼若是存在 'data' 事件處理,則流將再次開始 flowing。
要將數據寫入可寫流,你須要在流實例上調用 write()
。如如下示例所示:
var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk); });
上面的代碼很簡單。它只是簡單地從輸入流中讀取數據塊,並使用 write()
寫入目的地。該函數返回一個布爾值,指示操做是否成功。若是爲 true
,則寫入成功,你能夠繼續寫入更多數據。若是返回 false
,則表示出了點問題,你目前沒法寫任何內容。可寫流將經過發出 drain
事件來通知你何時能夠開始寫入更多數據。
調用 writable.end()
方法表示沒有更多數據將被寫入 Writable。若是提供,則可選的回調函數將做爲 finish
事件的偵聽器附加。
// Write 'hello, ' and then end with 'world!'. const fs = require('fs'); const file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // Writing more now is not allowed!
你能夠用可寫流從可讀流中讀取數據:
const Stream = require('stream') const readableStream = new Stream.Readable() const writableStream = new Stream.Writable() writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() } readableStream.pipe(writableStream) readableStream.push('ping!') readableStream.push('pong!') writableStream.end()
還能夠用異步迭代器來寫入可寫流,建議使用
import * as util from 'util'; import * as stream from 'stream'; import * as fs from 'fs'; import {once} from 'events'; const finished = util.promisify(stream.finished); // (A) async function writeIterableToFile(iterable, filePath) { const writable = fs.createWriteStream(filePath, {encoding: 'utf8'}); for await (const chunk of iterable) { if (!writable.write(chunk)) { // (B) // Handle backpressure await once(writable, 'drain'); } } writable.end(); // (C) // Wait until done. Throws if there are errors. await finished(writable); } await writeIterableToFile( ['One', ' line of text.\n'], 'tmp/log.txt'); assert.equal( fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}), 'One line of text.\n');
stream.finished()
的默認版本是基於回調的,可是能夠經過 util.promisify()
轉換爲基於 Promise 的版本(A行)。
在此例中,使用如下兩種模式:
Writing to a writable stream while handling backpressure (line B):
在處理 backpressure
時寫入可寫流(B行):
if (!writable.write(chunk)) { await once(writable, 'drain'); }
關閉可寫流,並等待寫入完成(C行):
writable.end(); await finished(writable);
管道是一種機制,能夠將一個流的輸出做爲另外一流的輸入。它一般用於從一個流中獲取數據並將該流的輸出傳遞到另外一個流。管道操做沒有限制。換句話說,管道可用於分多個步驟處理流數據。
在 Node 10.x 中引入了 stream.pipeline()
。這是一種模塊方法,用於在流轉發錯誤和正確清理之間進行管道傳輸,並在管道完成後提供回調。
這是使用管道的例子:
const { pipeline } = require('stream'); const fs = require('fs'); const zlib = require('zlib'); // 使用 pipeline API 能夠輕鬆將一系列流 // 經過管道傳輸在一塊兒,並在管道徹底完成後獲得通知。 // 一個有效地用 gzip壓縮巨大視頻文件的管道: pipeline( fs.createReadStream('The.Matrix.1080p.mkv'), zlib.createGzip(), fs.createWriteStream('The.Matrix.1080p.mkv.gz'), (err) => { if (err) { console.error('Pipeline failed', err); } else { console.log('Pipeline succeeded'); } } );
因爲pipe
不安全,應使用 pipeline
代替 pipe
。
Node.js 流模塊 提供了構建全部流 API 的基礎。
Stream 模塊是 Node.js 中默認提供的原生模塊。 Stream 是 EventEmitter 類的實例,該類在 Node 中異步處理事件。所以流本質上是基於事件的。
要訪問流模塊:
const stream = require('stream');
stream
模塊對於建立新型流實例很是有用。一般不須要使用 stream
模塊來消耗流。
因爲它們的優勢,許多 Node.js 核心模塊提供了原生流處理功能,最值得注意的是:
net.Socket
是流所基於的主 API 節點,它是如下大多數 API 的基礎process.stdin
返回鏈接到 stdin 的流process.stdout
返回鏈接到 stdout 的流process.stderr
返回鏈接到 stderr 的流fs.createReadStream()
建立一個可讀的文件流fs.createWriteStream()
建立可寫的文件流net.connect()
啓動基於流的鏈接http.request()
返回 http.ClientRequest
類的實例,它是可寫流zlib.createGzip()
使用gzip(一種壓縮算法)將數據壓縮到流中zlib.createGunzip()
解壓縮 gzip 流。zlib.createDeflate()
deflate(壓縮算法)將數據壓縮到流中zlib.createInflate()
解壓縮一個deflate流
查看更多:Node.js 流速查表
如下是與可寫流相關的一些重要事件:
error
–表示在寫或配置管道時發生了錯誤。pipeline
– 當把可讀流傳遞到可寫流中時,該事件由可寫流發出。unpipe
– 當你在可讀流上調用 unpipe 並中止將其輸送到目標流中時發出。這就是全部關於流的基礎知識。流、管道和鏈是 Node.js 的核心和最強大的功能。流確實能夠幫你編寫簡潔而高效的代碼來執行 I/O。
另外,還有一個值得期待的 Node.js 戰略計劃,稱爲 BOB,旨在改善 Node.js 的內部數據流以及但願做爲將來 Node.js 流數據接口的公共 API 的。