through2原理解析

寫在前面

through2常常被用於處理nodestream,假如使用過gulp的話,對於這個包必定不會陌生,如:node

gulp.task('rewrite', () => {
  return gulp.src('./through/enter.txt')
    .pipe(through2.obj(function(chunk, enc, callback) {
      const { contents } = chunk;
      for (var i = 0; i < contents.length; i++) {
        if (contents[i] === 97) {
          contents[i] = 122;
        }
      }

      chunk.contents = contents;
      this.push(chunk);

      callback();
    }))
    .pipe(gulp.dest('./dist'));
});

這裏將文件中全部的字符a轉換爲字符z,在寫gulp插件時必定會應用到這個包,下面就來窺探一下這個使用率很是高的包。git

Transform stream

through2的源碼僅僅就100多行,本質上就是對於node原生的transform流進行的封裝,先來看下Transform streamTransform是一個雙工流,既可讀,也可寫,可是與Duplex仍是有着一些區別,Duplex的寫和讀能夠說是沒有任何的關聯,是兩個緩衝區和管道互補干擾,而Transform將其輸入和輸出是存在相互關聯的,中間作了處理。具體差異能夠參考下面圖片對比:github

Duplex stream:gulp

Duplex 圖示

Transform stream:api

transform 圖示

Transform stream的兩個緩存區相互關聯,對於每一個緩衝區來講,highWaterMark爲閾值,超過閾值後,將會中止讀或者寫操做,如:緩存

let i = 0;
const readable = Readable({
  highWaterMark: 2,
  read: function () {
    var data = i < 26 ? String.fromCharCode(i++ + 97) : null;
    console.log('push', data);
    this.push(data);
  }
});

const transform = Transform({
  highWaterMark: 2,
  transform: function (buf, enc, next) {
    console.log('transform', buf.toString());
    next(null, buf);
  }
})

readable.pipe(transform);

stream流向爲:函數

因爲閾值爲2,因此只能pushf,這時readable的緩存區已滿,transform的讀緩存區和寫緩存區已經滿了(因爲transform的兩個緩存區的閾值爲2,因此寫緩存區在寫入b以後就已經滿了,後續不能繼續寫入),所有滿以後,天然中止了讀取,最終e,f存在A中,c,d存在B中,a,b存在C中,想要解決很簡單,在添加一個流向就能夠:oop

readable.pipe(transform).pipe(process.stdout);

through2源碼

在瞭解Transform stream以後,through2的源碼很是的簡單,就是對於其的一層封裝,暴露出三個api(through2through2.objthrough2.ctor)並且三者接收的參數一致,由於都是由一個工廠方法創造出的:ui

function through2 (construct) {
  return function (options, transform, flush) {
    // 作了一些參數整理
    if (typeof options == 'function') {
      flush     = transform
      transform = options
      options   = {}
    }

    if (typeof transform != 'function')
      transform = noop

    if (typeof flush != 'function')
      flush = null

    return construct(options, transform, flush)
  }
}

來看一下through2對於Transform stream的再加工,也就是源碼中的DestroyableTransform,與其名字同樣,就是一個替咱們實現好了destory方法的Transform streamthis

DestroyableTransform.prototype.destroy = function(err) {
  if (this._destroyed) return
  this._destroyed = true

  var self = this
  // 觸發destory後,close掉流
  process.nextTick(function() {
    if (err)
      self.emit('error', err)
    self.emit('close')
  })
}

through2through2.obj所有是創造出一個再加工後的Transform,區別以下:

  • 後者開啓了對象模式(objectMode屬性爲true),寫入的參數不單單限制在string or uint8Array
  • 後者下降了閾值(highWaterMark16,而不是默認的16kb),這樣作的緣由,是爲了和node的默認保持一致,具體能夠參見這裏

through2.ctor能夠用來再次定製,其返回的是一個構造函數,用法能夠參考下面:

const Tran = through.ctor(function(chunk, enc, callback) {
  console.log('transform', chunk.toString());
  callback(null, chunk);
});
const transform = new Tran();

寫在最後

streamnode中有着很是普遍的應用,可是它使用起來卻不是那麼友好,throgh2的出現能夠減小使用上的麻煩,其原理也很是的簡單;以上內容均爲本人理解,若有錯誤還請指出,不勝感激~

相關文章
相關標籤/搜索