Nodejs流開發詳解(翻譯自官網)

版本:8.1.3javascript

實現一個流API

stream模塊的API被設計成可以很容的使用javascript的原型繼承模式來實現streams
首先,stream的開發者必須先聲明一個新的javascript類,而且繼承如下四個基礎stream類中的一個,並確保他們適當的調用父類的構造函數。java

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }
}

而且新的stream類必須實現所繼承的類的一個或者更多的特定方法。具體以下表所示:es6

Use-case Class Method(s) to implement
Reading only Readable _read
Writing only Writable _write,_writev, _final
Reading and Writing Duplex _read, _write,_writev, _final
Operate on written data, then read the result Transform _transform, _flush, _final

注意實現stream的代碼毫不能調用stream"public"方法,由於那是爲了給消費者使用的。若是那樣可能會對應用在在使用流時致使不利的影響。segmentfault

一個簡單的構造函數

雖然有不少簡單的實例,可是也可以直接構造一個stream而不依賴任何繼承。可以直接使用stream.Writable,stream.Readable,stream.Duplex或者stream.Transform來建立實例對象而且傳遞適當的方法做爲構造函數的options。
例如:緩存

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

實現一個可寫流/Implementing a Writable Stream

stream.Writable被用來擴展實現一個Writable流。安全

自定義一個Writable流必須調用new stream.Writable([options])構造函數而且實現writable._write()方法。writable._wirtev()方法也許也要實現。app

構造函數:new stream.Writable([options])

  • options <Object>異步

    • highWaterMark <number> stream.write()開始返回false的最高Buffer限制。默認值爲16384(16kb),對象模式是16socket

    • decodeStrings <boolean> 傳入方法stream._write()以前是否對字符串解碼,默認是true函數

    • objectMode <boolean> stream.write(anyOjb)方法是否有效。當stream實現支持,而且設置該值爲true,那麼將可以寫進一個javascript的值而不是字符串,Bufer,或者Uint8Array。默認值爲false

    • write <Function> 實現stream._write()方法。

    • writev <Function> 實現stream._writev()方法。

    • destroy <Function> 實現stream._destroy()方法。

    • final <Function> 實現stream._final()方法。

實例:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor
    super(options);
    // ...
  }
}

writable._write(chunk,encoding,callback)

  • chunk <Buffer>|<string>|<any> 被寫入的chunk,在不設置decodeStrings項爲false的狀況下將會是Buffer。或者stream在對象模式下工做。

  • encoding <string> 若是chunk是一個字符串,那麼encoding是該字符串的格式。若是chunkBuffer,或者stream是對象模式,encoding將會被忽視。

  • callback <Function> (會有一個error參數),在chunk被徹底提供時,調用這個函數。

一個Writable stream必須實現和提供一個writable._write()方法來發送數據給潛在的源。

注意 Transform stream提供他們本身的該方法writable._write()的實現。

注意該方法不可以被應用直接調用,它在子類中實現,而且只能被Writable類內部的方法調用。

回調函數必須被調用,用來講明數據寫入成功仍是出現了錯誤。第一個參數必須是Error,若是寫入失敗那麼他是一個錯誤對象,若是成功那麼是null

咱們應該着重注意到,writable.write()的調用發生在writable._write()被調用和回調函數被調用之時,此時寫入的數據將被緩存。一旦回掉函數 被調用,stream將會發出drain事件,若是stream被實現爲可以一次處理多chunk的功能,那麼writable._writev()方法應該被實現。

若是decodeStrings屬性在構造函數的options中設置,那麼chunk應該是一個字符串而不是Buffer,此時encoding將會檢測字符串字符的編碼。這個能夠用來優化被傳入的字符串已知其編碼格式。若是decodeStrings明確設置爲false,那麼encoding參數會被安全的忽視,而且chunk將被保持爲同一個對象而後傳入.write()

writable._write()被設置爲有一個前置下劃線,由於這是類內部定義的,而且不可以被外部程序直接調用。

