Node.js 流

穩定性: 2 - 不穩定

流是一個抽象接口,在 Node 裏被不一樣的對象實現。例如request to an HTTPserver 是流,stdout 是流。流是可讀,可寫,或者可讀寫。全部的流是 EventEmitter 的實例。javascript

你能夠經過 require('stream') 加載 Stream 基類。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基類。css

這個文檔分爲 3 個章節。第一個章節解釋了在你的程序中使用流時候須要瞭解的部分。若是你不用實現流式 API,能夠只看這個章節。html

若是你想實現你本身的流,第二個章節解釋了這部分 API。這些 API 讓你的實現更加簡單。java

第三個部分深刻的解釋了流是如何工做的,包括一些內部機制和函數,這些內容不要改動,除非你明確知道你要作什麼。node

面向流消費者的 API

流能夠是可讀(Readable),可寫(Writable),或者兼具二者(Duplex,雙工)的。json

全部的流都是事件分發器(EventEmitters),可是也有本身的方法和屬性,這取決於他它們是可讀(Readable),可寫(Writable),或者兼具二者(Duplex,雙工)的。api

若是流式可讀寫的,則它實現了下面的全部方法和事件。所以,這個章節 API 徹底闡述了DuplexTransform 流,即使他們的實現有所不一樣。緩存

沒有必要爲了消費流而在你的程序裏實現流的接口。若是你正在你的程序裏實現流接口,請同時參考下面的API for Stream Implementors安全

基本全部的 Node 程序,不管多簡單,都會使用到流。這有一個使用流的例子。app

