[轉]nodejs Stream使用手冊

介紹
本文介紹了使用 node.js streams 開發程序的基本方法。html

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964node

最先接觸Stream是從早期的unix開始的
數十年的實踐證實Stream 思想能夠很簡單的開發出一些龐大的系統。在unix裏,Stream是經過
|實現的;在node中,做爲內置的stream模塊,不少核心模塊和三方模塊都使用到。和unix同樣,
node Stream主要的操做也是.pipe(),使用者可使用反壓力機制來控制讀和寫的平衡。
Stream 能夠爲開發者提供能夠重複使用統一的接口,經過抽象的Stream接口來控制Stream之間的讀寫平衡。c++

爲何使用Stream
node中的I/O是異步的,所以對磁盤和網絡的讀寫須要經過回調函數來讀取數據,下面是一個文件下載服務器git

的簡單代碼:github

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

這些代碼能夠實現須要的功能,可是服務在發送文件數據以前須要緩存整個文件數據到內存,若是"data.txt"文件很
大而且併發量很大的話,會浪費不少內存。由於用戶須要等到整個文件緩存到內存才能接受的文件數據,這樣致使
用戶體驗至關很差。不過還好(req, res)兩個參數都是Stream,這樣咱們能夠用fs.createReadStream()代替fs.readFile():npm

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

.pipe()方法監聽fs.createReadStream()'data''end'事件,這樣'data.txt'文件就不須要緩存整
個文件,當客戶端鏈接完成以後立刻能夠發送一個數據塊到客戶端。使用.pipe()另外一個好處是能夠解決當客戶
端延遲很是大時致使的讀寫不平衡問題。若是想壓縮文件再發送,可使用三方模塊實現:api

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

這樣文件就會對支持gzipdeflate的瀏覽器進行壓縮。oppressor 模塊會處理全部的content-encoding瀏覽器

Stream使開發程序變得簡單。緩存

基礎概念
有五種基本的Stream: readable, writable, transform, duplex, and"classic」.服務器

pipe

全部類型的Stream收是使用 .pipe() 來建立一個輸入輸出對,接收一個可讀流src並將其數據輸出到可寫流dst,以下:

src.pipe(dst)

.pipe(dst)方法爲返回dst流,這樣就能夠接連使用多個.pipe(),以下:

a.pipe(b).pipe(c).pipe(d)

功能與下面的代碼相同:

a.pipe(b);
b.pipe(c);
c.pipe(d);

這樣的用法十分相似於unix命令下面用法:

a | b | c | d

readable streams

經過調用Readable streams的 .pipe()方法能夠把Readable streams的數據寫入一個

Writable , Transform, 或者Duplex stream。

readableStream.pipe(dst)

建立 readable stream

這裏咱們建立一個readable stream!

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);


$ node read0.js
beep boop

rs.push(null) 通知數據接收者數據已經發送完畢.
注意到咱們在將全部數據內容壓入可讀流以前並無調用rs.pipe(process.stdout);,可是咱們壓入的全部數據
內容仍是徹底的輸出了,這是由於可讀流在接收者沒有讀取數據以前,會緩存全部壓入的數據。可是在不少狀況下,更好的方法是隻有數據接收着請求數據的時候,才壓入數據到可讀流而不是緩存整個數據。下面咱們重寫 一下
._read()函數:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

$ node read1.js
abcdefghijklmnopqrstuvwxyz

上面的代碼經過重寫_read()方法實現了只有在數據接受者請求數據才向可讀流中壓入數據。_read()方法也能夠接收一個size參數表示數據請求着請求的數據大小,可是可讀流能夠根據須要忽略這個參數。注意咱們也能夠用util.inherits()繼承可讀流。爲了說明只有在數據接受者請求數據時_read()方法才被調用,咱們在向可讀流壓入數據時作一個延時,以下:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

用下面的命令運行程序咱們發現_read()方法只調用了5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

使用計時器的緣由是系統須要時間來發送信號來通知程序關閉管道。使用process.stdout.on('error', fn) 是爲了處理系統由於header命令關閉管道而發送SIGPIPE信號,由於這樣會致使process.stdout觸發EPIPE事件。若是想建立一個的能夠壓入任意形式數據的可讀流,只要在建立流的時候設置參數objectModetrue便可,例如:Readable({ objectMode: true })

讀取readable stream數據

大部分狀況下咱們只要簡單的使用pipe方法將可讀流的數據重定向到另外形式的流,可是在某些狀況下也許直接從可讀流中讀取數據更有用。以下所示:

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});



