nodejs Stream使用手冊

介紹

本文介紹了使用 node.js streams 開發程序的基本方法。html

cc-by-3.0

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964node

doug mcilroy


最先接觸Stream是從早期的unix開始的 數十年的實踐證實Stream 思想能夠很簡單的開發出一些龐大的系統。在unix裏,Stream是經過 |實現的;在node中,做爲內置的stream模塊,不少核心模塊和三方模塊都使用到。和unix同樣, node Stream主要的操做也是.pipe(),使用者可使用反壓力機制來控制讀和寫的平衡。 Stream 能夠爲開發者提供能夠重複使用統一的接口,經過抽象的Stream接口來控制Stream之間的讀寫平衡。c++


爲何使用Stream

node中的I/O是異步的,所以對磁盤和網絡的讀寫須要經過回調函數來讀取數據,下面是一個文件下載服務器git

的簡單代碼:github

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

這些代碼能夠實現須要的功能,可是服務在發送文件數據以前須要緩存整個文件數據到內存,若是"data.txt"文件很 大而且併發量很大的話,會浪費不少內存。由於用戶須要等到整個文件緩存到內存才能接受的文件數據,這樣致使 用戶體驗至關很差。不過還好(req, res)兩個參數都是Stream,這樣咱們能夠用fs.createReadStream()代替npm

fs.readFile():api

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);

.pipe()方法監聽fs.createReadStream()'data''end'事件,這樣"data.txt"文件就不須要緩存整 個文件,當客戶端鏈接完成以後立刻能夠發送一個數據塊到客戶端。使用.pipe()另外一個好處是能夠解決當客戶 端延遲很是大時致使的讀寫不平衡問題。若是想壓縮文件再發送,可使用三方模塊實現:瀏覽器

var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

這樣文件就會對支持gzip和deflate的瀏覽器進行壓縮。oppressor 模塊會處理全部的content-encoding緩存

Stream使開發程序變得簡單。服務器

基礎概念

有五種基本的Stream: readable, writable, transform, duplex, and"classic".

pipe

全部類型的Stream收是使用 .pipe() 來建立一個輸入輸出對,接收一個可讀流src並將其數據輸出到可寫流dst,以下:

src.pipe(dst)

.pipe(dst)方法爲返回dst流,這樣就能夠接連使用多個.pipe(),以下:

a.pipe(b).pipe(c).pipe(d)

功能與下面的代碼相同:

a.pipe(b);
b.pipe(c);
c.pipe(d);

這樣的用法十分相似於unix命令下面用法:

a | b | c | d

readable streams

經過調用Readable streams的 .pipe()方法能夠把Readable streams的數據寫入一個

Writable , Transform, 或者Duplex stream。

readableStream.pipe(dst)

建立 readable stream

這裏咱們建立一個readable stream!

var Readable = require('stream').Readable;

var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);

rs.pipe(process.stdout);


$ node read0.js
beep boop

rs.push(null) 通知數據接收者數據已經發送完畢. 注意到咱們在將全部數據內容壓入可讀流以前並無調用rs.pipe(process.stdout);,可是咱們壓入的全部數據 內容仍是徹底的輸出了,這是由於可讀流在接收者沒有讀取數據以前,會緩存全部壓入的數據。可是在不少狀況下, 更好的方法是隻有數據接收着請求數據的時候,才壓入數據到可讀流而不是緩存整個數據。下面咱們重寫 一下

._read()函數:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > 'z'.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);
```

```
$ node read1.js
abcdefghijklmnopqrstuvwxyz

上面的代碼經過重寫_read()方法實現了只有在數據接受者請求數據才向可讀流中壓入數據。_read()方法 也能夠接收一個size參數表示數據請求着請求的數據大小,可是可讀流能夠根據須要忽略這個參數。注意我 們也能夠用util.inherits()繼承可讀流。爲了說明只有在數據接受者請求數據時_read()方法才被調用,咱們 在向可讀流壓入數據時作一個延時,以下:

var Readable = require('stream').Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null);
    
    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on('exit', function () {
    console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

用下面的命令運行程序咱們發現_read()方法只調用了5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

使用計時器的緣由是系統須要時間來發送信號來通知程序關閉管道。使用process.stdout.on('error', fn) 是爲了處理系統由於header命令關閉管道而發送SIGPIPE信號,由於這樣會致使process.stdout觸發 EPIPE事件。若是想建立一個的能夠壓入任意形式數據的可讀流,只要在建立流的時候設置參數objectMode 爲true便可,例如:Readable({ objectMode: true })

讀取readable stream數據

大部分狀況下咱們只要簡單的使用pipe方法將可讀流的數據重定向到另外形式的流,可是在某些狀況下也許 直接從可讀流中讀取數據更有用。以下所示:

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    console.dir(buf);
});



$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

