接觸過 Node.js 的開發人員可能知道,流(Stream)這個概念比較難理解,也不太好處理。html
這篇文章就來幫你理解流的概念,以及如何使用它。別擔憂,必定會搞懂的。node
流(Stream)是驅動 Node.js 應用的基礎概念之一。它是數據處理方法,用於按順序將輸入讀寫到輸出中。git
流是一種處理讀寫文件、網絡通訊或任何端到端信息交換的有效方式。github
流的獨特之處在於,它不像傳統的程序那樣一次將一個文件讀入內存,而是逐塊讀取數據、處理其內容,而不是將其所有保存在內存中。算法
這使得流在處理大量數據時很是強大,例如,文件可能大於你的空閒內存,不可能將整個文件讀入內存來處理,這時候流就發揮做用了。數據庫
咱們以 YouTube 或 Netflix 等流媒體服務爲例:這些服務不會讓你當即下載完整的視頻和音頻,而是瀏覽器將視頻做爲連續流的數據塊,能夠作到用戶當即收看。api
然而,流並不只僅用來處理媒體或大數據,它還賦予了代碼的「可組合性」。在設計時考慮到可組合性意味着幾個組件能夠以某種方式組合以產生相同類型的結果。在 Node.js 中,經過使用流將數據從其餘更小的代碼段中導入或導出,能夠組成功能強大的代碼段。瀏覽器
與其餘數據處理方法相比,流有兩個主要優點:安全
fs.createWriteStream()
可使用流將數據寫入文件。fs.createReadStream()
能夠從文件讀取內容。net.Socket
。若是你用過 Node.js,可能已經遇到過流了。例如,在基於 Node.js 的 HTTP 服務器中,request
是可讀流,response
是可寫流。還有fs
模塊,能同時處理可讀和可寫文件流。只要你用 Express,就是在使用流與客戶端進行交互,流也被用於各類數據庫鏈接驅動程序中,由於 TCP 套接字、TLS 堆棧和其餘鏈接都是基於 Node.js 流的。bash
引入模塊並初始化:
const Stream = require('stream')
const readableStream = new Stream.Readable()
複製代碼
初始化後就能夠給它發送數據了:
readableStream.push('ping!')
readableStream.push('pong!')
複製代碼
強烈建議在處理流時使用異步迭代器。異步迭代是一種異步檢索數據容器內容的協議,意味着當前的「任務」可能在檢索數據項以前暫停。另外,值得一提的是,流的異步迭代器的內部實現使用了 readable
事件。
當從可讀的流讀取數據時,可使用 async iterator:
import * as fs from 'fs';
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'
複製代碼
也能夠在字符串中收集可讀流的內容:
import { Readable } from 'stream';
async function readableToString2(readable) {
let result = '';
for await (const chunk of readable) {
result += chunk;
}
return result;
}
const readable = Readable.from('Good morning!', { encoding: 'utf8' });
assert.equal(await readableToString2(readable), 'Good morning!');
複製代碼
注意,在本例中,咱們必須使用異步函數,由於咱們但願返回一個 Promise。
記得不要將異步函數與 EventEmitter
搞混了,由於目前沒法捕獲從事件處理程序中發出的 rejection,從而致使難以跟蹤 bug 和內存泄漏。當前的最佳實踐是始終將異步函數的內容封裝在 try/catch
塊中並處理錯誤,但這很容易出錯。這個 pull request就是爲了解決這個問題,若是能加入到 Node 核心代碼的話。
stream.Readable.from(iterable, [options])
是一個實用方法,用於從迭代器建立可讀流,其中的 iterable 包含了數據。iterable 能夠是同步迭代的,也能夠是異步迭代的。options
是可選的,能夠用於指定文本編碼。
const { Readable } = require('stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
複製代碼
根據 Streams API,可讀流有兩種操做模式: flowing 和 paused。 不管流是處於流模式仍是暫停模式,可讀流均可以用對象模式或非對象模式。
在flowing 模式中,數據從底層系統自動讀取,並經過 EventEmitter
接口以儘量快的速度使用事件提供給應用程序。
在paused 模式中,必須顯式地調用 stream.read()
方法來從流中讀取數據塊。
在 flowing 模式中,要從流中讀取數據,能夠監聽 data
事件並綁定回調。當數據塊可用時,可讀流發出 data
事件並執行回調。代碼以下:
var fs = require("fs");
var data = '';
var readerStream = fs.createReadStream('file.txt'); //Create a readable stream
readerStream.setEncoding('UTF8'); // Set the encoding to be utf8\.
// 處理 stream 事件 --> data, end, 和 error
readerStream.on('data', function(chunk) {
data += chunk;
});
readerStream.on('end',function() {
console.log(data);
});
readerStream.on('error', function(err) {
console.log(err.stack);
});
console.log("Program Ended");
複製代碼
函數調用 fs.createReadStream()
提供了一個可讀流。一開始,流處於靜止狀態。只要監聽 data
事件並綁定回調,它就開始流動。而後,讀取數據塊並將其傳遞給回調。流的實現者能夠決定 data
事件發出的頻率。例如,HTTP 請求能夠在每讀取幾 KB 數據時發出一個 data
事件。當你從文件中讀取數據時,你可能會採起每讀取一行就發出 data
事件。
當沒有更多的數據要讀取(到達尾部)時,流就會發出 end
事件。在上面的代碼中,咱們監聽了這個事件,以便在結束時獲得通知。
另外,若是出現錯誤,流將發出錯誤並通知。
在 paused 模式下,你只須要反覆調用流實例上的 read()
,直到每一塊數據都被讀取,以下所示:
var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;
readableStream.on('readable', function() {
while ((chunk=readableStream.read()) != null) {
data += chunk;
}
});
readableStream.on('end', function() {
console.log(data)
});
複製代碼
read()
函數從內部緩衝區讀取一些數據並返回。當沒有要讀取的內容時,它返回 null
。所以,在while
循環中,咱們檢查null
並終止循環。請注意,readable
事件是在能夠從流中讀取數據塊時發出的。
全部Readable
數據流都以 paused 模式開始,但能夠經過如下方式切換到 flowing 模式:
data
事件處理器stream.resume()
方法stream.pipe()
方法發送數據到一個 Writable
Readable
可使用如下幾種方式切換回 paused 模式:
stream.pause()
方法stream.unpipe()
方法來刪除多個管道目標。要記住的重要概念是,除非提供了一種用於消費或忽略該數據的機制,不然Readable
將不會生成數據。若是消費機制被禁用或取消,Readable
將嘗試中止生成數據。 添加一個readable
事件處理程序會自動使流中止流動,並經過readable.read()
消費數據。若是刪除了readable
事件處理程序,那麼若是存在data
事件處理程序,則流就會再次開始流動。
要將數據寫入可寫流,你須要在流實例上調用write()
。 以下所示:
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
});
複製代碼
上面的代碼簡單直白。它只是簡單地從輸入流中讀取數據塊,並使用write()
寫入目標位置。該函數返回一個布爾值,代表操做是否成功。若是爲true,則寫入成功,你能夠繼續寫入更多數據。 若是返回 false,則表示出了點問題,目前沒法寫入任何內容。可寫流將經過發出drain
事件來通知你什麼時候能夠開始寫入更多數據。
調用writable.end()
方法代表沒有更多數據將被寫入Writable
。 若是提供可選的回調函數,將做爲finish
事件的監聽器函數。
// 寫入 'hello, ' 而後以 'world!' 結束
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 不容許寫更多內容!
複製代碼
使用可寫流,你能夠從可讀流中讀取數據:
const Stream = require('stream')
const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('ping!')
readableStream.push('pong!')
writableStream.end()
複製代碼
你還可使用異步迭代器寫入可寫流,這也是建議的作法:
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // (B)
// 處理反壓
await once(writable, 'drain');
}
}
writable.end(); // (C)
// 等待完成,若是有錯誤則拋出
await finished(writable);
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
'One line of text.\n');
複製代碼
stream.finished()
的默認版本是基於回調的,可是能夠經過util.promisify()
轉換爲基於 Promise 的版本(A行)。
在此示例中,使用瞭如下兩種模式:
寫入可寫流,同時處理反壓(短時負載高峯致使系統接收數據的速率遠高於它處理數據的速率)(B行):
if (!writable.write(chunk)) {
await once(writable, 'drain');
}
複製代碼
關閉可寫流,並等待寫入完成(C行):
writable.end();
await finished(writable);
複製代碼
管道是一種機制,是將一個流的輸出做爲另外一流的輸入。它一般用於從一個流中獲取數據並將該流的輸出傳遞到另外的流。管道操做沒有限制,換句話說,管道用於分步驟處理流數據。
Node 10.x 引入了stream.pipeline()
。 這是一種模塊方法,用於在流之間進行管道傳輸,轉發錯誤信息和數據清理,並在管道完成後提供回調。
下面是使用 pipeline 的一個例子:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// 使用 pipeline API 輕鬆管理多個管道流,而且在管道所有完成時獲得通知
// 一個用來高效壓縮超大視頻文件的管道
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
(err) => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
複製代碼
應該使用pipeline
而不是 pipe
,由於pipe
是不安全的。
Node.js stream 模塊 是構建全部流 API 的基礎。
Stream 模塊是 Node.js 中默認提供的內建模塊。 Stream 是 EventEmitter 類的實例,該類在Node 中用於異步處理事件。 所以,流本質上是基於事件的。
使用stream模塊只需:
const stream = require('stream');
複製代碼
stream
模塊對於建立新型流實例很是有用。一般沒有必要使用stream
模塊來消費流。
因爲它們的優勢,Node.js 許多核心模塊提供了原生流處理功能,最值得注意的是這些:
net.Socket
基於流的主要 node api,是如下大部分 API 的基礎process.stdin
返回鏈接到 stdin 的流process.stdout
返回鏈接到 stdout 的流process.stderr
返回鏈接到 stderr 的流fs.createReadStream()
建立一個文件可讀流fs.createWriteStream()
建立一個文件可寫流net.connect()
初始化一個基於流的鏈接http.request()
返回 http.ClientRequest
類的一個實例,是一個可寫流zlib.createGzip()
用 gzip (一種壓縮算法)將數據壓縮到流zlib.createGunzip()
解壓 gzip 流zlib.createDeflate()
用 deflate (一種壓縮算法)將數據壓縮到流zlib.createInflate()
解壓 deflate 流類型 | 功能 |
---|---|
Readable |
數據提供者 |
Writable |
數據接收者 |
Transform |
提供者和接收者 |
Duplex |
提供者和接收者(獨立的) |
更多內容請查閱文檔: Stream (nodejs.org)
const Readable = require('stream').Readable
const Writable = require('stream').Writable
const Transform = require('stream').Transform
複製代碼
clock() // 可讀流
.pipe(xformer()) // 轉換流
.pipe(renderer()) // 可寫流
複製代碼
stream.push(/*...*/) // Emit a chunk
stream.emit('error', error) // Raise an error
stream.push(null) // Close a stream
複製代碼
const st = source() // 假設 source() 是可讀流
st.on('data', (data) => { console.log('<-', data) })
st.on('error', (err) => { console.log('!', err.message) })
st.on('close', () => { console.log('** bye') })
st.on('finish', () => { console.log('** bye') })
複製代碼
// 開啓和關閉 flowing 模式
st.resume()
st.pause()
// 自動開啓 flowing 模式
st.on('data', /*...*/)
複製代碼
function clock () {
const stream = new Readable({
objectMode: true,
read() {} // 本身實現 read() 方法,若是要按需讀取
})
setInterval(() => {
stream.push({ time: new Date() })
}, 1000)
return stream
}
複製代碼
可讀流是數據生成器,用
stream.push()
寫入數據。
function xformer () {
let count = 0
return new Transform({
objectMode: true,
transform: (data, _, done) => {
done(null, { ...data, index: count++ })
}
})
}
複製代碼
將轉換後的數據塊傳給
done(null, chunk)
.
function renderer () {
return new Writable({
objectMode: true,
write: (data, _, done) => {
console.log('<-', data)
done()
}
})
}
複製代碼
clock() // 可讀流
.pipe(xformer()) // 轉換流
.pipe(renderer()) // 可寫流
複製代碼
如下是與可寫流相關的一些重要事件:
error
– 在寫入/管道操做發生了錯誤時發送pipeline
– 當將可讀流傳遞到可寫流中時,可寫流會發出此事件。unpipe
– 當你在可讀流上調用unpipe
並中止將其輸送到目標流中時發出。這就是全部關於流的基礎知識。 流、管道和鏈式操做是 Node.js 的核心和最強大的功能。流確實能夠幫助你編寫簡潔而高效的代碼來操做 I/O。
此外,還有一個值得期待的Node.js戰略計劃叫作BOB,目標是改善 Node.js 的流數據接口,既可應用於 Node.js 內部核心,未來還有但願用於公開 API。
更多技術乾貨,請關注微信公衆號:1024譯站