$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

當可讀流中有數據可讀取時,流會觸發'readable' 事件,這樣就能夠調用.read()方法來讀取相關數據,當可讀流中沒有數據可讀取時,.read() 會返回null,這樣就能夠結束.read() 的調用, 等待下一次'readable' 事件的觸發。下面是一個使用.read(n)從標準輸入每次讀取3個字節的例子:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

以下運行程序發現,輸出結果並不徹底!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

這是應爲額外的數據數據留在流的內部緩衝區裏了,而咱們須要通知流咱們要讀取更多的數據.read(0) 能夠達到這個目的。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

此次運行結果以下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

咱們可使用 .unshift() 將數據從新押回流數據隊列的頭部,這樣能夠接續讀取押回的數據。以下面的代碼,會按行輸出標準輸入的內容:

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

固然,有不少模塊能夠實現這個功能,如:split

writable streams

重寫 ._write(chunk, enc, next) 方法就能夠接受一個readable stream的數據。

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

$ (echo beep; sleep 1; echo boop) | node write0.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一個參數chunk是數據輸入者寫入的數據。第二個參數end是數據的編碼格式。第三個參數next(err)經過回調函數通知數據寫入者能夠寫入更多的時間。若是readable stream寫入的是字符串,那麼字符串會默認轉換爲Buffer,若是在建立流的時候設置Writable({ decodeStrings: false })參數,那麼不會作轉換。若是readable stream寫入的數據時對象,那麼須要這樣建立writable stream

Writable({ objectMode: true })

寫數據到 writable stream

調用writable stream的.write(data)方法便可完成數據寫入。

process.stdout.write('beep boop\n');

調用.end()方法通知writable stream 數據已經寫入完成。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

$ node writing1.js 
$ cat message.txt
beep boop

若是須要設置writable stream的緩衝區的大小,那麼在建立流的時候,須要設置opts.highWaterMark,這樣若是緩衝區裏的數據超過opts.highWaterMark.write(data)方法會返回false。當緩衝區可寫的時候,writable stream會觸發'drain' 事件。

classic streams

Classic streams比較老的接口了,最先出如今node 0.4版本中,可是瞭解一下其運行原理仍是十分有好處的。當一個流被註冊了"data"事件的回到函數,那麼流就會工做在老版本模式下,即會使用老的API。

classic readable streams

Classic readable streams事件就是一個事件觸發器,若是Classic readable streams有數據可讀取,那麼其觸發 "data" 事件,等到數據讀取完畢時,會觸發"end" 事件。.pipe() 方法經過檢查stream.readable的值肯定流是否有數據可讀。下面是一個使用Classic readable streams打印A-J字母的例子:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);


$ node classic0.js
ABCDEFGHIJ

若是要從classic readable stream中讀取數據,註冊"data""end"兩個事件的回調函數便可,代碼以下:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});


$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

須要注意的是若是你使用這種方式讀取數據,那麼會失去使用新接口帶來的好處。好比你在往一個
延遲很是大的流寫數據時,須要注意讀取數據和寫數據的平衡問題,不然會致使大量數據緩存在內
存中,致使浪費大量內存。通常這時候強烈建議使用流的.pipe()方法,這樣就不用本身監聽"data"
"end"事件了,也不用擔憂讀寫不平衡的問題了。固然你也能夠用 through代替本身監聽"data""end" 事件,以下面的代碼:

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}

$ (echo beep; sleep 1; echo boop) | node through.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

或者也可使用concat-stream來緩存整個流的內容:

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));


$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }

固然若是你非要本身監聽"data""end"事件,那麼你能夠在寫數據的流寫的 時候使用.pause()方法暫停Classic readable streams繼續觸發"data" 事件。等到寫數據的流可寫的時候再使用.resume() 方法通知流繼續觸發"data" 事件繼續讀取數據。

classic writable streams

Classic writable streams 很是簡單。只有 .write(buf), .end(buf).destroy()三個方法。.end(buf) 方法的buf參數是可選的,若是選擇該參數,至關於stream.write(buf); stream.end() 這樣的操做,須要注意的是當流的緩衝區寫滿即流不可寫時.write(buf)方法會返回false,若是流再次可寫時,流會觸發drain事件。

transform

transform是一個對讀入數據過濾然輸出的流。

duplex

duplex stream是一個可讀也可寫的雙向流,以下面的a就是一個duplex stream:

a.pipe(b).pipe(a)

read more

相關文章
相關標籤/搜索