版本:8.1.3javascript
stream
模塊的API被設計成可以很容的使用javascript
的原型繼承模式來實現streams
。
首先,stream的開發者必須先聲明一個新的javascript類,而且繼承如下四個基礎stream類中的一個,並確保他們適當的調用父類的構造函數。java
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { super(options); // ... } }
而且新的stream類必須實現所繼承的類的一個或者更多的特定方法。具體以下表所示:es6
Use-case | Class | Method(s) to implement |
---|---|---|
Reading only | Readable | _read |
Writing only | Writable | _write,_writev, _final |
Reading and Writing | Duplex | _read, _write,_writev, _final |
Operate on written data, then read the result | Transform | _transform, _flush, _final |
注意實現stream的代碼毫不能調用stream"public"
方法,由於那是爲了給消費者使用的。若是那樣可能會對應用在在使用流時致使不利的影響。segmentfault
雖然有不少簡單的實例,可是也可以直接構造一個stream
而不依賴任何繼承。可以直接使用stream.Writable
,stream.Readable
,stream.Duplex
或者stream.Transform
來建立實例對象而且傳遞適當的方法做爲構造函數的options。
例如:緩存
const { Writable } = require('stream'); const myWritable = new Writable({ write(chunk, encoding, callback) { // ... } });
stream.Writable
被用來擴展實現一個Writable
流。安全
自定義一個Writable
流必須調用new stream.Writable([options])
構造函數而且實現writable._write()
方法。writable._wirtev()
方法也許也要實現。app
options <Object>
異步
highWaterMark <number>
stream.write()
開始返回false
的最高Buffer
限制。默認值爲16384(16kb)
,對象模式是16
。socket
decodeStrings <boolean>
傳入方法stream._write()
以前是否對字符串解碼,默認是true
。函數
objectMode <boolean>
stream.write(anyOjb)
方法是否有效。當stream
實現支持,而且設置該值爲true
,那麼將可以寫進一個javascript
的值而不是字符串,Bufer
,或者Uint8Array
。默認值爲false
。
write <Function>
實現stream._write()
方法。
writev <Function>
實現stream._writev()
方法。
destroy <Function>
實現stream._destroy()
方法。
final <Function>
實現stream._final()
方法。
實例:
const { Writable } = require('stream'); class MyWritable extends Writable { constructor(options) { // Calls the stream.Writable() constructor super(options); // ... } }
chunk <Buffer>
|<string>
|<any>
被寫入的chunk
,在不設置decodeStrings
項爲false
的狀況下將會是Buffer
。或者stream
在對象模式下工做。
encoding <string>
若是chunk
是一個字符串,那麼encoding
是該字符串的格式。若是chunk
是Buffer
,或者stream
是對象模式,encoding
將會被忽視。
callback <Function>
(會有一個error
參數),在chunk
被徹底提供時,調用這個函數。
一個Writable stream
必須實現和提供一個writable._write()
方法來發送數據給潛在的源。
注意 Transform stream
提供他們本身的該方法writable._write()
的實現。
注意該方法不可以被應用直接調用,它在子類中實現,而且只能被Writable
類內部的方法調用。
回調函數必須被調用,用來講明數據寫入成功仍是出現了錯誤。第一個參數必須是Error
,若是寫入失敗那麼他是一個錯誤對象,若是成功那麼是null
。
咱們應該着重注意到,writable.write()
的調用發生在writable._write()
被調用和回調函數被調用之時,此時寫入的數據將被緩存。一旦回掉函數 被調用,stream
將會發出drain
事件,若是stream
被實現爲可以一次處理多chunk
的功能,那麼writable._writev()
方法應該被實現。
若是decodeStrings
屬性在構造函數的options
中設置,那麼chunk
應該是一個字符串而不是Buffer
,此時encoding
將會檢測字符串字符的編碼。這個能夠用來優化被傳入的字符串已知其編碼格式。若是decodeStrings
明確設置爲false
,那麼encoding
參數會被安全的忽視,而且chunk
將被保持爲同一個對象而後傳入.write()
。
writable._write()
被設置爲有一個前置下劃線,由於這是類內部定義的,而且不可以被外部程序直接調用。
chunks <Array> 被寫入的chunk,每一個chunk有這樣的格式:{chunk:..., encoding:...}。
callback <Function> (會有一個error
參數),在chunk
被徹底提供時,調用這個函數。
注意該方法不可以被應用直接調用,它在子類中實現,而且只能被Writable
類內部的方法調用。
在實現了writable._write()方法後也相互會實現writable._writev()方法,用來實現一次處理多個chunk。
err
<Error>
一個錯誤。
callback
<Function>
一個帶有可操做的錯誤的參數,在writable被毀掉時調用。
callback
<Function>
當你結束等待的書局時調用。(可添加一個可選的錯誤參數。)
注意 _final()
不可以被直接調用,在實現子類時,他只能被內部的Writable類調用。
可選的函數會在流結束以前調用,finish事件會在回調函數執行以後觸發。這在關閉源或者在流結束前寫入緩存數據時是有用的。
建議在使用writable._write()以及writable._writev()時在回調函數的第一個參數寫入err,以防執行時發生錯誤。這個會致使error
時間觸發。把錯誤拋棄可能會致使依賴流的一些行爲發生意外或矛盾。使用回掉函數保證處理可遇到的錯誤。
const { Writable } = require('stream'); const myWritable = new Writable({ write(chunk, encoding, callback) { if (chunk.toString().indexOf('a') >= 0) { callback(new Error('chunk is invalid')); } else { callback(); } } });
stream.Readable
類被用來實現一個Readable1
流。
自定義的Readable流必須調用new stream.Readable([options])
構造函數,而且實現readable._read()
方法。
options <Object>
highWaterMark <number>
在內部buffer存儲的最大字節數,數據爲在打包讀取底層數據源以前。默認爲16384(16kb)
,objectMode
模式爲16。
encoding <string>
若是設置,那麼buffer將被格式化爲指定格式的字符串。默認值爲null
。
objectMode <boolean>
設置這個流的表現形式是流仍是對象。決定着stream.read(n)
返回的是單一的值仍是一個必定大小的Buffer。默認是false。
read <Function>
stream._read()
方法的實現。
destory <Function>
stream._destroy()
方法的實現。
例子:
es6
const {Readable} = require('stream') class MyReadable extends Readable{ constructor(options){ //調用stream.Readable(options)構造函數 super(options) //。。。 } }
或者ES6版本以前的構造函數語法風格es5
const {Readable} = require('stream') const util = require('util') function MyReadable(options){ if(!(this instanceof MyReadable)) return new MyReadable(options) Readable.call(this, options) } util.inherits(MyReadable, Readable)
size
<number>
異步讀取的字節大小。
注意 這個函數必定不能被應用的代買直接調用,他應該在子類中實現,而且只可以被內部Readable類方法調用。
全部的Readable流的實現必須提供一個readable._read()
方法的實現,用來取得來自底部的數據源數據。
當調用readable._read()
若是自源的數據可用,那麼應該使用readable.push(dataChunk)
開始把數據推動讀取隊列。_read()
應該持續湊夠源讀取數據而後推動隊列,直至readable.push()
返回false
。只有再次調用readable._read()
那麼原先中止的流纔會再次恢復添加數據到隊列。
注意一旦readable._read()
方法被調用,那麼在readable.push()
被調用前是不用再次調用的。
size
參數是一個公告,在read是個單一操做,那麼返回的數據可使用size來限制大小。其餘的實現也許會忽視這個參數,而且僅在數據可讀取是才提供。調用stream.push(chunk)
時不必等待size
足夠再調用。
readable._read()
是底層源的體現設置,由於只可以在類的內部定義它,因此永遠不能在其餘用戶的程序裏直接調用。
chunk <Buffer>
|<Uint8Array>
|<string>
|<null>
|<any>
Chunk的數據要被推動讀取隊列。chunk必須是一個字符串,Buffer或者Uint8Array。若是是對象模式,那麼chunk或許是個js值。
encoding <string>
對chunk的字符串編碼。不準是一個有效的Buffer編碼格式,例如'utf8'
或者'ascii'
。
returns <boolean>
`true 若是添加的chunk可以繼續push;
false`反之。
若是chunk
是一個Buffer
,Uint8Array
或者string
,那麼chunk
將被添加到內部隊列等待流用戶來消費。傳遞的chunk
是一個null
,說明到了流的結束,以後便不會有數據被寫入。
當Readable在paused
模式下操做時,readable.push()
方法可以添加從readable.read()(被
readable方法觸發)
方法中讀取的數據。
當Readable在flowin
g模式下操做時,readable.push()
方法更夠在data
事件觸發下傳遞數據。
readable.push()
被設計的很靈活,例如,當封裝從pause/resume機制或者回到函數的chunk提供的一些數據這些底層源可以 被自定義的readale實例來封裝,以下:
// source is an object with readStop() and readStart() methods, // and an `ondata` member that gets called when it has data, and // an `onend` member that gets called when the data is over. class SourceWrapper extends Readable { constructor(options) { super(options); this._source = getLowlevelSourceObject(); // Every time there's data, push it into the internal buffer. this._source.ondata = (chunk) => { // if push() returns false, then stop reading from source if (!this.push(chunk)) this._source.readStop(); }; // When the source ends, push the EOF-signaling `null` chunk this._source.onend = () => { this.push(null); }; } // _read will be called when the stream wants to pull more data in // the advisory size argument is ignored in this case. _read(size) { this._source.readStart(); } }
注意readable.push()
只可以被Readable的實現者調用,而且只可以在readable._read()
裏面。
建議在使用readable._read()
方法發生錯誤時觸發error
事件,並且不是拋棄。在readable_read()
裏面拋棄錯誤會在操做流是否依賴flowing
或者parse
模式形成矛盾,以及意外。使用error
事件確保一致以及處理可預測的錯誤。
const { Readable } = require('stream'); const myReadable = new Readable({ read(size) { if (checkSomeErrorCondition()) { process.nextTick(() => this.emit('error', err)); return; } // do some work } });
const { Readable } = require('stream'); class Counter extends Readable { constructor(opt) { super(opt); this._max = 1000000; this._index = 1; } _read() { const i = this._index++; if (i > this._max) this.push(null); else { const str = '' + i; const buf = Buffer.from(str, 'ascii'); this.push(buf); } } }
一個Duplex
流及實現了可讀流又實現了可寫流,例如TCP socket
鏈接(這樣翻譯順點)。
因爲javaScript
不能多個繼承,stream.Duplex
類被用來實現雙向流/Duplex
的一個類(畢竟同時繼承stream.Readable
和stream.Writable
類是相抵觸的)。
注意 stream.Duplex
是原型繼承stream.Readable
類,寄生繼承stream.Writable
,可是instanceof
將會正確表示基於兩者,由於該類覆蓋了stream.Writable
的Symbole.hasInstance
方法。
自定義的Duplex
流必須調用new stream.Duplex([options])
構造函數以及實現readable._read()
和writable._write()
方法。
options <object>
同時傳給可讀流和和諧劉的構造函數,而且額外有下面的域。
allowHalfOpen <boolean>
默認true
,若是設置爲false
,流會在可寫流結束時中止可讀流,反之亦然。
readableObjectMode <boolean>
默認false
。 設置流的可讀流模式爲objectMode
,若是爲true
則沒有效果。
writableObjectMode <boolean>
默認false
。 設置流的可寫流模式爲objectMode
,若是爲true則沒有效果。
例子:
lang:ES6
const { Duplex } = require('stream'); class MyDuplex extends Duplex { constructor(options) { super(options); // ... } }
或者使用es6版本以前的構造函數風格。
const { Duplex } = require('stream'); const util = require('util'); function MyDuplex(options) { if (!(this instanceof MyDuplex)) return new MyDuplex(options); Duplex.call(this, options); } util.inherits(MyDuplex, Duplex);
下面是一個Duplex
流的簡單示例,雖然Node.js
的streams
的API是不兼容的,可是咱們向這個包了個底層源對象寫入數據,同時也可用從其中讀取數據。該例子演示向Writable
接口中寫入數據,從Readable
接口中讀取數據。
lang:ES6
const { Duplex } = require('stream'); const kSource = Symbol('source'); class MyDuplex extends Duplex { constructor(source, options) { super(options); this[kSource] = source; } _write(chunk, encoding, callback) { // The underlying source only deals with strings if (Buffer.isBuffer(chunk)) chunk = chunk.toString(); this[kSource].writeSomeData(chunk); callback(); } _read(size) { this[kSource].fetchSomeData(size, (data, encoding) => { this.push(Buffer.from(data, encoding)); }); } }
這個語法形式中最重要的是儘管可讀流和可寫流共存於一個對象實例,可是對它們進行的相對對立的操做。
在Duplex流中,能夠根據設置中的readableObjectMode
或者writableObjectMode
項把可讀流或者可寫流其中之一設置爲對象模式。
下面的例子,是一個Transform
流(Duplex流的一種)的例子,其建立的可寫流是一個對象模式,接受一個可讀流傳來的數據,該可讀流接受javaScript數字而且轉換成十六進制的字符串。
lang:ES6
const { Transform } = require('stream'); // All Transform streams are also Duplex Streams const myTransform = new Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { // Coerce the chunk to a number if necessary chunk |= 0; // Transform the chunk into something else. const data = chunk.toString(16); // Push the data onto the readable queue. callback(null, '0'.repeat(data.length % 2) + data); } }); myTransform.setEncoding('ascii'); myTransform.on('data', (chunk) => console.log(chunk)); myTransform.write(1); // Prints: 01 myTransform.write(10); // Prints: 0a myTransform.write(100); // Prints: 64
一個Transfrom
流是一個把輸入從新計算過的Duplex
流。包括可以壓縮的[zlib][]
流或者對數據加密解密的crypto流。
注意 streams
沒有輸入和輸出的數據必須大小一致,chuank
的數量一致,或者同時送達,的規範要求。例如,一個Hash
流在輸入結束時只會有一個輸出chunk
。一個zlib
流產生的輸出可能比輸入的大也可能小。
stream.Transform
類用來擴展實現Transform
類。
stream.Transform
類原型繼承於stream.Duplex
類,而且須要實現本身的writable._write()
和readable._read()
方法。自定義的Transform類還必須實現transform._transform()
方法,必要時也須要實現transform._flush()
方法。
注意 在使用Transfrom流,往流中寫入數據時要特別當心,由於若是Reable一方沒有消費者是可能到致使Writable一方的流暫停。
options <Object>
傳遞給Writable
和Readalbe
構造函數,同時也包含如下的域。
transform <Function>
實現stream._transform()
方法。
flush <Function>
實現stream._flush()
方法。
例子:
ES6
const { Transform } = require('stream'); class MyTransform extends Transform { constructor(options) { super(options); // ... } }
es6以前版本的構造函數風格。
const { Transform } = require('stream'); const util = require('util'); function MyTransform(options) { if (!(this instanceof MyTransform)) return new MyTransform(options); Transform.call(this, options); } util.inherits(MyTransform, Transform);
finish
和end
事件finish
和end
來自繼承的stream.Writable
以及stream.Readable
類。finish
在stream.end()
調用是觸發,此時全部chunk
已經被stream._transform()
處理完畢。end
事件在全部的數據被輸出,執行transform._flusn()
方法的回調函數時觸發。
callback <Function>
回調函數(另外有可選的error,和data參數),當數據流完時執行。
注意 這個方法絕對不能應用的代碼直接調用。其只應該被繼承的子類的Readable
類中的方法調用。
在一些狀況下,一個transform操做須要發出一些額外的數據在流的結尾。例如,一個zlib
壓縮流會保存一些有關內部狀態的數據來優化壓縮輸出。當流結束,然而額外的信息須要被髮出來,至此整個壓縮過程纔算結束。
自定義的Transform
類實現也許會實現transform._flush()
方法。當沒有更多的數據須要被寫入時會被觸發,可是會在end
事件發出告訴Readable
流結束以前。
實現transform._flush()
方法,可能會在內部適當調用readable.push()
方法零次或屢次。在flush
操做結束後必須調用回調函數。
transform._flush()
方法有一個前置的下劃線,代表這是一個類內部定義的,永遠不該該被用戶的程序直接調用。
chunk <Buffer>
|<string>
|<any>
須要被轉換的chunk。在不設置decodeString
參數爲false
或者不爲對象模式狀況下,用仍是buffer.
encoding <string>
若是chunk是字符串,那麼該參數設置其編碼類型。若是chunk是buffer,那麼這個值是'buffer'
,這種狀況下會忽略。
calback <Function>
一個回調函數(帶有可選的error和data參數),提供的chunk
被處理後會被調用。
注意 這個方法絕對不能應用的代碼直接調用。其只應該被繼承的子類的Readable
類中的方法調用。
全部的Transofrom流的實現都必須提供一個_transform()
方法類接受輸入,產生輸出。transfrom._transform()
方法實現處理被寫入的字節,計算輸出,而後從readable部分使用readable.push()
傳出。
transform.push()
方法可能會被屢次調用,根據input chunk來生成output。取決於chunk能產生多少output結果。
有時也可能發生不管什麼樣的數據chunk也產生不了output。
當前的chunk被消耗完了以後必須調用回調函數。回調函數的第一個參數必須是Error
對象,若是發生錯誤該對象會被傳遞,不然能夠是null
。若是有第二個參數,那麼它將被推薦到readable.push()
方法。簡單來講,以下:
transform.prototype._transform = function(data, encoding, callback) { this.push(data); callback(); }; transform.prototype._transform = function(data, encoding, callback) { callback(null, data); };
此外,該方法也是一個類的內部方法,不該該被用戶程序直接調用。
stream.PassThough
類是一個Transform
流的一個瑣碎實現,可以簡單的傳遞input字節到output。原本的目的是檢驗和測試。可是也有一些場景,例如整理小說章節分類。