當可讀流中有數據可讀取時,流會觸發'readable' 事件,這樣就能夠調用.read()方法來讀取相 關數據,當可讀流中沒有數據可讀取時,.read() 會返回null,這樣就能夠結束.read() 的調用, 等待下一次'readable' 事件的觸發。下面是一個使用.read(n)從標準輸入每次讀取3個字節的例子:

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

以下運行程序發現,輸出結果並不徹底!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

這是應爲額外的數據數據留在流的內部緩衝區裏了,而咱們須要通知流咱們要讀取更多的數據.read(0) 能夠達到這個目的。

process.stdin.on('readable', function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

此次運行結果以下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

咱們可使用 .unshift() 將數據從新押回流數據隊列的頭部,這樣能夠接續讀取押回的數據。以下面 的代碼,會按行輸出標準輸入的內容:

var offset = 0;

process.stdin.on('readable', function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

固然,有不少模塊能夠實現這個功能,如:split

writable streams

writable streams只能夠做爲.pipe()函數的目的參數。以下代碼:

src.pipe(writableStream)

建立 writable stream

重寫 ._write(chunk, enc, next) 方法就能夠接受一個readable stream的數據。

var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

$ (echo beep; sleep 1; echo boop) | node write0.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一個參數chunk是數據輸入者寫入的數據。第二個參數end是數據的編碼格式。第三個參數next(err)經過回調函數通知數據寫入者能夠寫入更多的時間。若是readable stream寫入的是字符串,那麼字符串會默認轉換爲Buffer,若是在建立流的時候設置Writable({ decodeStrings: false })參數,那麼不會作轉換。若是readable stream寫入的數據時對象,那麼須要這樣建立writable stream

Writable({ objectMode: true })

寫數據到 writable stream

調用writable stream的.write(data)方法便可完成數據寫入。

process.stdout.write('beep boop\n');

調用.end()方法通知writable stream 數據已經寫入完成。

var fs = require('fs');
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {
    ws.end('boop\n');
}, 1000);

$ node writing1.js 
$ cat message.txt
beep boop

若是須要設置writable stream的緩衝區的大小,那麼在建立流的時候,須要設置opts.highWaterMark, 這樣若是緩衝區裏的數據超過opts.highWaterMark.write(data)方法會返回false。當緩衝區可寫的 時候,writable stream會觸發'drain' 事件。

classic streams

Classic streams比較老的接口了,最先出如今node 0.4版本中,可是瞭解一下其運行原理仍是十分有好 處的。當一個流被註冊了"data" 事件的回到函數,那麼流就會工做在老版本模式下,即會使用老的API。

classic readable streams

Classic readable streams事件就是一個事件觸發器,若是Classic readable streams有數據可讀取,那 麼其觸發 "data" 事件,等到數據讀取完畢時,會觸發"end" 事件。.pipe() 方法經過檢查stream.readable 的值肯定流是否有數據可讀。下面是一個使用Classic readable streams打印A-J字母的例子:

var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);


$ node classic0.js
ABCDEFGHIJ

若是要從classic readable stream中讀取數據,註冊"data""end"兩個事件的回調函數便可,代碼以下:

process.stdin.on('data', function (buf) {
    console.log(buf);
});
process.stdin.on('end', function () {
    console.log('__END__');
});


$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

須要注意的是若是你使用這種方式讀取數據,那麼會失去使用新接口帶來的好處。好比你在往一個 延遲很是大的流寫數據時,須要注意讀取數據和寫數據的平衡問題,不然會致使大量數據緩存在內 存中,致使浪費大量內存。通常這時候強烈建議使用流的.pipe()方法,這樣就不用本身監聽"data""end"事件了,也不用擔憂讀寫不平衡的問題了。固然你也能夠用 through代替本身監聽

"data""end" 事件,以下面的代碼:

var through = require('through');
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log('__END__');
}

$ (echo beep; sleep 1; echo boop) | node through.js 
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

或者也可使用concat-stream來緩存整個流的內容:

var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));


$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }

固然若是你非要本身監聽"data""end"事件,那麼你能夠在寫數據的流不可寫的 時候使用.pause()方法暫停Classic readable streams繼續觸發"data" 事件。等到 寫數據的流可寫的時候再使用.resume() 方法通知流繼續觸發"data" 事件繼續讀取 數據。

classic writable streams

Classic writable streams 很是簡單。只有 .write(buf), .end(buf).destroy()三個方法。.end(buf) 方法的buf參數是可選的,若是選擇該參數,至關於stream.write(buf); stream.end() 這樣的操做,須要注意的是當流的緩衝區寫滿即流不可寫時.write(buf)方法會返回false,若是流再次可寫時,流會觸發drain事件。

transform

transform是一個對讀入數據過濾然輸出的流。

duplex

duplex stream是一個可讀也可寫的雙向流,以下面的a就是一個duplex stream:

a.pipe(b).pipe(a)

read more

  • core stream documentation
  • You can use the readable-stream module to make your streams2 code compliant with node 0.8 and below. Just require('readable-stream') instead of require('stream') after you npm install readable-stream.
相關文章
相關標籤/搜索