writable._writev(chunks,callback)

  • chunks <Array> 被寫入的chunk,每一個chunk有這樣的格式:{chunk:..., encoding:...}。

  • callback <Function> (會有一個error參數),在chunk被徹底提供時,調用這個函數。

注意該方法不可以被應用直接調用,它在子類中實現,而且只能被Writable類內部的方法調用。

在實現了writable._write()方法後也相互會實現writable._writev()方法,用來實現一次處理多個chunk。

writable._destroy(err,callback)

  • err <Error> 一個錯誤。

  • callback <Function> 一個帶有可操做的錯誤的參數,在writable被毀掉時調用。

writable._final(callback)

  • callback <Function> 當你結束等待的書局時調用。(可添加一個可選的錯誤參數。)

注意 _final()不可以被直接調用,在實現子類時,他只能被內部的Writable類調用。

可選的函數會在流結束以前調用,finish事件會在回調函數執行以後觸發。這在關閉源或者在流結束前寫入緩存數據時是有用的。

寫入時發生錯誤

建議在使用writable._write()以及writable._writev()時在回調函數的第一個參數寫入err,以防執行時發生錯誤。這個會致使error時間觸發。把錯誤拋棄可能會致使依賴流的一些行爲發生意外或矛盾。使用回掉函數保證處理可遇到的錯誤。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

實現一個readable 流

stream.Readable類被用來實現一個Readable1流。
自定義的Readable流必須調用new stream.Readable([options])構造函數,而且實現readable._read()方法。

new stream.Readable([options])

  • options <Object>

    • highWaterMark <number>在內部buffer存儲的最大字節數,數據爲在打包讀取底層數據源以前。默認爲16384(16kb),objectMode模式爲16。

    • encoding <string> 若是設置,那麼buffer將被格式化爲指定格式的字符串。默認值爲null

    • objectMode <boolean> 設置這個流的表現形式是流仍是對象。決定着stream.read(n)返回的是單一的值仍是一個必定大小的Buffer。默認是false。

    • read <Function> stream._read()方法的實現。

    • destory <Function> stream._destroy()方法的實現。

例子:

es6

const {Readable} = require('stream')

class MyReadable extends Readable{
    constructor(options){
        //調用stream.Readable(options)構造函數
        super(options)
        //。。。
    }
}

或者ES6版本以前的構造函數語法風格
es5

const {Readable} = require('stream')
const util = require('util')

function MyReadable(options){
    if(!(this instanceof MyReadable))
        return new MyReadable(options)
    Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

readable._read(size)

  • size <number> 異步讀取的字節大小。

注意 這個函數必定不能被應用的代買直接調用,他應該在子類中實現,而且只可以被內部Readable類方法調用。

全部的Readable流的實現必須提供一個readable._read()方法的實現,用來取得來自底部的數據源數據。

當調用readable._read()若是自源的數據可用,那麼應該使用readable.push(dataChunk)開始把數據推動讀取隊列。_read()應該持續湊夠源讀取數據而後推動隊列,直至readable.push()返回false。只有再次調用readable._read()那麼原先中止的流纔會再次恢復添加數據到隊列。

注意一旦readable._read()方法被調用,那麼在readable.push()被調用前是不用再次調用的。

size參數是一個公告,在read是個單一操做,那麼返回的數據可使用size來限制大小。其餘的實現也許會忽視這個參數,而且僅在數據可讀取是才提供。調用stream.push(chunk)時不必等待size足夠再調用。

readable._read()是底層源的體現設置,由於只可以在類的內部定義它,因此永遠不能在其餘用戶的程序裏直接調用。

readable.push(chunk[,encoding])

  • chunk <Buffer>|<Uint8Array>|<string>|<null>|<any>Chunk的數據要被推動讀取隊列。chunk必須是一個字符串,Buffer或者Uint8Array。若是是對象模式,那麼chunk或許是個js值。

  • encoding <string> 對chunk的字符串編碼。不準是一個有效的Buffer編碼格式,例如'utf8'或者'ascii'

  • returns <boolean>`true 若是添加的chunk可以繼續push;false`反之。

若是chunk是一個Buffer,Uint8Array或者string,那麼chunk將被添加到內部隊列等待流用戶來消費。傳遞的chunk是一個null,說明到了流的結束,以後便不會有數據被寫入。

當Readable在paused模式下操做時,readable.push()方法可以添加從readable.read()(被readable方法觸發)方法中讀取的數據。

當Readable在flowing模式下操做時,readable.push()方法更夠在data事件觸發下傳遞數據。

readable.push()被設計的很靈活,例如,當封裝從pause/resume機制或者回到函數的chunk提供的一些數據這些底層源可以 被自定義的readale實例來封裝,以下:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // if push() returns false, then stop reading from source
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read will be called when the stream wants to pull more data in
  // the advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}

注意readable.push()只可以被Readable的實現者調用,而且只可以在readable._read()裏面。

讀取時發生錯誤/Errors While Reading

建議在使用readable._read()方法發生錯誤時觸發error事件,並且不是拋棄。在readable_read()裏面拋棄錯誤會在操做流是否依賴flowing或者parse模式形成矛盾,以及意外。使用error事件確保一致以及處理可預測的錯誤。

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    if (checkSomeErrorCondition()) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
    // do some work
  }
});

