gulp源碼解析(一)—— Stream詳解

做爲前端,咱們經常會和 Stream 有着頻繁的接觸。好比使用 gulp 對項目進行構建的時候,咱們會使用 gulp.src 接口將匹配到的文件轉爲 stream(流)的形式,再經過 .pipe() 接口對其進行鏈式加工處理;html

或者好比咱們經過 http 模塊建立一個 HTTP 服務:前端

const http = require('http');
http.createServer( (req, res) => {
  //...
}).listen(3000);

此處的 req 和 res 也屬於 Stream 的消費接口(前者爲 Readable Stream,後者爲 Writable Stream)node

事實上像上述的 req/res,或者 process.stdout 等接口都屬於 Stream 的實例,所以較少存在狀況,是須要咱們手動引入 Stream 模塊的,例如:git

//demo1.js
'use strict';
const Readable = require('stream').Readable;
const rs = Readable();
const s = 'VaJoy';
const l = s.length;
let i = 0;
rs._read = ()=>{
    if(i == l){
        rs.push(' is my name');
        return rs.push(null)
    }
    rs.push(s[i++])
};
rs.pipe(process.stdout);

若是不太能讀懂上述代碼,或者對 Stream 的概念感到模糊,那麼能夠放輕鬆,由於本文會進一步地對 Stream 進行剖析,而且談談直接使用它可能會存在的一些問題(這也是爲什麼 gulp 要使用 through2 的緣由)github

另外本文的示例都可在個人 github 倉庫https://github.com/VaJoy/stream/獲取到,讀者能夠自行下載和調試。npm

一. Stream的做用gulp

在介紹 Stream(流)以前,咱們先來看一個例子 —— 模擬服務器把本地某個文件內容吐給客戶端:api

//demo2
var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(3000);

這段代碼雖然能夠正常執行,但存在一個顯著的問題 —— 對於每個客戶端的請求,fs.readFile 接口都會把整個文件都緩存到內存中去,而後纔開始把數據吐給用戶。那麼當文件體積很大、請求也較多(且特別當請求來自慢速用戶)的時候,服務器須要消耗很大的內存,致使性能低下。緩存

然而這個問題,則正是 stream 發揮所長的地方。如前文說起的,res 是流對象,那咱們正好能夠將其利用起來:服務器

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server2.listen(4000);

在上方代碼段裏,fs.createReadStream 建立了 data.txt 的可讀流(Readable Stream)。這裏須要事先了解的是,流能夠簡單地分爲「可讀的(readable)」、「可寫的(writable)」,或者「讀寫都可」三種類型,且全部的流都屬於 EventEmitter 的實例

回到代碼,對於建立的可讀流,咱們經過 .pipe() 接口來監聽其 dataend 事件,並把 data.txt (的可讀流)拆分紅一小塊一小塊的數據(chunks),像流水同樣源源不斷地吐給客戶端,而再也不須要等待整個文件都加載到內存後才發送數據。

其中 .pipe 能夠視爲流的「管道/通道」方法,任何類型的流都會有這個 .pipe 方法去成對處理流的輸入與輸出。

爲了方便理解,咱們把上述兩種方式(不使用流/使用流)處理爲以下的情景(臥槽我好好一個前端爲啥要P這麼萌的圖)

⑴ 不使用流:

 

⑵ 使用流:

由此能夠得知,使用流(stream)的形式,能夠大大提高響應時間,又能有效減輕服務器內存的壓力。

二. Stream的分類

在上文咱們曾說起到,stream 能夠按讀寫權限來簡單地分作三類,不過這裏咱們再細化下,能夠把 stream 歸爲以下五個類別:

⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams

其中 Transform Streams 和 Duplex Streams 都屬於便可讀又可寫的流,而最後一個 Classic Streams 是對 Node 古早版本上的 Stream 的一個統稱。咱們將照例對其進行逐一介紹。

2.1 Readable Streams

便可讀流,經過 .pipe 接口能夠將其數據傳遞給一個 writable、transform 或者 duplex流:

readableStream.pipe(dst)

常見的 Readable Streams 包括:

  • 客戶端上的 HTTP responses
  • 服務端上的 HTTP requests
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • 子進程的 stdout 和 stderr
  • process.stdin

例如在前面 demo2 的代碼段中,咱們就使用了 fs.createReadStream 接口來建立了一個 fs read stream:

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server2.listen(4000);

