node源碼解析 -- Stream探究

1. 認識Stream

  • Stream的概念最先來源於Unix系統,其能夠將一個大型系統拆分紅一些小的組件,而後將這些小的組件能夠很好地運行html

  • TCP/IP協議中的TCP協議也用到了Stream的思想,進而能夠進行流量控制、差錯控制node

  • 在unix中經過 |來表示流;node中經過pipe方法git

  • Stream能夠認爲數據就像管道同樣,屢次不斷地被傳遞下去,而不是一次性所有傳遞給下游github

2. node中的stream

node stream中能夠看到第一段的描述:api

A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter數組

對上面一段話進行解析,能夠獲得以下幾點:緩存

  • Stream是Node中一個很是重要的概念,被大量對象實現,尤爲是Node中的I/O操做工具

  • Stream是一個抽像的接口,通常不會直接使用,須要實現內部的某些抽象方法(例如_read、_write、_transform)ui

  • Stream是EventEmitter的子類,實際上Stream的數據傳遞內部依然是經過事件(data)來實現的this

  • Stream分爲四種:readable、writeable、Duplex、transform

3.Readable Stream 與 Writeable Stream

3.1 兩者的關係

Readable Stream是提供數據的Stream,外部來源的數據均會存儲到內部的buffer數組內緩存起來。

writeable Stream是消費數據的Stream,從readable stream中獲取數據,而後對獲得的chunk塊數據進行處理,至於如何處理,就依賴於具體實現(也就是_write的實現)。

首先看看Readdable Streamwriteable stream兩者之間的流動關係:

圖片描述

3.2 pipe的流程解析

stream內部是如何從readable stream流到writeable stream裏面呢?有兩種方法:

a) pipe 鏈接兩個stream

先看一個簡單地demo

var Read = require('stream').Readable;
var Write = require('stream').Writable;
var r = new Read();
var w = new Write();

r.push('hello ');
r.push('world!');
r.push(null)


w._write = function (chunk, ev, cb) {
    console.log(chunk.toString());
    cb();
}

r.pipe(w);

pipe是一種最簡單直接的方法鏈接兩個stream,內部實現了數據傳遞的整個過程,在開發的時候不須要關注內部數據的流動:

Readable.prototype.pipe = function (dest, pipeOpts) {
    var src = this;
    ...
    src.on('data', ondata);
    
    function ondata(chunk) {
        var ret = dest.write(chunk);
        if (false === ret) {
              debug('false write response, pause',
            src._readableState.awaitDrain);
              src._readableState.awaitDrain++;
              src.pause();
        }
    }
    ...
}

b) 事件data + 事件drain聯合實現

var Read = require('stream').Readable;
var Write = require('stream').Writable;
var r = new Read();
var w = new Write();

r.push('hello ');
r.push('world!');
r.push(null)


w._write = function (chunk, ev, cb) {
    console.log(chunk.toString());
    cb();
}

r.on('data', function (chunk) {
    if (!w.write(chunk)) {
        r.pause();
    }
})

w.on('drain', function () {
    r.resume();
})

// hello
// world!

4 Readable Stream的模式

4.1 內部模式的實現

Readable Stream 存在兩種模式(flowing mode 與 paused mode),這兩種模式決定了chunk數據流動的方式---自動流動仍是手工流動。那如何觸發這兩種模式呢:

  • flowing mode: 註冊事件data、調用resume方法、調用pipe方法

  • paused mode: 調用pause方法(沒有pipe方法)、移除data事件 && unpipe全部pipe

讓咱們再深刻一些,看看裏面具體是如何實現的:

// data事件觸發flowing mode
Readable.prototype.on = function(ev, fn) {
    ...
    if (ev === 'data' && false !== this._readableState.flowing) {
        this.resume();
      }
      ...
}

// resume觸發flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
           debug('resume');
           state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法觸發flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}

結論

  • 兩種方式取決於一個flowing字段:true --> flowing mode;false --> paused mode

  • 三種方式最後均是經過resume方法,將state.flowing = true

4.2 兩種模式的操做

a) paused mode

在paused mode下,須要手動地讀取數據,而且能夠直接指定讀取數據的長度:

var Read = require('stream').Readable;
var r = new Read();

r.push('hello');
r.push('world');
r.push(null);

console.log('輸出結果爲: ', r.read(1).toString())
// 輸出結果爲: 'h'

還能夠經過監聽事件readable,觸發時手工讀取chunk數據:

var Read = require('stream').Readable;
var r = new Read();

r.push('hello');
r.push('world');
r.push(null);

r.on('readable', function () {
    var chunk = r.read();
    console.log('get data by readable event: ', chunk.toString())
});

// get data by readable event:  hello world!

須要注意的是,一旦註冊了readable事件,必須手工讀取read數據,不然數據就會流失,看看內部實現:

function emitReadable_(stream) {
    debug('emit readable');
    stream.emit('readable');
    flow(stream);
}

function flow(stream) {
    var state = stream._readableState;
    debug('flow', state.flowing);
    if (state.flowing) {
           do {    
              var chunk = stream.read();
        } while (null !== chunk && state.flowing);
    }
}

Readable.prototype.read = function (n) {
    ...
    var res = fromList(n, state);
    
    if (!util.isNull(ret)) {
        this.emit('data', ret);
    }
    ...
}

flow方法直接read數據,將獲得的數據經過事件data交付出去,然而此處沒有註冊data事件監控,所以,獲得的chunk數據並無交付給任何對象,這樣數據就白白流失了,因此在觸發emit('readable')時,須要提早read數據。

b) flowing mode

經過註冊data、pipe、resume能夠自動獲取所須要的數據,看看內部實現:

// 事件data方式
var Read = require('stream').Readable;

var r = new Read();

r.push('hello ');
r.push('world!');
r.push(null)

r.on('data', function (chunk) {
    console.log('chunk :', chunk.toString())
})
// chunk : hello 
// chunk : world!
// 經過pipe方式
var r = new Read();

r.push('hello ');
r.push('world!');
r.push(null)

r.pipe(process.stdout)
// hello world!

c) 兩種mode的總結

圖片描述

5. transform stream的實現

用過browserify的人都知道,browserify是一種基於stream的模塊打包工具,裏面存在browserify.prototype.transform(tr)方法,其中的tr就要求是transform stream,且browserify內部經過through2構建了不少tranform stream。也能夠說browserify是創建在transform stream的基礎上。那麼具有readable、writeablestream的transform stream內部是如何工做的呢?

圖片描述

6. 自定義stream

自定義stream很簡單,只要實現相應的內部待實現方法就能夠了,具體來講:

  • readable stream: 實現_read方法來解決數據的獲取問題

  • writeable stream: 實現_write方法來解決數據的去向問題

  • tranform stream: 實現_tranform方法來解決數據存放在buffer前的轉換工做

// 自定義readable stream的實現
var Stream = require('stream');
var Read = Stream.Readable;
var util = require('util');

util.inherits(MyReadStream, Read);

function MyReadStream(data, opt) {
    Read.call(this, opt);
    this.data = data || [];
}
MyReadStream.prototype._read = function () {
    var _this = this;
    this.data.forEach(function (d) {
        _this.push(d);
    })
    this.push(null);
}

var data = ['aa', 'bb', 'cc'];
var r = new MyReadStream(data);

r.on('data', function (chunk) {
    console.log(chunk.toString());
})

7. 參考資料

相關文章
相關標籤/搜索