做爲前端,咱們經常會和 Stream 有着頻繁的接觸。好比使用 gulp 對項目進行構建的時候,咱們會使用 gulp.src 接口將匹配到的文件轉爲 stream(流)的形式,再經過 .pipe() 接口對其進行鏈式加工處理;html
或者好比咱們經過 http 模塊建立一個 HTTP 服務:前端
const http = require('http'); http.createServer( (req, res) => { //... }).listen(3000);
此處的 req 和 res 也屬於 Stream 的消費接口(前者爲 Readable Stream,後者爲 Writable Stream)。node
事實上像上述的 req/res,或者 process.stdout 等接口都屬於 Stream 的實例,所以較少存在狀況,是須要咱們手動引入 Stream 模塊的,例如:git
//demo1.js 'use strict'; const Readable = require('stream').Readable; const rs = Readable(); const s = 'VaJoy'; const l = s.length; let i = 0; rs._read = ()=>{ if(i == l){ rs.push(' is my name'); return rs.push(null) } rs.push(s[i++]) }; rs.pipe(process.stdout);
若是不太能讀懂上述代碼,或者對 Stream 的概念感到模糊,那麼能夠放輕鬆,由於本文會進一步地對 Stream 進行剖析,而且談談直接使用它可能會存在的一些問題(這也是爲什麼 gulp 要使用 through2 的緣由)。github
另外本文的示例都可在個人 github 倉庫(https://github.com/VaJoy/stream/)獲取到,讀者能夠自行下載和調試。npm
一. Stream的做用gulp
在介紹 Stream(流)以前,咱們先來看一個例子 —— 模擬服務器把本地某個文件內容吐給客戶端:api
//demo2 var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(3000);
這段代碼雖然能夠正常執行,但存在一個顯著的問題 —— 對於每個客戶端的請求,fs.readFile 接口都會把整個文件都緩存到內存中去,而後纔開始把數據吐給用戶。那麼當文件體積很大、請求也較多(且特別當請求來自慢速用戶)的時候,服務器須要消耗很大的內存,致使性能低下。緩存
然而這個問題,則正是 stream 發揮所長的地方。如前文說起的,res 是流對象,那咱們正好能夠將其利用起來:服務器
var server2 = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server2.listen(4000);
在上方代碼段裏,fs.createReadStream 建立了 data.txt 的可讀流(Readable Stream)。這裏須要事先了解的是,流能夠簡單地分爲「可讀的(readable)」、「可寫的(writable)」,或者「讀寫都可」三種類型,且全部的流都屬於 EventEmitter 的實例。
回到代碼,對於建立的可讀流,咱們經過 .pipe() 接口來監聽其 data 和 end 事件,並把 data.txt (的可讀流)拆分紅一小塊一小塊的數據(chunks),像流水同樣源源不斷地吐給客戶端,而再也不須要等待整個文件都加載到內存後才發送數據。
其中 .pipe 能夠視爲流的「管道/通道」方法,任何類型的流都會有這個 .pipe 方法去成對處理流的輸入與輸出。
爲了方便理解,咱們把上述兩種方式(不使用流/使用流)處理爲以下的情景(臥槽我好好一個前端爲啥要P這麼萌的圖):
⑴ 不使用流:
⑵ 使用流:
由此能夠得知,使用流(stream)的形式,能夠大大提高響應時間,又能有效減輕服務器內存的壓力。
二. Stream的分類
在上文咱們曾說起到,stream 能夠按讀寫權限來簡單地分作三類,不過這裏咱們再細化下,能夠把 stream 歸爲以下五個類別:
⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams
其中 Transform Streams 和 Duplex Streams 都屬於便可讀又可寫的流,而最後一個 Classic Streams 是對 Node 古早版本上的 Stream 的一個統稱。咱們將照例對其進行逐一介紹。
2.1 Readable Streams
便可讀流,經過 .pipe 接口能夠將其數據傳遞給一個 writable、transform 或者 duplex流:
readableStream.pipe(dst)
常見的 Readable Streams 包括:
例如在前面 demo2 的代碼段中,咱們就使用了 fs.createReadStream 接口來建立了一個 fs read stream:
var server2 = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server2.listen(4000);
這裏有個有趣的地方 —— 雖然 Readable Streams 稱爲可讀流,但在將其傳入一個消耗對象以前,它都是可寫的:
var Readable = require('stream').Readable; var rs = new Readable; rs.push('servers '); rs.push('are listening on\n'); rs.push('3000 and 4000\n'); rs.push(null); rs.pipe(process.stdout);
執行結果:
在這段代碼中,咱們經過 readStream.push(data) 的形式往可讀流裏注入數據,並以 readStream.push(null) 來結束可讀流。
不過這種寫法有個弊端 —— 從使用 .push() 將數據注入 readable 流中開始,直到另外一個東西(process.stdout)來消耗數據以前,這些數據都會存在緩存中。
這裏有個內置接口 ._read() 能夠用來處理這個問題,它是從系統底層開始讀取數據流時纔會不斷調用自身,從而減小緩存冗餘。
咱們能夠回過頭來看 demo1 的例子:
'use strict'; const Readable = require('stream').Readable; const rs = Readable(); const s = 'VaJoy'; const l = s.length; let i = 0; rs._read = ()=>{ if(i == l){ rs.push(' is my name'); return rs.push(null) } rs.push(s[i++]) }; rs.pipe(process.stdout);
咱們是在 ._read 方法中才使用 readStream.push(data) 往可讀流裏注入數據供下游消耗(也會流經緩存),從而提高流處理的性能。
這裏也有個小問題 —— 上一句話所提到的「供下游消耗」,這個下游一般又會以怎樣的形式來消耗可讀流的呢?
首先,可使用咱們熟悉的 .pipe() 方法將可讀流推送給一個消耗對象(writable、transform 或者 duplex流):
//ext1 const fs = require('fs'); const zlib = require('zlib'); const r = fs.createReadStream('data.txt'); const z = zlib.createGzip(); const w = fs.createWriteStream('data.txt.gz'); r.pipe(z).pipe(w);
其次,也能夠經過監聽可讀流的「data」事件(別忘了文章前面提到的「全部的流都屬於 EventEmitter 的實例」)來實現消耗處理 —— 在首次監聽其 data 事件後,readStream 便會持續不斷地調用 _read(),經過觸發 data 事件將數據輸出。當數據所有被消耗時,則觸發 end 事件。
示例:
//demo3 const Readable = require('stream').Readable; class ToReadable extends Readable { constructor(iterator) { super(); this.iterator = iterator } _read() { const res = this.iterator.next(); if (res.done) { // 迭代結束,順便結束可讀流 this.push(null) } setTimeout(() => { // 將數據添加到流中 this.push(res.value + '\n') }, 0) } } const gen = function *(a){ let count = 5, res = a; while(count--){ res = res*res; yield res } }; const readable = new ToReadable(gen(2)); // 監聽`data`事件,一次獲取一個數據 readable.on('data', data => process.stdout.write(data)); // 可讀流消耗完畢 readable.on('end', () => process.stdout.write('readable stream ends~'));
執行結果爲:
這裏須要留意的是,在使用 .push() 往可讀流裏注入數據的代碼段,咱們使用了 setTimeout 將其包裹起來,這是爲了讓系統能有足夠時間優先處理接收流結束信號的事務。固然你也能夠改寫爲:
if (res.done) { // 直接 return return this.push(null) } this.push(res.value + '\n')
2.2 Writable Streams
Writable(可寫)流接口是對寫入數據的目標的抽象:
src.pipe(writableStream)
常見的 Writable Streams 包括:
可寫流有兩個重要的方法:
上方兩方法的 encoding 參數表示編碼字符串(chunk爲String時才能夠用)。
write 方法的 callback 回調參數會在 chunk 被消費後(從緩存中移除後)被觸發;end 方法的 callback 回調參數則在 Stream 結束時觸發。
另外,如同經過 readable._read() 方法能夠處理可讀流,咱們能夠經過 writable._write(chunk, enc, next) 方法在系統底層處理流寫入的邏輯中,對數據進行處理。
其中參數 chunk 表明寫進來的數據;enc 表明編碼的字符串;next(err) 則是一個回調函數,調用它能夠告知消費者進行下一輪的數據流寫入。
示例:
//demo4 const Writable = require('stream').Writable; const writable = Writable(); writable._write = (chunck, enc, next) => { // 輸出打印 process.stdout.write(chunck.toString().toUpperCase()); // 寫入完成時,調用`next()`方法通知流傳入下一個數據 process.nextTick(next) }; // 全部數據均已寫入底層 writable.on('finish', () => process.stdout.write('DONE')); // 將一個數據寫入流中 writable.write('a' + '\n'); writable.write('b' + '\n'); writable.write('c' + '\n'); // 再無數據寫入流時,須要調用`end`方法 writable.end();
執行以下:
2.3 Duplex Streams
Duplex 是雙工的意思,所以很容易猜到 Duplex 流就是既能讀又能寫的一類流,它繼承了 Readable 和 Writable 的接口。
常見的 Duplex Streams 有:
示例:
//demo5 const Duplex = require('stream').Duplex; const duplex = Duplex(); duplex._read = function () { var date = new Date(); this.push( date.getFullYear().toString() ); this.push(null) }; duplex._write = function (buf, enc, next) { console.log( buf.toString() + '\n' ); next() }; duplex.on('data', data => console.log( data.toString() )); duplex.write('the year is'); duplex.end();
執行結果:
2.4 Transform Streams
Transform Stream 是在繼承了 Duplex Streams 的基礎上再進行了擴展,它能夠把寫入的數據和輸出的數據,經過 ._transform 接口關聯起來。
常見的 Transform Streams 有:
示例:
//demo6 const Transform = require('stream').Transform; class SetName extends Transform { constructor(name, option) { super(option || {}); this.name = name || '' } // .write接口寫入的數據,處理後直接從 data 事件的回調中可取得 _transform(buf, enc, next) { var res = buf.toString().toUpperCase(); this.push(res + this.name + '\n'); next() } } var transform = new SetName('VaJoy'); transform.on('data', data => process.stdout.write(data)); transform.write('my name is '); transform.write('here is '); transform.end();
執行結果:
其中的 _transform 是 Transform Streams 的內置方法,全部 Transform Streams 都須要使用該接口來接收輸入和處理輸出,且該方法只能由子類來調用。
_transform 接口格式以下:
transform._transform(chunk, encoding, callback)
第一個參數表示被轉換(transformed)的數據塊(chunk),除非構造方法 option 參數(可選)傳入了 「decodeString : false」,不然其類型均爲 Buffer;
第二個參數用於設置編碼,但只有當 chunck 爲 String 格式(即構造方法傳入 「decodeString : false」參數)的時候纔可配置,不然默認爲「buffer」;
第三個參數 callback 用於在 chunk 被處理後調用,通知系統進入下一輪 _transform 調用。該回調方法接收兩個可選參數 —— callback([error, data]),其中的 data 參數能夠將 chunck 寫入緩存中(供更後面的消費者去消費):
transform.prototype._transform = function(data, encoding, callback){ this.push(data); callback() }; ///////等價於 transform.prototype._transform = function(data, encoding, callback){ callback(null, data) };
另外 Transform Streams 還有一個 _flush(callback) 內置方法,它會在沒有更多可消耗的數據時、在「end」事件以前被觸發,並且會清空緩存數據並結束 Stream。
該內置方法一樣只容許由子類來調用,並且執行後,不能再調用 .push 方法。
關於 Transform Streams 的更多細節還能夠參考這篇文章,推薦閱讀。
2.5 Classic Streams
在較早版本的 NodeJS 裏,Stream 的實現相較簡陋,例如上文說起的「Stream.Readable」接口均是從 Node 0.9.4 開始纔有,所以咱們每每須要對其進行屢次封裝擴展才能更好地用來開發。
而 Classic Streams 即是對這種古舊模式的 Stream 接口的統稱。
須要留意的是,只要往任意一個 stream 註冊一個「data」事件監聽器,它就會自動切換到「classic」模式,並按照舊的 API 去執行。
classic 流能夠看成一個帶有 .pipe 接口的事件發射器(event emitter),當它要爲消耗者提供數據時會發射「data」事件,當要結束生產數據時,則發射「end」事件。
另外只有當設置 Stream.readable 爲 true 時,.pipe 接口才會將當前流視做可讀流:
//demo7 var Stream = require('stream'); var stream = new Stream(); stream.readable = true; //告訴 .pipe 這是個可讀流 var c = 64; var iv = setInterval(function () { if (++c >= 75) { clearInterval(iv); stream.emit('end'); } else stream.emit('data', String.fromCharCode(c)); }, 100); stream.pipe(process.stdout);
另外,Classic readable streams 還有 .pause() 和 .resume() 兩個接口可用於暫停/恢復流的讀取:
createServer(function(q,s) { // ADVISORY only! q.pause() session(q, function(ses) { q.on('data', handler) q.resume() }) })
3. Object Mode
對於可讀流來講,push(data) 時,data 的類型只能是 String 或Buffer,且消耗時 data 事件輸出的數據類型都爲 Buffer;
對於可寫流來講,write(data) 時,data 的類型也只能是 String 或 Buffer,_write(data) 調用時所傳進來的 data 類型都爲 Buffer。
示例:
//demo8 writable._write = (chunck, enc, next) => { // 輸出打印 console.log(chunck); //Buffer //console.log(chunck.toString()); //轉爲String process.nextTick(next) }; writable.write('Happy Chinese Year'); writable.end();
執行結果:
不過,爲了加強數據類型的靈活性,不管是可讀流或是可寫流,只須要往其構造函數裏傳入配置參數「{ objectMode: true }」,即可往流裏傳入/獲取任意類型(null除外)的數據:
const objectModeWritable = Writable({ objectMode: true }); objectModeWritable._write = (chunck, enc, next) => { // 輸出打印 console.log(typeof chunck); console.log(chunck); process.nextTick(next) }; objectModeWritable.write('Happy Chinese Year'); objectModeWritable.write( { year : 2017 } ); objectModeWritable.end( 2017 );
執行結果:
4. Stream的兼容問題
在前文咱們介紹了 classic streams,它屬於陳舊版本的 Node 上的 Stream 接口,能夠把它稱爲 Streams1。而從 Node 0.10 開始,Stream 新增了系列實用的新接口,能夠作更多除了 .pipe() 以外的事情,咱們把其歸類爲 Streams2(事實上,在 Node 0.11+開始,Stream有些許新的變更,從該版本開始的 Stream 也可稱爲 Streams3)。
那麼這裏存在一個問題 —— 那些使用了 Stream1 的項目(特別是 npm 包),想升級使用環境的 Node 版本到 0.10+,會否致使兼容問題呢?
還好 Streams2 雖然改頭換面,但本質上是設計爲向後兼容的。
打個比方,若是你同時推送了一條 Streams2 流和一條舊格式的、基於事件發射器的流,Stream2 將降級爲舊模式(shim mode)來向後兼容。
可是,若是咱們的開發環境使用的是 Node 0.8(且由於某些緣由不能升級),但又想使用 Streams2 的API怎麼辦呢?或者好比 npm 上的某些開源的工具包,想要擁抱 Streams2 的便利,又想保持對使用 Node 0.8 的用戶進行兼容處理,這樣又得怎麼處理?
針對上述問題,早在 Node 0.10 釋放以前,Issacs 就把 Node-core 中操做 Stream 的核心接口獨立拷貝了一份出來,開源到了 npm 上並持續更新,它就是 readable-stream。
經過使用 readable-stream,咱們就能夠在那些核內心沒有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:
var Readable = require('stream').Readable || require('readable-stream').Readable
readable-stream 如今有 v1.0.x 和 v1.1.x 兩個主要版本,前者跟進 Streams2 的迭代,後者跟進 Streams3 的迭代,用戶能夠根據需求使用對應版本的包。
5. through2
readable-stream 雖然提供了一個 Streams 的兼容方案,但咱們也但願能對 Stream 複雜的API進行精簡。
而 through2 便基於 readable-stream 對 Stream 接口進行了封裝,並提供了更簡單和靈活的方法。
through2 會爲你生成 Transform Streams(貌似舊版本是 Duplex Streams)來處理任意你想使用的流 —— 如前文介紹,相比其它流,Transform 流處理起數據會更加靈活方便。
來看下 through2 的示例:
//demo9 const fs = require('fs'); const through2 = require('through2'); fs.createReadStream('data.txt') .pipe(through2(function (chunk, enc, callback) { for (var i = 0; i < chunk.length; i++) if (chunk[i] == 97) chunk[i] = 122; // 把 'a' 替換爲 'z' this.push(chunk); callback() })) .pipe(fs.createWriteStream('out.txt')) .on('finish', ()=> { console.log('DONE') });
使用 through2.obj 接口操做 Object Mode 下的流:
//demo10 const fs = require('fs'); const through2 = require('through2'); const csv2 = require('csv2'); let all = []; fs.createReadStream('list.csv') .pipe(csv2()) // through2.obj(fn) 是 through2({ objectMode: true }, fn) 的簡易封裝 .pipe(through2.obj(function (chunk, enc, callback) { var data = { name: chunk[0], sex: chunk[1], addr: chunk[2] }; this.push(data); callback() })) .on('data', function (data) { all.push(data) }) .on('end', function () { console.log(all) });
對比原生的 Stream API,through2 簡潔了很多,加上有 readable-stream 依賴加持,也很好理解爲什麼像 gulp 及其插件都會使用 through2 來操做和處理 stream 了。
順便貼下對 through2 的源碼註釋:
var Transform = require('readable-stream/transform'), inherits = require('util').inherits, xtend = require('xtend'); //構造方法,繼承了Transform function DestroyableTransform(opts) { Transform.call(this, opts); this._destroyed = false } inherits(DestroyableTransform, Transform); //原型接口 destroy,用於關閉當前流 DestroyableTransform.prototype.destroy = function (err) { if (this._destroyed) return; this._destroyed = true; var self = this; process.nextTick(function () { if (err) self.emit('error', err); self.emit('close') }) }; // a noop _transform function function noop(chunk, enc, callback) { callback(null, chunk) } // 閉包,用於返回對外接口方法 function through2(construct) { //最終返回此匿名函數 return function (options, transform, flush) { if (typeof options == 'function') { flush = transform transform = options options = {} } if (typeof transform != 'function') transform = noop if (typeof flush != 'function') flush = null return construct(options, transform, flush) } } // 出口,執行 throuh2 閉包函數,返回一個 DestroyableTransform 的實例(t2) module.exports = through2(function (options, transform, flush) { //t2 爲 Transform Stream 對象 var t2 = new DestroyableTransform(options); //Transform Streams 的內置接口 _transform(chunk, encoding, next) 方法 t2._transform = transform; if (flush) t2._flush = flush; return t2 }); // 對外暴露一個能夠直接 new (或者不加 new)來建立實例的的構造函數 module.exports.ctor = through2(function (options, transform, flush) { function Through2(override) { if (!(this instanceof Through2)) return new Through2(override) this.options = xtend(options, override) DestroyableTransform.call(this, this.options) } inherits(Through2, DestroyableTransform) Through2.prototype._transform = transform if (flush) Through2.prototype._flush = flush return Through2 }) //Object Mode接口的簡易封裝 module.exports.obj = through2(function (options, transform, flush) { var t2 = new DestroyableTransform(xtend({objectMode: true, highWaterMark: 16}, options)) t2._transform = transform if (flush) t2._flush = flush return t2 })
以上是本文對 Stream 的一個介紹,但事實上 Stream 還有許多未露面的 API,感興趣的同窗能夠直接閱讀官方 API文檔作進一步瞭解。
本篇文章是對後續 gulp 源碼解析系列的一個基礎鋪墊,想了解更多 gulp 相關內容的話能夠留意個人博客。最後恭祝你們雞年大吉!共勉~
Reference
⑴ Stream API Doc - https://nodejs.org/api/stream.html
⑵ stream-handbook - https://github.com/substack/stream-handbook
⑶ Node.js Stream - 基礎篇 - http://www.cnblogs.com/zapple/p/5759670.html
⑷ Why I don't use Node's core 'stream' module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html