這裏有個有趣的地方 —— 雖然 Readable Streams 稱爲可讀流,但在將其傳入一個消耗對象以前,它都是可寫的:

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('servers ');
rs.push('are listening on\n');
rs.push('3000 and 4000\n');
rs.push(null);

rs.pipe(process.stdout);

執行結果:

在這段代碼中,咱們經過 readStream.push(data) 的形式往可讀流裏注入數據,並以 readStream.push(null) 來結束可讀流。

不過這種寫法有個弊端 —— 從使用 .push() 將數據注入 readable 流中開始,直到另外一個東西(process.stdout)來消耗數據以前,這些數據都會存在緩存中。

這裏有個內置接口 ._read()  能夠用來處理這個問題,它是從系統底層開始讀取數據流時纔會不斷調用自身,從而減小緩存冗餘。

咱們能夠回過頭來看 demo1 的例子:

'use strict';
const Readable = require('stream').Readable;
const rs = Readable();
const s = 'VaJoy';
const l = s.length;
let i = 0;
rs._read = ()=>{
    if(i == l){
        rs.push(' is my name');
        return rs.push(null)
    }
    rs.push(s[i++])
};
rs.pipe(process.stdout);

咱們是在 ._read 方法中才使用 readStream.push(data) 往可讀流裏注入數據供下游消耗(也會流經緩存),從而提高流處理的性能。

這裏也有個小問題 —— 上一句話所提到的「供下游消耗」,這個下游一般又會以怎樣的形式來消耗可讀流的呢?

首先,可使用咱們熟悉的 .pipe() 方法將可讀流推送給一個消耗對象(writable、transform 或者 duplex流)

//ext1
const fs = require('fs');
const zlib = require('zlib');

const r = fs.createReadStream('data.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('data.txt.gz');
r.pipe(z).pipe(w);

其次,也能夠經過監聽可讀流的「data」事件(別忘了文章前面提到的「全部的流都屬於 EventEmitter 的實例」)來實現消耗處理 —— 在首次監聽其 data 事件後,readStream 便會持續不斷地調用 _read(),經過觸發 data 事件將數據輸出。當數據所有被消耗時,則觸發 end 事件。

示例:

//demo3
const Readable = require('stream').Readable;

class ToReadable extends Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator
    }
    _read() {
        const res = this.iterator.next();
        if (res.done) {
            // 迭代結束,順便結束可讀流
            this.push(null)
        }
        setTimeout(() => {
            // 將數據添加到流中
            this.push(res.value + '\n')
        }, 0)
    }
}

const gen = function *(a){
    let count = 5,
        res = a;
    while(count--){
        res = res*res;
        yield res
    }
};

const readable = new ToReadable(gen(2));

// 監聽`data`事件,一次獲取一個數據
readable.on('data', data => process.stdout.write(data));

// 可讀流消耗完畢
readable.on('end', () => process.stdout.write('readable stream ends~'));

執行結果爲:

這裏須要留意的是,在使用 .push() 往可讀流裏注入數據的代碼段,咱們使用了 setTimeout 將其包裹起來,這是爲了讓系統能有足夠時間優先處理接收流結束信號的事務。固然你也能夠改寫爲:

        if (res.done) {
            // 直接 return
            return this.push(null)
        }
        this.push(res.value + '\n')

2.2 Writable Streams

Writable(可寫)流接口是對寫入數據的目標的抽象:

src.pipe(writableStream)

常見的 Writable Streams 包括:

  • 客戶端的 HTTP requests
  • 服務端的 HTTP responses
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • 子進程的 stdin
  • process.stdout 和 process.stderr

可寫流有兩個重要的方法:

  •  writableStream.write(chunk[, encoding, callback]) —— 往可寫流裏寫入數據;
  •  writableStream.end([chunk, encoding, callback]) —— 中止寫入數據,結束可寫流。在調用 .end() 後,再調用 .write() 方法會產生錯誤。

上方兩方法的 encoding 參數表示編碼字符串(chunk爲String時才能夠用)。

write 方法的 callback 回調參數會在 chunk 被消費後(從緩存中移除後)被觸發;end 方法的 callback 回調參數則在 Stream 結束時觸發。

另外,如同經過 readable._read() 方法能夠處理可讀流,咱們能夠經過 writable._write(chunk, enc, next) 方法在系統底層處理流寫入的邏輯中,對數據進行處理。

