Stream的概念最先來源於Unix系統,其能夠將一個大型系統拆分紅一些小的組件,而後將這些小的組件能夠很好地運行html
TCP/IP協議中的TCP協議也用到了Stream的思想,進而能夠進行流量控制、差錯控制node
在unix中經過 |
來表示流;node中經過pipe方法git
Stream能夠認爲數據就像管道同樣,屢次不斷地被傳遞下去,而不是一次性所有傳遞給下游github
在node stream中能夠看到第一段的描述:api
A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter數組
對上面一段話進行解析,能夠獲得以下幾點:緩存
Stream是Node中一個很是重要的概念,被大量對象實現,尤爲是Node中的I/O操做工具
Stream是一個抽像的接口,通常不會直接使用,須要實現內部的某些抽象方法(例如_read、_write、_transform)ui
Stream是EventEmitter的子類,實際上Stream的數據傳遞內部依然是經過事件(data)來實現的this
Stream分爲四種:readable、writeable、Duplex、transform
Readable Stream
是提供數據的Stream,外部來源的數據均會存儲到內部的buffer數組內緩存起來。
writeable Stream
是消費數據的Stream,從readable stream
中獲取數據,而後對獲得的chunk塊數據進行處理,至於如何處理,就依賴於具體實現(也就是_write的實現)。
首先看看Readdable Stream
與writeable stream
兩者之間的流動關係:
stream內部是如何從readable stream流到writeable stream裏面呢?有兩種方法:
a) pipe 鏈接兩個stream
先看一個簡單地demo
var Read = require('stream').Readable; var Write = require('stream').Writable; var r = new Read(); var w = new Write(); r.push('hello '); r.push('world!'); r.push(null) w._write = function (chunk, ev, cb) { console.log(chunk.toString()); cb(); } r.pipe(w);
pipe是一種最簡單直接的方法鏈接兩個stream,內部實現了數據傳遞的整個過程,在開發的時候不須要關注內部數據的流動:
Readable.prototype.pipe = function (dest, pipeOpts) { var src = this; ... src.on('data', ondata); function ondata(chunk) { var ret = dest.write(chunk); if (false === ret) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; src.pause(); } } ... }
b) 事件data + 事件drain聯合實現
var Read = require('stream').Readable; var Write = require('stream').Writable; var r = new Read(); var w = new Write(); r.push('hello '); r.push('world!'); r.push(null) w._write = function (chunk, ev, cb) { console.log(chunk.toString()); cb(); } r.on('data', function (chunk) { if (!w.write(chunk)) { r.pause(); } }) w.on('drain', function () { r.resume(); }) // hello // world!
Readable Stream
存在兩種模式(flowing mode 與 paused mode),這兩種模式決定了chunk數據流動的方式---自動流動仍是手工流動。那如何觸發這兩種模式呢:
flowing mode: 註冊事件data、調用resume方法、調用pipe方法
paused mode: 調用pause方法(沒有pipe方法)、移除data事件 && unpipe全部pipe
讓咱們再深刻一些,看看裏面具體是如何實現的:
// data事件觸發flowing mode Readable.prototype.on = function(ev, fn) { ... if (ev === 'data' && false !== this._readableState.flowing) { this.resume(); } ... } // resume觸發flowing mode Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug('resume'); state.flowing = true; resume(this, state); } return this; } // pipe方法觸發flowing模式 Readable.prototype.resume = function() { if (!state.flowing) { this.resume() } }
結論
兩種方式取決於一個flowing字段:true --> flowing mode;false --> paused mode
三種方式最後均是經過resume方法,將state.flowing = true
a) paused mode
在paused mode下,須要手動地讀取數據,而且能夠直接指定讀取數據的長度:
var Read = require('stream').Readable; var r = new Read(); r.push('hello'); r.push('world'); r.push(null); console.log('輸出結果爲: ', r.read(1).toString()) // 輸出結果爲: 'h'
還能夠經過監聽事件readable,觸發時手工讀取chunk數據:
var Read = require('stream').Readable; var r = new Read(); r.push('hello'); r.push('world'); r.push(null); r.on('readable', function () { var chunk = r.read(); console.log('get data by readable event: ', chunk.toString()) }); // get data by readable event: hello world!
須要注意的是,一旦註冊了readable事件,必須手工讀取read數據,不然數據就會流失,看看內部實現:
function emitReadable_(stream) { debug('emit readable'); stream.emit('readable'); flow(stream); } function flow(stream) { var state = stream._readableState; debug('flow', state.flowing); if (state.flowing) { do { var chunk = stream.read(); } while (null !== chunk && state.flowing); } } Readable.prototype.read = function (n) { ... var res = fromList(n, state); if (!util.isNull(ret)) { this.emit('data', ret); } ... }
flow方法直接read數據,將獲得的數據經過事件data交付出去,然而此處沒有註冊data事件監控,所以,獲得的chunk數據並無交付給任何對象,這樣數據就白白流失了,因此在觸發emit('readable')時,須要提早read數據。
b) flowing mode
經過註冊data、pipe、resume能夠自動獲取所須要的數據,看看內部實現:
// 事件data方式 var Read = require('stream').Readable; var r = new Read(); r.push('hello '); r.push('world!'); r.push(null) r.on('data', function (chunk) { console.log('chunk :', chunk.toString()) }) // chunk : hello // chunk : world!
// 經過pipe方式 var r = new Read(); r.push('hello '); r.push('world!'); r.push(null) r.pipe(process.stdout) // hello world!
c) 兩種mode的總結
用過browserify的人都知道,browserify是一種基於stream的模塊打包工具,裏面存在browserify.prototype.transform(tr)方法,其中的tr就要求是transform stream,且browserify內部經過through2
構建了不少tranform stream。也能夠說browserify是創建在transform stream的基礎上。那麼具有readable、writeablestream的transform stream內部是如何工做的呢?
自定義stream很簡單,只要實現相應的內部待實現方法就能夠了,具體來講:
readable stream: 實現_read方法來解決數據的獲取問題
writeable stream: 實現_write方法來解決數據的去向問題
tranform stream: 實現_tranform方法來解決數據存放在buffer前的轉換工做
// 自定義readable stream的實現 var Stream = require('stream'); var Read = Stream.Readable; var util = require('util'); util.inherits(MyReadStream, Read); function MyReadStream(data, opt) { Read.call(this, opt); this.data = data || []; } MyReadStream.prototype._read = function () { var _this = this; this.data.forEach(function (d) { _this.push(d); }) this.push(null); } var data = ['aa', 'bb', 'cc']; var r = new MyReadStream(data); r.on('data', function (chunk) { console.log(chunk.toString()); })