一個流的例子

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = '' + i;
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

實現一個雙向流/Implementing a Duplex Stream

一個Duplex流及實現了可讀流又實現了可寫流,例如TCP socket鏈接(這樣翻譯順點)。

因爲javaScript不能多個繼承,stream.Duplex類被用來實現雙向流/Duplex的一個類(畢竟同時繼承stream.Readablestream.Writable類是相抵觸的)。

注意 stream.Duplex是原型繼承stream.Readable類,寄生繼承stream.Writable,可是instanceof將會正確表示基於兩者,由於該類覆蓋了stream.WritableSymbole.hasInstance方法。

自定義的Duplex流必須調用new stream.Duplex([options])構造函數以及實現readable._read()writable._write()方法。

new stream.Duplex([options])

  • options <object> 同時傳給可讀流和和諧劉的構造函數,而且額外有下面的域。

    • allowHalfOpen <boolean> 默認true,若是設置爲false,流會在可寫流結束時中止可讀流,反之亦然。

    • readableObjectMode <boolean> 默認false。 設置流的可讀流模式爲objectMode,若是爲true則沒有效果。

    • writableObjectMode <boolean> 默認false。 設置流的可寫流模式爲objectMode,若是爲true則沒有效果。

例子:

lang:ES6

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

或者使用es6版本以前的構造函數風格。

const { Duplex } = require('stream');
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

一個雙向流的例子/An Example Duplex Stream

下面是一個Duplex流的簡單示例,雖然Node.jsstreams的API是不兼容的,可是咱們向這個包了個底層源對象寫入數據,同時也可用從其中讀取數據。該例子演示向Writable接口中寫入數據,從Readable接口中讀取數據。

lang:ES6

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

這個語法形式中最重要的是儘管可讀流和可寫流共存於一個對象實例,可是對它們進行的相對對立的操做。

一個雙向流的對象模式/Object Mode Duplex Streams

在Duplex流中,能夠根據設置中的readableObjectMode或者writableObjectMode項把可讀流或者可寫流其中之一設置爲對象模式。

下面的例子,是一個Transform流(Duplex流的一種)的例子,其建立的可寫流是一個對象模式,接受一個可讀流傳來的數據,該可讀流接受javaScript數字而且轉換成十六進制的字符串。

lang:ES6

const { Transform } = require('stream');

// All Transform streams are also Duplex Streams
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64

實現一個Transform流/Implementing a Transform Stream

一個Transfrom流是一個把輸入從新計算過的Duplex流。包括可以壓縮的[zlib][]流或者對數據加密解密的crypto流。