其中參數 chunk 表明寫進來的數據;enc 表明編碼的字符串;next(err) 則是一個回調函數,調用它能夠告知消費者進行下一輪的數據流寫入。

示例:

//demo4
const Writable = require('stream').Writable;
const writable = Writable();

writable._write = (chunck, enc, next) => {
    // 輸出打印
    process.stdout.write(chunck.toString().toUpperCase());
    // 寫入完成時,調用`next()`方法通知流傳入下一個數據
    process.nextTick(next)
};

// 全部數據均已寫入底層
writable.on('finish', () => process.stdout.write('DONE'));

// 將一個數據寫入流中
writable.write('a' + '\n');
writable.write('b' + '\n');
writable.write('c' + '\n');

// 再無數據寫入流時,須要調用`end`方法
writable.end();

執行以下:

2.3 Duplex Streams

Duplex 是雙工的意思,所以很容易猜到 Duplex 流就是既能讀又能寫的一類流,它繼承了 Readable 和 Writable 的接口。

常見的 Duplex Streams 有:

  • TCP sockets
  • zlib streams
  • crypto streams

示例:

//demo5
const Duplex = require('stream').Duplex;
const duplex = Duplex();

duplex._read = function () {
    var date = new Date();
    this.push( date.getFullYear().toString() );
    this.push(null)
};

duplex._write = function (buf, enc, next) {
    console.log( buf.toString() + '\n' );
    next()
};

duplex.on('data', data => console.log( data.toString() ));

duplex.write('the year is');

duplex.end();

執行結果:

2.4 Transform Streams

Transform Stream 是在繼承了 Duplex Streams 的基礎上再進行了擴展,它能夠把寫入的數據和輸出的數據,經過 ._transform 接口關聯起來。

常見的 Transform Streams 有:

  • zlib streams
  • crypto streams

示例:

//demo6
const Transform = require('stream').Transform;
class SetName extends Transform {
    constructor(name, option) {
        super(option || {});
        this.name = name || ''
    }
    // .write接口寫入的數據,處理後直接從 data 事件的回調中可取得
    _transform(buf, enc, next) {
        var res = buf.toString().toUpperCase();
        this.push(res + this.name + '\n');
        next()
    }

}

var transform = new SetName('VaJoy');
transform.on('data', data => process.stdout.write(data));

transform.write('my name is ');
transform.write('here is ');
transform.end();

執行結果:

其中的 _transform 是 Transform Streams 的內置方法,全部 Transform Streams 都須要使用該接口來接收輸入和處理輸出,且該方法只能由子類來調用。

_transform 接口格式以下:

transform._transform(chunk, encoding, callback)

第一個參數表示被轉換(transformed)的數據塊(chunk),除非構造方法 option 參數(可選)傳入了 「decodeString : false」,不然其類型均爲 Buffer;

第二個參數用於設置編碼,但只有當 chunck 爲 String 格式(即構造方法傳入 「decodeString : false」參數)的時候纔可配置,不然默認爲「buffer」;

第三個參數 callback 用於在 chunk 被處理後調用,通知系統進入下一輪 _transform 調用。該回調方法接收兩個可選參數 —— callback([error, data]),其中的 data 參數能夠將 chunck 寫入緩存中(供更後面的消費者去消費)

transform.prototype._transform = function(data, encoding, callback){
    this.push(data);
    callback()
};
///////等價於
transform.prototype._transform = function(data, encoding, callback){
    callback(null, data)
};

另外 Transform Streams 還有一個 _flush(callback) 內置方法,它會在沒有更多可消耗的數據時、在「end」事件以前被觸發,並且會清空緩存數據並結束 Stream。

該內置方法一樣只容許由子類來調用,並且執行後,不能再調用 .push 方法。

關於 Transform Streams 的更多細節還能夠參考這篇文章,推薦閱讀。

2.5 Classic Streams

在較早版本的 NodeJS 裏,Stream 的實現相較簡陋,例如上文說起的「Stream.Readable」接口均是從 Node 0.9.4 開始纔有,所以咱們每每須要對其進行屢次封裝擴展才能更好地用來開發。

而 Classic Streams 即是對這種古舊模式的 Stream 接口的統稱。

須要留意的是,只要往任意一個 stream 註冊一個「data」事件監聽器,它就會自動切換到「classic」模式,並按照舊的 API 去執行。

