本文節選自 Node.js CheatSheet | Node.js 語法基礎、框架使用與實踐技巧,也能夠閱讀 JavaScript CheatSheet 或者 現代 Web 開發基礎與工程實踐 瞭解更多 JavaScript/Node.js 的實際應用。
Stream 是 Node.js 中的基礎概念,相似於 EventEmitter,專一於 IO 管道中事件驅動的數據處理方式;類比於數組或者映射,Stream 也是數據的集合,只不過其表明了不必定正在內存中的數據。。Node.js 的 Stream 分爲如下類型:node
Stream 自己提供了一套接口規範,不少 Node.js 中的內建模塊都遵循了該規範,譬如著名的 fs
模塊,便是使用 Stream 接口來進行文件讀寫;一樣的,每一個 HTTP 請求是可讀流,而 HTTP 響應則是可寫流。git
const stream = require('stream'); const fs = require('fs'); const readableStream = fs.createReadStream(process.argv[2], { encoding: 'utf8' }); // 手動設置流數據編碼 // readableStream.setEncoding('utf8'); let wordCount = 0; readableStream.on('data', function(data) { wordCount += data.split(/\s{1,}/).length; }); readableStream.on('end', function() { // Don't count the end of the file. console.log('%d %s', --wordCount, process.argv[2]); });
當咱們建立某個可讀流時,其還並未開始進行數據流動;添加了 data 的事件監聽器,它纔會變成流動態的。在這以後,它就會讀取一小塊數據,而後傳到咱們的回調函數裏面。 data
事件的觸發頻次一樣是由實現者決定,譬如在進行文件讀取時,可能每行都會觸發一次;而在 HTTP 請求處理時,可能數 KB 的數據纔會觸發一次。能夠參考 nodejs/readable-stream/_stream_readable 中的相關實現,發現 on 函數會觸發 resume 方法,該方法又會調用 flow 函數進行流讀取:github
// function on if (ev === 'data') { // Start flowing on next tick if stream isn't explicitly paused if (this._readableState.flowing !== false) this.resume(); } ... // function flow while (state.flowing && stream.read() !== null) {}
咱們還能夠監聽 readable
事件,而後手動地進行數據讀取:數組
let data = ''; let chunk; readableStream.on('readable', function() { while ((chunk = readableStream.read()) != null) { data += chunk; } }); readableStream.on('end', function() { console.log(data); });
Readable Stream 還包括以下經常使用的方法:緩存
在平常開發中,咱們能夠用 stream-wormhole 來模擬消耗可讀流:服務器
sendToWormhole(readStream, true);
readableStream.on('data', function(chunk) { writableStream.write(chunk); }); writableStream.end();
當 end()
被調用時,全部數據會被寫入,而後流會觸發一個 finish
事件。注意在調用 end()
以後,你就不能再往可寫流中寫入數據了。app
const { Writable } = require('stream'); const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);
Writable Stream 中一樣包含一些與 Readable Stream 相關的重要事件:框架
const fs = require('fs'); const inputFile = fs.createReadStream('REALLY_BIG_FILE.x'); const outputFile = fs.createWriteStream('REALLY_BIG_FILE_DEST.x'); // 當創建管道時,才發生了流的流動 inputFile.pipe(outputFile);
多個管道順序調用,便是構建了連接(Chaining):異步
const fs = require('fs'); const zlib = require('zlib'); fs.createReadStream('input.txt.gz') .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream('output.txt'));
管道也經常使用於 Web 服務器中的文件處理,以 Egg.js 中的應用爲例,咱們能夠從 Context 中獲取到文件流並將其傳入到可寫文件流中:分佈式
📎 完整代碼參考 Backend Boilerplate/egg
const awaitWriteStream = require('await-stream-ready').write; const sendToWormhole = require('stream-wormhole'); ... const stream = await ctx.getFileStream(); const filename = md5(stream.filename) + path.extname(stream.filename).toLocaleLowerCase(); //文件生成絕對路徑 const target = path.join(this.config.baseDir, 'app/public/uploads', filename); //生成一個文件寫入文件流 const writeStream = fs.createWriteStream(target); try { //異步把文件流寫入 await awaitWriteStream(stream.pipe(writeStream)); } catch (err) { //若是出現錯誤,關閉管道 await sendToWormhole(stream); throw err; } ...
參照分佈式系統導論,可知在典型的流處理場景中,咱們不能夠避免地要處理所謂的背壓(Backpressure)問題。不管是 Writable Stream 仍是 Readable Stream,實際上都是將數據存儲在內部的 Buffer 中,能夠經過 writable.writableBuffer
或者 readable.readableBuffer
來讀取。當要處理的數據存儲超過了 highWaterMark
或者當前寫入流處於繁忙狀態時,write 函數都會返回 false
。pipe
函數即會自動地幫咱們啓用背壓機制:
當 Node.js 的流機制監測到 write 函數返回了 false
,背壓系統會自動介入;其會暫停當前 Readable Stream 的數據傳遞操做,直到消費者準備完畢。
+===============+ | Your_Data | +=======+=======+ | +-------v-----------+ +-------------------+ +=================+ | Readable Stream | | Writable Stream +---------> .write(chunk) | +-------+-----------+ +---------^---------+ +=======+=========+ | | | | +======================+ | +------------------v---------+ +-----> .pipe(destination) >---+ | Is this chunk too big? | +==^=======^========^==+ | Is the queue busy? | ^ ^ ^ +----------+-------------+---+ | | | | | | | | > if (!chunk) | | ^ | | emit .end(); | | ^ ^ | > else | | | ^ | emit .write(); +---v---+ +---v---+ | | ^----^-----------------< No | | Yes | ^ | +-------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---^---------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^---^-----------------^---< Buffering | | | |============| | +> emit .drain(); | <Buffer> | | +> emit .resume(); +------------+ | | <Buffer> | | +------------+ add chunk to queue | | <--^-------------------< +============+
Duplex Stream 能夠看作讀寫流的聚合體,其包含了相互獨立、擁有獨立內部緩存的兩個讀寫流, 讀取與寫入操做也能夠異步進行:
Duplex Stream ------------------| Read <----- External Source You ------------------| Write -----> External Sink ------------------|
咱們可使用 Duplex 模擬簡單的套接字操做:
const { Duplex } = require('stream'); class Duplexer extends Duplex { constructor(props) { super(props); this.data = []; } _read(size) { const chunk = this.data.shift(); if (chunk == 'stop') { this.push(null); } else { if (chunk) { this.push(chunk); } } } _write(chunk, encoding, cb) { this.data.push(chunk); cb(); } } const d = new Duplexer({ allowHalfOpen: true }); d.on('data', function(chunk) { console.log('read: ', chunk.toString()); }); d.on('readable', function() { console.log('readable'); }); d.on('end', function() { console.log('Message Complete'); }); d.write('....');
在開發中咱們也常常須要直接將某個可讀流輸出到可寫流中,此時也能夠在其中引入 PassThrough,以方便進行額外地監聽:
const { PassThrough } = require('stream'); const fs = require('fs'); const duplexStream = new PassThrough(); // can be piped from reaable stream fs.createReadStream('tmp.md').pipe(duplexStream); // can pipe to writable stream duplexStream.pipe(process.stdout); // 監聽數據,這裏直接輸出的是 Buffer<Buffer 60 60 ... > duplexStream.on('data', console.log);
Transform Stream 則是實現了 _transform
方法的 Duplex Stream,其在兼具讀寫功能的同時,還能夠對流進行轉換:
Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|--------------
這裏咱們實現簡單的 Base64 編碼器:
const util = require('util'); const Transform = require('stream').Transform; function Base64Encoder(options) { Transform.call(this, options); } util.inherits(Base64Encoder, Transform); Base64Encoder.prototype._transform = function(data, encoding, callback) { callback(null, data.toString('base64')); }; process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);