注意 streams沒有輸入和輸出的數據必須大小一致,chuank的數量一致,或者同時送達,的規範要求。例如,一個Hash流在輸入結束時只會有一個輸出chunk。一個zlib流產生的輸出可能比輸入的大也可能小。

stream.Transform類用來擴展實現Transform類。

stream.Transform類原型繼承於stream.Duplex類,而且須要實現本身的writable._write()readable._read()方法。自定義的Transform類還必須實現transform._transform()方法,必要時也須要實現transform._flush()方法。

注意 在使用Transfrom流,往流中寫入數據時要特別當心,由於若是Reable一方沒有消費者是可能到致使Writable一方的流暫停。

new stream.Transform([options])

  • options <Object> 傳遞給WritableReadalbe構造函數,同時也包含如下的域。

    • transform <Function> 實現stream._transform()方法。

    • flush <Function> 實現stream._flush()方法。

例子:

ES6

const { Transform } = require('stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
}

es6以前版本的構造函數風格。

const { Transform } = require('stream');
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

事件:finishend

事件finishend來自繼承的stream.Writable以及stream.Readable類。finishstream.end()調用是觸發,此時全部chunk已經被stream._transform()處理完畢。end事件在全部的數據被輸出,執行transform._flusn()方法的回調函數時觸發。

transform._flush(callback)

  • callback <Function> 回調函數(另外有可選的error,和data參數),當數據流完時執行。

注意 這個方法絕對不能應用的代碼直接調用。其只應該被繼承的子類的Readable類中的方法調用。

在一些狀況下,一個transform操做須要發出一些額外的數據在流的結尾。例如,一個zlib壓縮流會保存一些有關內部狀態的數據來優化壓縮輸出。當流結束,然而額外的信息須要被髮出來,至此整個壓縮過程纔算結束。

自定義的Transform類實現也許會實現transform._flush()方法。當沒有更多的數據須要被寫入時會被觸發,可是會在end事件發出告訴Readable流結束以前。

實現transform._flush()方法,可能會在內部適當調用readable.push()方法零次或屢次。在flush操做結束後必須調用回調函數。

transform._flush()方法有一個前置的下劃線,代表這是一個類內部定義的,永遠不該該被用戶的程序直接調用。

transform._transform(chunk,encoding,callback)

  • chunk <Buffer>|<string>|<any> 須要被轉換的chunk。在不設置decodeString參數爲false或者不爲對象模式狀況下,用仍是buffer.

  • encoding <string> 若是chunk是字符串,那麼該參數設置其編碼類型。若是chunk是buffer,那麼這個值是'buffer',這種狀況下會忽略。

  • calback <Function> 一個回調函數(帶有可選的error和data參數),提供的chunk被處理後會被調用。

注意 這個方法絕對不能應用的代碼直接調用。其只應該被繼承的子類的Readable類中的方法調用。

全部的Transofrom流的實現都必須提供一個_transform()方法類接受輸入,產生輸出。transfrom._transform()方法實現處理被寫入的字節,計算輸出,而後從readable部分使用readable.push()傳出。

transform.push()方法可能會被屢次調用,根據input chunk來生成output。取決於chunk能產生多少output結果。

有時也可能發生不管什麼樣的數據chunk也產生不了output。

當前的chunk被消耗完了以後必須調用回調函數。回調函數的第一個參數必須是Error對象,若是發生錯誤該對象會被傳遞,不然能夠是null。若是有第二個參數,那麼它將被推薦到readable.push()方法。簡單來講,以下:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};

此外,該方法也是一個類的內部方法,不該該被用戶程序直接調用。

類:stream.PassThough/Class:stream.PassThrough

stream.PassThough類是一個Transform流的一個瑣碎實現,可以簡單的傳遞input字節到output。原本的目的是檢驗和測試。可是也有一些場景,例如整理小說章節分類。

相關文章
相關標籤/搜索