classic 流能夠看成一個帶有 .pipe 接口的事件發射器(event emitter),當它要爲消耗者提供數據時會發射「data」事件,當要結束生產數據時,則發射「end」事件。

另外只有當設置 Stream.readable 爲 true 時,.pipe 接口才會將當前流視做可讀流:

//demo7
var Stream = require('stream');
var stream = new Stream();
stream.readable = true; //告訴 .pipe 這是個可讀流

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

另外,Classic readable streams 還有 .pause() 和 .resume() 兩個接口可用於暫停/恢復流的讀取:

createServer(function(q,s) {
  // ADVISORY only!
  q.pause()
  session(q, function(ses) {
    q.on('data', handler)
    q.resume()
  })
})

3. Object Mode

對於可讀流來講,push(data) 時,data 的類型只能是 String 或Buffer,且消耗時 data 事件輸出的數據類型都爲 Buffer;

對於可寫流來講,write(data) 時,data 的類型也只能是 String 或 Buffer,_write(data) 調用時所傳進來的 data 類型都爲 Buffer。

示例:

//demo8
writable._write = (chunck, enc, next) => {
    // 輸出打印
    console.log(chunck);   //Buffer
    //console.log(chunck.toString());  //轉爲String

    process.nextTick(next)
};

writable.write('Happy Chinese Year');
writable.end();

執行結果:

不過,爲了加強數據類型的靈活性,不管是可讀流或是可寫流,只須要往其構造函數裏傳入配置參數「{ objectMode: true }」,即可往流裏傳入/獲取任意類型(null除外)的數據:

const objectModeWritable = Writable({ objectMode: true });

objectModeWritable._write = (chunck, enc, next) => {
    // 輸出打印
    console.log(typeof chunck);
    console.log(chunck);
    process.nextTick(next)
};

objectModeWritable.write('Happy Chinese Year');
objectModeWritable.write( { year : 2017 } );
objectModeWritable.end( 2017 );

執行結果:

4. Stream的兼容問題

在前文咱們介紹了 classic streams,它屬於陳舊版本的 Node 上的 Stream 接口,能夠把它稱爲 Streams1。而從 Node 0.10 開始,Stream 新增了系列實用的新接口,能夠作更多除了 .pipe() 以外的事情,咱們把其歸類爲 Streams2(事實上,在 Node 0.11+開始,Stream有些許新的變更,從該版本開始的 Stream 也可稱爲 Streams3)

那麼這裏存在一個問題 —— 那些使用了 Stream1 的項目(特別是 npm 包),想升級使用環境的 Node 版本到 0.10+,會否致使兼容問題呢?

還好 Streams2 雖然改頭換面,但本質上是設計爲向後兼容的。

打個比方,若是你同時推送了一條 Streams2 流和一條舊格式的、基於事件發射器的流,Stream2 將降級爲舊模式(shim mode)來向後兼容。

可是,若是咱們的開發環境使用的是 Node 0.8(且由於某些緣由不能升級),但又想使用 Streams2 的API怎麼辦呢?或者好比 npm 上的某些開源的工具包,想要擁抱 Streams2 的便利,又想保持對使用 Node 0.8 的用戶進行兼容處理,這樣又得怎麼處理?

針對上述問題,早在 Node 0.10 釋放以前,Issacs 就把 Node-core 中操做 Stream 的核心接口獨立拷貝了一份出來,開源到了 npm 上並持續更新,它就是 readable-stream

經過使用 readable-stream,咱們就能夠在那些核內心沒有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:

var Readable = require('stream').Readable || require('readable-stream').Readable

readable-stream 如今有 v1.0.x 和 v1.1.x 兩個主要版本,前者跟進 Streams2 的迭代,後者跟進 Streams3 的迭代,用戶能夠根據需求使用對應版本的包。

5. through2

readable-stream 雖然提供了一個 Streams 的兼容方案,但咱們也但願能對 Stream 複雜的API進行精簡。

through2 便基於 readable-stream 對 Stream 接口進行了封裝,並提供了更簡單和靈活的方法。

through2 會爲你生成 Transform Streams(貌似舊版本是 Duplex Streams)來處理任意你想使用的流 —— 如前文介紹,相比其它流,Transform 流處理起數據會更加靈活方便。

來看下 through2 的示例:

