kumavis/obj-multiplex

https://github.com/kumavis/obj-multiplexhtml

 

obj-multiplex多路複用

simple stream multiplexing for objectModenode

其實就是一個多路複用流可以使用name來區分各個子流,以達到一個parent流下其實有多個子流在運行,能夠經過多個子流來讀入寫出數據,效率更高。並且parent流結束了,則全部子流也會被銷燬git

usage

// create multiplexer
const mux = new ObjMultiplex()

// setup substreams
const streamA = mux.createStream('hello')
const streamB = mux.createStream('world')

// pipe over transport (and back)
mux.pipe(transport).pipe(mux)

// send values over the substreams
streamA.write({ thisIsAn: 'object' })
streamA.write(123)

// or pipe together normally
streamB.pipe(evilAiBrain).pipe(streamB)

 

 obj-multiplex/index.jsgithub

const { Duplex } = require('readable-stream')
const endOfStream = require('end-of-stream')//看博客mafintosh/end-of-stream
const once = require('once')
const noop = () => {}

const IGNORE_SUBSTREAM = {}


class ObjectMultiplex extends Duplex {

  constructor(_opts = {}) {
    const opts = Object.assign({}, _opts, {
      objectMode: true,//流可傳各種形式數據
    })
    super(opts)//生成這個流

    this._substreams = {}
  }

  createStream (name) {//就是建立兩個流,一個是這個流,另外一個是parent是這個流的一個子流,並返回子流
    // validate name
    if (!name) throw new Error('ObjectMultiplex - name must not be empty')//name不能爲空
    if (this._substreams[name]) throw new Error('ObjectMultiplex - Substream for name "${name}" already exists')//name不能重複

    // create substream
    const substream = new Substream({ parent: this, name: name })
    this._substreams[name] = substream

    // listen for parent stream to end
    anyStreamEnd(this, (err) => {//定義當parent流結束,則相應的全部子流也要被銷燬
      substream.destroy(err)//substream被destroy,若是出錯返回的錯誤信息即err
    })

    return substream
  }

  // ignore streams (dont display orphaned data warning)
  ignoreStream (name) {//就是將以前建立的name的子流的內容清空
    // validate name
    if (!name) throw new Error('ObjectMultiplex - name must not be empty')
    if (this._substreams[name]) throw new Error('ObjectMultiplex - Substream for name "${name}" already exists')
    // set
    this._substreams[name] = IGNORE_SUBSTREAM
  }

  // stream plumbing
  //下面就是parent流可以作的一系列讀寫操做
  _read () {}

  _write(chunk, encoding, callback) {//當調用  時,數據會被緩衝在可寫流中。
    // parse message,就是當parent流write時,將根據其傳入的name來決定該數據是寫到哪一個子流上的
    const name = chunk.name
    const data = chunk.data
    if (!name) {//name不能爲空,不然不知道是哪一個子流
      console.warn(`ObjectMultiplex - malformed chunk without name "${chunk}"`)
      return callback()
    }

    // get corresponding substream
    const substream = this._substreams[name]//而後根據name獲得子流
    if (!substream) {//若是爲空則warn
      console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
      return callback()
    }

    // push data into substream
    if (substream !== IGNORE_SUBSTREAM) {//只有當子流不爲{}時,纔將data壓入
      substream.push(data)//當調用  時,數據會被緩衝在可讀流中。 若是流的消費者沒有調用 ,則數據會保留在內部隊列中直到被消費
    }

    callback()
  }//_write

}//class


class Substream extends Duplex {

  constructor ({ parent, name }) {
    super({
      objectMode: true,
    })

    this._parent = parent
    this._name = name
  }

  _read () {}//讀入的操做即Duplex的定義

  _write (chunk, enc, callback) {//當子流被寫入時,實際上是將數據壓入流parent中
    this._parent.push({
      name: this._name,
      data: chunk,
    })
    callback()//而後調用回調函數
  }

}

module.exports = ObjectMultiplex

// util

function anyStreamEnd(stream, _cb) {//就是當stream結束的時候就會調用cb回調函數
  const cb = once(_cb)
  endOfStream(stream, { readable: false }, cb)
  endOfStream(stream, { writable: false }, cb)
}writable.write(chunk)stream.push(chunk)stream.read()

 

經過測試學習該庫的使用:函數

obj-multiplex/test/index.jsoop

const test = require('tape')
const once = require('once')
const { PassThrough, Transform } = require('readable-stream')//PassThrough本質也是Transform流,是最簡單的Transform流,只是將數據今後處傳過
// a passthrough stream.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is
const endOfStream = require('end-of-stream') const pump = require('pump') const ObjMultiplex = require('../index.js') test('basic - string', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, ['haay', 'wuurl'], 'results should match') t.end() }) // pass in messages inStream.write('haay') inStream.write('wuurl') // simulate disconnect setTimeout(() => inTransport.destroy()) }) test('basic - obj', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, [{ message: 'haay' }, { message: 'wuurl' }], 'results should match') t.end() }) // pass in messages inStream.write({ message: 'haay' }) inStream.write({ message: 'wuurl' }) // simulate disconnect setTimeout(() => inTransport.destroy()) }) test('roundtrip', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() const doubler = new Transform({ objectMode: true, transform (chunk, end, callback) {//對流內數據進行*2計算 // console.log('doubler!', chunk) const result = chunk * 2 callback(null, result) } }) pump(//即將從outStream處獲得的數據進行*2處理後再傳回outStream outStream, doubler, outStream ) bufferToEnd(inStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, [20, 24], 'results should match') t.end() }) // pass in messages inStream.write(10) inStream.write(12) // simulate disconnect setTimeout(() => outTransport.destroy(), 100) }) // util function basicTestSetup() { // setup multiplex and Transport const inMux = new ObjMultiplex()//定義了兩個parent流 const outMux = new ObjMultiplex() const inTransport = new PassThrough({ objectMode: true }) const outTransport = new PassThrough({ objectMode: true }) // setup substreams const inStream = inMux.createStream('hello')//分別在兩個parent流中各自定義一個name爲hello的子流 const outStream = outMux.createStream('hello') pump(//造成一個pipe流 inMux, inTransport, outMux, outTransport, inMux ) return { inTransport, outTransport, inMux, outMux, inStream, outStream, } } function bufferToEnd(stream, callback) { const results = [] endOfStream(stream, (err) => callback(err, results))//定義了流結束後的回調 stream.on('data', (chunk) => results.push(chunk))//並監聽data事件,用於將數據壓入流 }
本站公眾號
   歡迎關注本站公眾號,獲取更多信息