javascript
var http = require('http'); var server = http.createServer(function (req, res) { // req is an http.IncomingMessage, which is 可讀流(Readable stream) // res is an http.ServerResponse, which is a Writable Stream var body = ''; // we want to get the data as utf8 strings // If you don't set an encoding, then you'll get Buffer objects req.setEncoding('utf8'); // 可讀流(Readable stream) emit 'data' 事件 once a 監聽器(listener) is added req.on('data', function (chunk) { body += chunk; }); // the end 事件 tells you that you have entire body req.on('end', function () { try { var data = JSON.parse(body); } catch (er) { // uh oh! bad json! res.statusCode = 400; return res.end('error: ' + er.message); } // write back something interesting to the user: res.write(typeof data); res.end(); }); }); server.listen(1337); // $ curl localhost:1337 -d '{}' // object // $ curl localhost:1337 -d '"foo"' // string // $ curl localhost:1337 -d 'not json' // error: Unexpected token o

類: stream.Readable

可讀流(Readable stream)接口是對你正在讀取的數據的來源的抽象。換句話說,數據來來自

可讀流(Readable stream)不會分發數據,直到你代表準備就緒。

可讀流(Readable stream) 有2種模式: 流動模式(flowing mode)暫停模式(paused mode). 流動模式(flowing mode)時,儘快的從底層系統讀取數據並提供給你的程序。 暫停模式(paused mode)時, 你必須明確的調用 stream.read() 來讀取數據。 暫停模式(paused mode) 是默認模式。

注意: 若是沒有綁定數據處理函數,而且沒有 pipe() 目標,流會切換到流動模式(flowing mode),而且數據會丟失。

能夠經過下面幾個方法,將流切換到流動模式(flowing mode)。

  • 添加一個 ['data' 事件][] 事件處理器來監聽數據.
  • 調用 resume() 方法來明確的開啓數據流。
  • 調用 pipe() 方法來發送數據給Writable.

能夠經過如下方法來切換到暫停模式(paused mode):

  • 若是沒有 導流(pipe) 目標,調用 pause()方法.
  • 若是有 導流(pipe) 目標, 移除全部的 ['data' 事件][]處理函數, 調用 unpipe() 方法移除全部的 導流(pipe) 目標。

注意, 爲了向後兼容考慮, 移除 'data' 事件監聽器並不會自動暫停流。一樣的,當有導流目標時,調用 pause() 並不能保證流在那些目標排空後,請求更多數據時保持暫停狀態。

可讀流(Readable stream)例子包括:

事件: 'readable'

當一個數據塊能夠從流中讀出,將會觸發'readable' 事件.`

某些狀況下, 若是沒有準備好,監聽一個 'readable' 事件將會致使一些數據從底層系統讀取到內部緩存。

javascript
var readble = getReadableStreamSomehow(); readable.on('readable', function() { // there is some data to read now });

一旦內部緩存排空,一旦有更多數據將會再次觸發 readable 事件。

事件: 'data'

  • chunk {Buffer | String} 數據塊

綁定一個 data 事件的監聽器(listener)到一個未明確暫停的流,會將流切換到流動模式。數據會盡額能的傳遞。

若是你像儘快的從流中獲取數據,這是最快的方法。

javascript
var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); }); 

事件: 'end'

若是沒有更多的可讀數據,將會觸發這個事件。

注意,除非數據已經被徹底消費, the end 事件纔會觸發。 能夠經過切換到流動模式(flowing mode)來實現,或者經過調用重複調用 read()獲取數據,直到結束。

javascript
    var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); }); readable.on('end', function() { console.log('there will be no more data.'); }); 

事件: 'close'

當底層資源(例如源頭的文件描述符)關閉時觸發。並非全部流都會觸發這個事件。

事件: 'error'

  • {Error Object}

當接收數據時發生錯誤觸發。

readable.read([size])

  • size {Number} 可選參數, 須要讀入的數據量
  • 返回 {String | Buffer | null}

read() 方法從內部緩存中拉取數據。若是沒有可用數據,將會返回null

若是傳了 size參數,將會返回至關字節的數據。若是size不可用,將會返回 null

若是你沒有指定 size 參數。將會返回內部緩存的全部數據。

這個方法僅能再暫停模式(paused mode)裏調用. 流動模式(flowing mode)下這個方法會被自動調用直到內存緩存排空。

javascript
var readable = getReadableStreamSomehow(); readable.on('readable', function() { var chunk; while (null !== (chunk = readable.read())) { console.log('got %d bytes of data', chunk.length); } });

若是這個方法返回一個數據塊, 它同時也會觸發['data' 事件][].

readable.setEncoding(encoding)

  • encoding {String} 要使用的編碼.
  • 返回: this

調用此函數會使得流返回指定編碼的字符串,而不是 Buffer 對象。例如,若是你調用readable.setEncoding('utf8'),輸出數據將會是UTF-8 編碼,而且返回字符串。若是你調用 readable.setEncoding('hex'),將會返回2進制編碼的數據。

該方法能正確處理多字節字符。若是不想這麼作,僅簡單的直接拉取緩存並調buf.toString(encoding) ,可能會致使字節錯位。所以,若是你想以字符串讀取數據,請使用這個方法。

javascript
var readable = getReadableStreamSomehow(); readable.setEncoding('utf8'); readable.on('data', function(chunk) { assert.equal(typeof chunk, 'string'); console.log('got %d characters of string data', chunk.length); });

readable.resume()

  • 返回: this

這個方法讓可讀流(Readable stream)繼續觸發 data 事件.

這個方法會將流切換到流動模式(flowing mode). 若是你不想從流中消費數據,而想獲得end 事件,能夠調用 readable.resume() 來打開數據流。

javascript
var readable = getReadableStreamSomehow(); readable.resume(); readable.on('end', function(chunk) { console.log('got to the end, but did not read anything'); });

readable.pause()

  • 返回: this

這個方法會使得流動模式(flowing mode)的流中止觸發 data 事件, 切換到流動模式(flowing mode). 並讓後續可用數據留在內部緩衝區中。

javascript
var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); readable.pause(); console.log('there will be no more data for 1 second'); setTimeout(function() { console.log('now data will start flowing again'); readable.resume(); }, 1000); });

readable.isPaused()

  • 返回: Boolean

這個方法返回readable 是否被客戶端代碼 明確的暫停(調用 readable.pause())。

var readable = new stream.Readable readable.isPaused() // === false readable.pause() readable.isPaused() // === true readable.resume() readable.isPaused() // === false

readable.pipe(destination[, options])

  • destination {Writable Stream} 寫入數據的目標
  • options {Object} 導流(pipe) 選項
    • end {Boolean} 讀取到結束符時,結束寫入者。默認 = true

這個方法從可讀流(Readable stream)拉取全部數據, 並將數據寫入到提供的目標中。自動管理流量,這樣目標不會快速的可讀流(Readable stream)淹沒。

能夠導流到多個目標。

javascript
var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt' readable.pipe(writable);

這個函數返回目標流, 所以你能夠創建導流鏈:

javascript
var r = fs.createReadStream('file.txt'); var z = zlib.createGzip(); var w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w);

例如, 模擬 Unix 的 cat 命令:

javascript process.stdin.pipe(process.stdout);

默認狀況下,當源數據流觸發 end的時候調用end(),因此 destination 不可再寫。傳 { end:false }做爲options,能夠保持目標流打開狀態。

這會讓 writer保持打開狀態,能夠在最後寫入"Goodbye" 。

javascript
reader.pipe(writer, { end: false }); reader.on('end', function() { writer.end('Goodbye\n'); });

注意 process.stderrprocess.stdout 直到進程結束纔會關閉,不管是否指定

readable.unpipe([destination])

  • destination {Writable Stream} 可選,指定解除導流的流

這個方法會解除以前調用 pipe() 設置的鉤子( pipe() )。

若是沒有指定 destination,全部的 導流(pipe) 都會被移除。

若是指定了 destination,可是沒有創建若是沒有指定 destination,則什麼事情都不會發生。

javascript
var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt', // but only for the first second readable.pipe(writable); setTimeout(function() { console.log('stop writing to file.txt'); readable.unpipe(writable); console.log('manually close the file stream'); writable.end(); }, 1000);

readable.unshift(chunk)

  • chunk {Buffer | String} 數據塊插入到讀隊列中

這個方法頗有用,當一個流正被一個解析器消費,解析器可能須要將某些剛拉取出的數據「逆消費」,返回到原來的源,以便流能將它傳遞給其它消費者。

若是你在程序中必須常常調用 stream.unshift(chunk) ,那你能夠考慮實現Transform來替換(參見下文API for Stream Implementors)。

javascript
// Pull off a header delimited by \n\n // use unshift() if we get too much // Call the callback with (error, header, stream) var StringDecoder = require('string_decoder').StringDecoder; function parseHeader(stream, callback) { stream.on('error', callback); stream.on('readable', onReadable); var decoder = new StringDecoder('utf8'); var header = ''; function onReadable() { var chunk; while (null !== (chunk = stream.read())) { var str = decoder.write(chunk); if (str.match(/\n\n/)) { // found the header boundary var split = str.split(/\n\n/); header += split.shift(); var remaining = split.join('\n\n'); var buf = new Buffer(remaining, 'utf8'); if (buf.length) stream.unshift(buf); stream.removeListener('error', callback); stream.removeListener('readable', onReadable); // now the body of the message can be read from the stream. callback(null, header, stream); } else { // still reading the header. header += str; } } } }

readable.wrap(stream)

  • stream {Stream} 一箇舊式的可讀流(Readable stream)

v0.10 版本以前的 Node 流並未實現如今全部流的API(更多信息詳見下文「兼容性」章節)。

若是你使用的是舊的 Node 庫,它觸發 'data' 事件,並擁有僅作查詢用的pause() 方法,那麼你能使用wrap() 方法來建立一個Readable 流來使用舊版本的流,做爲數據源。

你應該不多須要用到這個函數,但它會留下方便和舊版本的 Node 程序和庫交互。

例如:

javascript
var OldReader = require('./old-api-module.js').OldReader; var oreader = new OldReader; var Readable = require('stream').Readable; var myReader = new Readable().wrap(oreader); myReader.on('readable', function() { myReader.read(); // etc. });

類: stream.Writable

可寫流(Writable stream )接口是你正把數據寫到一個目標的抽象。

可寫流(Writable stream )的例子包括:

writable.write(chunk[, encoding][, callback])

  • chunk {String | Buffer} 準備寫的數據
  • encoding {String} 編碼方式(若是chunk 是字符串)
  • callback {Function} 數據塊寫入後的回調
  • 返回: {Boolean} 若是數據已被所有處理返回true

這個方法向底層系統寫入數據,並在數據處理完畢後調用所給的回調。

返回值表示你是否應該繼續當即寫入。若是數據要緩存在內部,將會返回false。不然返回 true

返回值僅供參考。即便返回 false,你也可能繼續寫。可是寫會緩存在內存裏,因此不要作的太過度。最好的辦法是等待drain 事件後,再寫入數據。

事件: 'drain'

若是調用 writable.write(chunk) 返回 false, drain 事件會告訴你何時將更多的數據寫入到流中。

javascript
// Write the data to the supplied 可寫流(Writable stream ) 1MM times. // Be attentive to back-pressure. function writeOneMillionTimes(writer, data, encoding, callback) { var i = 1000000; write(); function write() { var ok = true; do { i -= 1; if (i === 0) { // last time! writer.write(data, encoding, callback); } else { // see if we should continue, or wait // don't pass the callback, because we're not done yet. ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { // had to stop early! // write some more once it drains writer.once('drain', write); } } }

writable.cork()

強制緩存全部寫入。

調用 .uncork().end()後,會把緩存數據寫入。

writable.uncork()

寫入全部 .cork() 調用以後緩存的數據。

writable.setDefaultEncoding(encoding)

  • encoding {String} 新的默認編碼
  • 返回: Boolean

給寫數據流設置默認編碼方式,如編碼有效,返回 true ,不然返回 false

writable.end([chunk][, encoding][, callback])

  • chunk {String | Buffer} 可選,要寫入的數據
  • encoding {String} 編碼方式(若是 chunk 是字符串)
  • callback {Function} 可選, stream 結束時的回調函數

當沒有更多的數據寫入的時候調用這個方法。若是給出,回調會被用做 finish 事件的監聽器。

調用 end() 後調用 write() 會產生錯誤。

javascript
// write 'hello, ' and then end with 'world!' var file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // writing more now is not allowed!

事件: 'finish'

調用end() 方法後,而且全部的數據已經寫入到底層系統,將會觸發這個事件。

javascript
var writer = getWritableStreamSomehow(); for (var i = 0; i < 100; i ++) { writer.write('hello, #' + i + '!\n'); } writer.end('this is the end\n'); writer.on('finish', function() { console.error('all writes are now complete.'); });

事件: 'pipe'

  • src {Readable Stream} 是導流(pipe)到可寫流的源流

不管什麼時候在可寫流(Writable stream )上調用pipe() 方法,都會觸發 'pipe' 事件,添加這個流到目標。

javascript
var writer = getWritableStreamSomehow(); var reader = getReadableStreamSomehow(); writer.on('pipe', function(src) { console.error('something is piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer);

事件: 'unpipe'

不管什麼時候在可寫流(Writable stream )上調用unpipe() 方法,都會觸發 'unpipe' 事件,將這個流從目標上移除。

javascript
var writer = getWritableStreamSomehow(); var reader = getReadableStreamSomehow(); writer.on('unpipe', function(src) { console.error('something has stopped piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer); reader.unpipe(writer);

事件: 'error'

  • {Error object}

寫或導流(pipe)數據時,若是有錯誤會觸發。

類: stream.Duplex

雙工流(Duplex streams)是同時實現了 Readable and Writable 接口。用法詳見下文。

雙工流(Duplex streams) 的例子包括:

類: stream.Transform

轉換流(Transform streams) 是雙工 Duplex 流,它的輸出是從輸入計算得來。 它實現了ReadableWritable 接口. 用法詳見下文.

轉換流(Transform streams) 的例子包括:

API for Stream Implementors

不管實現什麼形式的流,模式都是同樣的:

  1. 在你的子類中擴展適合的父類. (util.inherits 方法頗有幫助)
  2. 在你的構造函數中調用父類的構造函數,以確保內部的機制初始化正確。
  3. 實現一個或多個方法,以下所列

所擴展的類和要實現的方法取決於你要編寫的流類。

Use-case

Class

方法(s) to implement

Reading only

[Readable](#stream_class_stream_readable_1)

[_read][]

Writing only

[Writable](#stream_class_stream_writable_1)

[_write][]

Reading and writing

[Duplex](#stream_class_stream_duplex_1)

[_read][], [_write][]

Operate on written data, then read the result

[Transform](#stream_class_stream_transform_1)

_transform, _flush

在你的代碼裏,千萬不要調用 API for Stream Consumers 裏的方法。不然可能會引發消費流的程序反作用。

類: stream.Readable

stream.Readable 是一個可被擴充的、實現了底層 _read(size) 方法的抽象類。

參照以前的API for Stream Consumers查看如何在你的程序裏消費流。底下內容解釋了在你的程序裏如何實現可讀流(Readable stream)。

Example: 計數流

這是可讀流(Readable stream)的基礎例子. 它將從 1 至 1,000,000 遞增地觸發數字,而後結束。

javascript
var Readable = require('stream').Readable; var util = require('util'); util.inherits(Counter, Readable); function Counter(opt) { Readable.call(this, opt); this._max = 1000000; this._index = 1; } Counter.prototype._read = function() { var i = this._index++; if (i > this._max) this.push(null); else { var str = '' + i; var buf = new Buffer(str, 'ascii'); this.push(buf); } };

Example: 簡單協議 v1 (初始版)

和以前描述的 parseHeader 函數相似, 但它被實現爲自定義流。注意這個實現不會將輸入數據轉換爲字符串。

實際上,更好的辦法是將他實現爲 Transform 流。下面的實現方法更好。

javascript
// A parser for a simple data protocol. // "header" is a JSON object, followed by 2 \n characters, and // then a message body. // // 注意: This can be done more simply as a Transform stream! // Using Readable directly for this is sub-optimal. See the // alternative example below under Transform section. var Readable = require('stream').Readable; var util = require('util'); util.inherits(SimpleProtocol, Readable); function SimpleProtocol(source, options) { if (!(this instanceof SimpleProtocol)) return new SimpleProtocol(source, options); Readable.call(this, options; this._inBody = false; this._sawFirstCr = false; // source is 可讀流(Readable stream), such as a socket or file this._source = source; var self = this; source.on('end', function() { self.push(null); }); // give it a kick whenever the source is readable // read(0) will not consume any bytes source.on('readable', function() { self.read(0); }); this._rawHeader = []; this.header = null; } SimpleProtocol.prototype._read = function(n) { if (!this._inBody) { var chunk = this._source.read(); // if the source doesn't have data, we don't have data yet. if (chunk === null) return this.push(''); // check if the chunk has a \n\n var split = -1; for (var i = 0; i < chunk.length; i++) { if (chunk[i] === 10) { // '\n' if (this._sawFirstCr) { split = i; break; } else { this._sawFirstCr = true; } } else { this._sawFirstCr = false; } } if (split === -1) { // still waiting for the \n\n // stash the chunk, and try again. this._rawHeader.push(chunk); this.push(''); } else { this._inBody = true; var h = chunk.slice(0, split); this._rawHeader.push(h); var header = Buffer.concat(this._rawHeader).toString(); try { this.header = JSON.parse(header); } catch (er) { this.emit('error', new Error('invalid simple protocol data')); return; } // now, because we got some extra data, unshift the rest // back into the 讀取隊列 so that our consumer will see it. var b = chunk.slice(split); this.unshift(b); // and let them know that we are done parsing the header. this.emit('header', this.header); } } else { // from there on, just provide the data to our consumer. // careful not to push(null), since that would indicate EOF. var chunk = this._source.read(); if (chunk) this.push(chunk); } }; // Usage: // var parser = new SimpleProtocol(source); // Now parser is 可讀流(Readable stream) that will emit 'header' // with the parsed header data.

new stream.Readable([options])

  • options {Object}
    • highWaterMark {Number} 中止從底層資源讀取數據前,存儲在內部緩存的最大字節數。默認=16kb, objectMode 流是16.
    • encoding {String} 若指定,則 Buffer 會被解碼成所給編碼的字符串。缺省爲 null
    • objectMode {Boolean} 該流是否爲對象的流。意思是說 stream.read(n) 返回一個單獨的值,而不是大小爲 n 的 Buffer。

Readable 的擴展類中,確保調用了 Readable 的構造函數,這樣才能正確初始化。

readable._read(size)

  • size {Number} 異步讀取的字節數

注意: 實現這個函數, 但不要直接調用.

這個函數不要直接調用. 在子類裏實現,僅能被內部的 Readable 類調用。

全部可讀流(Readable stream) 的實現必須停供一個 _read 方法,從底層資源裏獲取數據。

這個方法如下劃線開頭,是由於對於定義它的類是內部的,不會被用戶程序直接調用。 你能夠在本身的擴展類中實現。

當數據可用時,經過調用readable.push(chunk) 將之放到讀取隊列中。再次調用 _read ,須要繼續推出更多數據。

size 參數僅供參考. 調用 「read」 能夠知道知道應當抓取多少數據;其他與之無關的實現,好比 TCP 或 TLS,則可忽略這個參數,並在可用時返回數據。例如,沒有必要「等到」 size 個字節可用時才調用stream.push(chunk)。

readable.push(chunk[, encoding])

  • chunk {Buffer | null | String} 推入到讀取隊列的數據塊
  • encoding {String} 字符串塊的編碼。必須是有效的 Buffer 編碼,好比 utf8 或 ascii。
  • 返回 {Boolean} 是否應該繼續推入

注意: 這個函數必須被 Readable 實現者調用, 而不是可讀流(Readable stream)的消費者.

_read() 函數直到調用push(chunk) 後才能被再次調用。

Readable 類將數據放到讀取隊列,當 'readable' 事件觸發後,被 read() 方法取出。push() 方法會插入數據到讀取隊列中。若是調用了 null ,會觸發 數據結束信號 (EOF)。

這個 API 被設計成儘量地靈活。好比說,你能夠包裝一個低級別的,具有某種暫停/恢復機制,和數據回調的數據源。這種狀況下,你能夠經過這種方式包裝低級別來源對象:

javascript
// source is an object with readStop() and readStart() 方法s, // and an `ondata` member that gets called when it has data, and // an `onend` member that gets called when the data is over. util.inherits(SourceWrapper, Readable); function SourceWrapper(options) { Readable.call(this, options); this._source = getLowlevelSourceObject(); var self = this; // Every time there's data, we push it into the internal buffer. this._source.ondata = function(chunk) { // if push() 返回 false, then we need to stop reading from source if (!self.push(chunk)) self._source.readStop(); }; // When the source ends, we push the EOF-signaling `null` chunk this._source.onend = function() { self.push(null); }; } // _read will be called when the stream wants to pull more data in // the advisory size 參數 is ignored in this case. SourceWrapper.prototype._read = function(size) { this._source.readStart(); };

類: stream.Writable

stream.Writable 是個抽象類,它擴展了一個底層的實現_write(chunk, encoding, callback) 方法.

參考上面的API for Stream Consumers,來了解在你的程序裏如何消費可寫流。下面內容介紹瞭如何在你的程序裏實現可寫流。

new stream.Writable([options])

  • options {Object}
    • highWaterMark {Number} 當 write() 返回 false 時的緩存級別. 默認=16kb,objectMode 流是 16.
    • decodeStrings {Boolean} 傳給 _write() 前是否解碼爲字符串。 默認=true
    • objectMode {Boolean} write(anyObj) 是不是有效操做.若是爲 true,能夠寫任意數據,而不只僅是Buffer / String. 默認=false

請確保 Writable 類的擴展類中,調用構造函數以便緩衝設定能被正確初始化。

writable._write(chunk, encoding, callback)

  • chunk {Buffer | String} 要寫入的數據塊。老是 buffer, 除非 decodeStrings 選項爲 false
  • encoding {String} 若是數據塊是字符串,這個參數就是編碼方式。若是是緩存,則忽略。注意,除非decodeStrings 被設置爲 false ,不然這個數據塊一直是buffer。
  • callback {函數} 當你處理完數據後調用這個函數 (錯誤參數爲可選參數)。

因此可寫流(Writable stream ) 實現必須提供一個 _write()方法,來發送數據給底層資源。

注意: 這個函數不能直接調用 ,由子類實現, 僅內部可寫方法能夠調用。

使用標準的 callback(error) 方法調用回調函數,來代表寫入完成或遇到錯誤。

若是構造函數選項中設定了 decodeStrings 標識,則 chunk 可能會是字符串而不是 Buffer, encoding 代表了字符串的格式。這種設計是爲了支持對某些字符串數據編碼提供優化處理的實現。若是你沒有明確的設置decodeStringsfalse,這樣你就能夠安無論 encoding 參數,並假定 chunk 一直是一個緩存。

該方法如下劃線開頭,是由於對於定義它的類來講,這個方法是內部的,而且不該該被用戶程序直接調用。你應當在你的擴充類中重寫這個方法。

writable._writev(chunks, callback)

  • chunks {Array} 準備寫入的數據塊,每一個塊格式以下: { chunk: ..., encoding: ... }.
  • callback {函數} 當你處理完數據後調用這個函數 (錯誤參數爲可選參數)。

注意: 這個函數不能直接調用。 由子類實現,僅內部可寫方法能夠調用.

這個函數的實現是可選的。多數狀況下,沒有必要實現。若是實現,將會在全部數據塊緩存到寫隊列後調用。

類: stream.Duplex

雙工流(duplex stream)同時兼具可讀和可寫特性,好比一個 TCP socket 鏈接。

注意 stream.Duplex 能夠像 Readable 或 Writable 同樣被擴充,實現了底層 _read(sise) 和 _write(chunk, encoding, callback) 方法的抽象類。

因爲 JavaScript 並無多重繼承能力,所以這個類繼承自 Readable,寄生自 Writable.從而讓用戶在雙工擴展類中同時實現低級別的_read(n) 方法和低級別的_write(chunk, encoding, callback)方法。

new stream.Duplex(options)

  • options {Object} 傳遞 Writable and Readable 構造函數,有如下的內容:
    • allowHalfOpen {Boolean} 默認=true. 若是設置爲 false, 當寫端結束的時候,流會自動的結束讀端,反之亦然。
    • readableObjectMode {Boolean} 默認=false. 將 objectMode 設爲讀端的流,若是爲 true,將沒有效果。
    • writableObjectMode {Boolean} 默認=false. 將 objectMode設爲寫端的流,若是爲 true,將沒有效果。

擴展自 Duplex 的類,確保調用了父親的構造函數,保證緩存設置能正確初始化。

類: stream.Transform

轉換流(transform class) 是雙工流(duplex stream),輸入輸出端有因果關係,好比zlib 流或 crypto 流。

輸入輸出沒有要求大小相同,塊數量相同,到達時間相同。例如,一個 Hash 流只會在輸入結束時產生一個數據塊的輸出;一個 zlib 流會產生比輸入小得多或大得多的輸出。

轉換流(transform class) 必須實現_transform() 方法,而不是_read()_write() 方法,也能夠實現_flush() 方法(參見以下)。

new stream.Transform([options])

  • options {Object} 傳遞給 Writable and Readable 構造函數。

擴展自 轉換流(transform class) 的類,確保調用了父親的構造函數,保證緩存設置能正確初始化。

transform._transform(chunk, encoding, callback)

  • chunk {Buffer | String} 準備轉換的數據塊。是buffer,除非 decodeStrings 選項設置爲 false
  • encoding {String} 若是數據塊是字符串, 這個參數就是編碼方式,不然就忽略這個參數
  • callback {函數} 當你處理完數據後調用這個函數 (錯誤參數爲可選參數)。

注意: 這個函數不能直接調用。 由子類實現,僅內部可寫方法能夠調用.

全部的轉換流(transform class) 實現必須提供 _transform方法來接收輸入,並生產輸出。

_transform 能夠作轉換流(transform class)裏的任何事,處理寫入的字節,傳給接口的寫端,異步 I/O,處理事情等等。

調用 transform.push(outputChunk)0或屢次,從這個輸入塊裏產生輸出,依賴於你想要多少數據做爲輸出。

僅在當前數據塊徹底消費後調用這個回調。注意,輸入塊可能有,也可能沒有對應的輸出塊。若是你提供了第二個參數,將會傳給push 方法。如底下的例子

javascript
transform.prototype._transform = function (data, encoding, callback) { this.push(data); callback(); } transform.prototype._transform = function (data, encoding, callback) { callback(null, data); }

該方法如下劃線開頭,是由於對於定義它的類來講,這個方法是內部的,而且不該該被用戶程序直接調用。你應當在你的擴充類中重寫這個方法。

transform._flush(callback)

  • callback {函數} 當你處理完數據後調用這個函數 (錯誤參數爲可選參數)

注意: 這個函數不能直接調用。 由子類實現,僅內部可寫方法能夠調用.

某些狀況下,轉換操做可能須要分發一點流最後的數據。例如, Zlib流會存儲一些內部狀態,以便優化壓縮輸出。

有些時候,你能夠實現 _flush 方法,它能夠在最後面調用,當全部的寫入數據被消費後,分發end告訴讀端。和 _transform 同樣,當刷新操做完畢, transform.push(chunk) 爲0或更屢次數,。

該方法如下劃線開頭,是由於對於定義它的類來講,這個方法是內部的,而且不該該被用戶程序直接調用。你應當在你的擴充類中重寫這個方法。

事件: 'finish' and 'end'

finishend 事件 分別來自 Writable 和 Readable 類。.end()事件結束後調用 finish 事件,全部的數據已經被_transform處理完畢,調用 _flush 後,全部的數據輸出完畢,觸發end

Example: SimpleProtocol parser v2

上面的簡單協議分析例子列子能夠經過使用高級別的Transform 流來實現,和 parseHeaderSimpleProtocol v1列子相似。

在這個示例中,輸入會被導流到解析器中,而不是做爲參數提供。這種作法更符合 Node 流的慣例。

javascript
var util = require('util'); var Transform = require('stream').Transform; util.inherits(SimpleProtocol, Transform); function SimpleProtocol(options) { if (!(this instanceof SimpleProtocol)) return new SimpleProtocol(options); Transform.call(this, options); this._inBody = false; this._sawFirstCr = false; this._rawHeader = []; this.header = null; } SimpleProtocol.prototype._transform = function(chunk, encoding, done) { if (!this._inBody) { // check if the chunk has a \n\n var split = -1; for (var i = 0; i < chunk.length; i++) { if (chunk[i] === 10) { // '\n' if (this._sawFirstCr) { split = i; break; } else { this._sawFirstCr = true; } } else { this._sawFirstCr = false; } } if (split === -1) { // still waiting for the \n\n // stash the chunk, and try again. this._rawHeader.push(chunk); } else { this._inBody = true; var h = chunk.slice(0, split); this._rawHeader.push(h); var header = Buffer.concat(this._rawHeader).toString(); try { this.header = JSON.parse(header); } catch (er) { this.emit('error', new Error('invalid simple protocol data')); return; } // and let them know that we are done parsing the header. this.emit('header', this.header); // now, because we got some extra data, emit this first. this.push(chunk.slice(split)); } } else { // from there on, just provide the data to our consumer as-is. this.push(chunk); } done(); }; // Usage: // var parser = new SimpleProtocol(); // source.pipe(parser) // Now parser is 可讀流(Readable stream) that will emit 'header' // with the parsed header data.

類: stream.PassThrough

這是Transform 流的簡單實現,將輸入的字節簡單的傳遞給輸出。它的主要用途是測試和演示。偶爾要構建某種特殊流時也會用到。

流: 內部細節

緩衝

可寫流(Writable streams ) 和 可讀流(Readable stream)都會緩存數據到內部對象上,叫作 _writableState.buffer_readableState.buffer

緩存的數據量,取決於構造函數是傳入的 highWaterMark 參數。

調用 stream.push(chunk) 時,緩存數據到可讀流(Readable stream)。在數據消費者調用 stream.read() 前,數據會一直緩存在內部隊列中。

調用 stream.write(chunk) 時,緩存數據到可寫流(Writable stream)。即便 write() 返回 false

流(尤爲是pipe() 方法)得目的是限制數據的緩存量到一個可接受的水平,使得不一樣速度的源和目的不會淹沒可用內存。

stream.read(0)

某些時候,你可能想不消費數據的狀況下,觸發底層可讀流(Readable stream)機制的刷新。這種狀況下能夠調用 stream.read(0),它總會返回 null。

若是內部讀取緩衝低於 highWaterMark,而且流當前不在讀取狀態,那麼調用 read(0) 會觸發一個低級 _read 調用。

雖然基本上沒有必要這麼作。但你在 Node 內部的某些地方看到它確實這麼作了,尤爲是在 Readable 流類的內部。

stream.push('')

推一個0字節的字符串或緩存 (不在Object mode時)會發送有趣的反作用. 由於它是一個對stream.push() 的調用, 它將會結束 reading 進程. 然而,它沒有添加任何數據到可讀緩衝區中,因此沒有東西可供用戶消費。

少數狀況下,你當時沒有提供數據,但你的流的消費者(或你的代碼的其它部分)會經過調用 stream.read(0) 得知什麼時候再次檢查。在這種狀況下,你能夠調用 stream.push('')

到目前爲止,這個功能惟一一個使用情景是在 tls.CryptoStream 類中,但它將在 Node v0.12 中被廢棄。若是你發現你不得不使用 stream.push(''),請考慮另外一種方式。

和老版本的兼容性

v0.10 版本前,可讀流(Readable stream)接口比較簡單,所以功能和用處也小。

  • 'data'事件會當即開始觸發,而不會等待你調用 read() 方法。若是你須要進行某些 I/O 來決定如何處理數據,那麼你只能將數據塊儲存到某種緩衝區中以防它們流失。
  • pause() 方法僅供參考,而不保證生效。這意味着,即使流處於暫停狀態時,你仍然須要準備接收 'data' 事件。

在 Node v0.10中, 加入了下文所述的 Readable 類。爲了考慮向後兼容,添加了 'data' 事件監聽器或 resume() 方法被調用時,可讀流(Readable stream)會切換到 "流動模式(flowing mode)"。其做用是,即使你不使用新的 read() 方法和'readable'事件,你也沒必要擔憂丟失'data' 數據塊。

大多數程序會維持正常功能。然而,下列條件下也會引入邊界狀況:

  • 沒有添加 ['data' 事件][] 處理器
  • 歷來沒有調用 resume() 方法
  • 流歷來沒有被倒流(pipe)到任何可寫目標上、

例如:

javascript
// WARNING! BROKEN! net.createServer(function(socket) { // we add an 'end' 方法, but never consume the data socket.on('end', function() { // It will never get here. socket.end('I got your message (but didnt read it)\n'); }); }).listen(1337);

v0.10 版本前的 Node, 流入的消息數據會被簡單的拋棄。以後的版本,socket 會一直保持暫停。

這種情形下,調用resume() 方法來開始工做:

javascript
// Workaround net.createServer(function(socket) { socket.on('end', function() { socket.end('I got your message (but didnt read it)\n'); }); // start the flow of data, discarding it. socket.resume(); }).listen(1337);

可讀流(Readable stream)切換到流動模式(flowing mode),v0.10 版本前,可使用wrap() 方法將風格流包含在一個可讀類裏。

Object Mode

一般狀況下,流僅操做字符串和緩存。

處於 object mode 的流,除了 緩存和字符串,還能夠能夠讀出普通 JavaScript值。

在對象模式裏,可讀流(Readable stream) 調用 stream.read(size)總會返回單個項目,不管是什麼參數。

在對象模式裏, 可寫流(Writable stream ) 總會忽略傳給stream.write(data, encoding)encoding參數。

特殊值 null 在對象模式裏,依舊保持它的特殊性。也就說,對於對象模式的可讀流(Readable stream),stream.read() 返回 null 意味着沒有更多數據,同時stream.push(null) 會告知流數據結束(EOF)。

Node 核心不存在對象模式的流,這種設計只被某些用戶態流式庫所使用。

應該在你的子類構造函數裏,設置objectMode 。在過程當中設置不安全。

對於雙工流(Duplex streams),objectMode能夠用readableObjectModewritableObjectMode 分別爲讀寫端分別設置。這些選項,被轉換流(Transform streams)用來實現解析和序列化。

javascript
var util = require('util'); var StringDecoder = require('string_decoder').StringDecoder; var Transform = require('stream').Transform; util.inherits(JSONParseStream, Transform); // Gets \n-delimited JSON string data, and emits the parsed objects function JSONParseStream() { if (!(this instanceof JSONParseStream)) return new JSONParseStream(); Transform.call(this, { readableObjectMode : true }); this._buffer = ''; this._decoder = new StringDecoder('utf8'); } JSONParseStream.prototype._transform = function(chunk, encoding, cb) { this._buffer += this._decoder.write(chunk); // split on newlines var lines = this._buffer.split(/\r?\n/); // keep the last partial line buffered this._buffer = lines.pop(); for (var l = 0; l < lines.length; l++) { var line = lines[l]; try { var obj = JSON.parse(line); } catch (er) { this.emit('error', er); return; } // push the parsed object out to the readable consumer this.push(obj); } cb(); }; JSONParseStream.prototype._flush = function(cb) { // Just handle any leftover var rem = this._buffer.trim(); if (rem) { try { var obj = JSON.parse(rem); } catch (er) { this.emit('error', er); return; } // push the parsed object out to the readable consumer this.push(obj); } cb(); };
相關文章
相關標籤/搜索