//demo9
const fs = require('fs');
const through2 = require('through2');
fs.createReadStream('data.txt')
    .pipe(through2(function (chunk, enc, callback) {
        for (var i = 0; i < chunk.length; i++)
            if (chunk[i] == 97)
                chunk[i] = 122; // 把 'a' 替換爲 'z'

        this.push(chunk);

        callback()
    }))
    .pipe(fs.createWriteStream('out.txt'))
    .on('finish', ()=> {
        console.log('DONE')
    });

使用 through2.obj 接口操做 Object Mode 下的流:

//demo10
const fs = require('fs');
const through2 = require('through2');
const csv2 = require('csv2');

let all = [];

fs.createReadStream('list.csv')
    .pipe(csv2())
    // through2.obj(fn) 是 through2({ objectMode: true }, fn) 的簡易封裝
    .pipe(through2.obj(function (chunk, enc, callback) {
        var data = {
            name: chunk[0],
            sex: chunk[1],
            addr: chunk[2]
        };
        this.push(data);

        callback()
    }))
    .on('data', function (data) {
        all.push(data)
    })
    .on('end', function () {
        console.log(all)
    });

對比原生的 Stream API,through2 簡潔了很多,加上有 readable-stream 依賴加持,也很好理解爲什麼像 gulp 及其插件都會使用 through2 來操做和處理 stream 了。

順便貼下對 through2 的源碼註釋:

var Transform = require('readable-stream/transform'),
    inherits = require('util').inherits,
    xtend = require('xtend');

//構造方法,繼承了Transform
function DestroyableTransform(opts) {
    Transform.call(this, opts);
    this._destroyed = false
}

inherits(DestroyableTransform, Transform);

//原型接口 destroy,用於關閉當前流
DestroyableTransform.prototype.destroy = function (err) {
    if (this._destroyed) return;
    this._destroyed = true;

    var self = this;
    process.nextTick(function () {
        if (err)
            self.emit('error', err);
        self.emit('close')
    })
};

// a noop _transform function
function noop(chunk, enc, callback) {
    callback(null, chunk)
}


// 閉包,用於返回對外接口方法
function through2(construct) {
    //最終返回此匿名函數
    return function (options, transform, flush) {
        if (typeof options == 'function') {
            flush = transform
            transform = options
            options = {}
        }

        if (typeof transform != 'function')
            transform = noop

        if (typeof flush != 'function')
            flush = null

        return construct(options, transform, flush)
    }
}


// 出口,執行 throuh2 閉包函數,返回一個 DestroyableTransform 的實例(t2)
module.exports = through2(function (options, transform, flush) {
    //t2 爲 Transform Stream 對象
    var t2 = new DestroyableTransform(options);

    //Transform Streams 的內置接口 _transform(chunk, encoding, next) 方法
    t2._transform = transform;

    if (flush)
        t2._flush = flush;

    return t2
});


// 對外暴露一個能夠直接 new (或者不加 new)來建立實例的的構造函數
module.exports.ctor = through2(function (options, transform, flush) {
    function Through2(override) {
        if (!(this instanceof Through2))
            return new Through2(override)

        this.options = xtend(options, override)

        DestroyableTransform.call(this, this.options)
    }

    inherits(Through2, DestroyableTransform)

    Through2.prototype._transform = transform

    if (flush)
        Through2.prototype._flush = flush

    return Through2
})

//Object Mode接口的簡易封裝
module.exports.obj = through2(function (options, transform, flush) {
    var t2 = new DestroyableTransform(xtend({objectMode: true, highWaterMark: 16}, options))

    t2._transform = transform

    if (flush)
        t2._flush = flush

    return t2
})
View Code

以上是本文對 Stream 的一個介紹,但事實上 Stream 還有許多未露面的 API,感興趣的同窗能夠直接閱讀官方 API文檔作進一步瞭解。

本篇文章是對後續 gulp 源碼解析系列的一個基礎鋪墊,想了解更多 gulp 相關內容的話能夠留意個人博客。最後恭祝你們雞年大吉!共勉~

Reference

⑴ Stream API Doc - https://nodejs.org/api/stream.html

⑵ stream-handbook - https://github.com/substack/stream-handbook

⑶ Node.js Stream - 基礎篇 - http://www.cnblogs.com/zapple/p/5759670.html

⑷ Why I don't use Node's core 'stream' module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html

相關文章
相關標籤/搜索