MetaMask/json-rpc-middleware-stream

https://github.com/MetaMask/json-rpc-middleware-stream/blob/master/test/index.js#L20git

A small toolset for streaming json rpc and matching requests and responses. Made to be used with json-rpc-engine.github

能夠用來與json-rpc-engine結合使用的,對輸入的json rpc進行讀入、寫出處理json

 

json-rpc-middleware-stream/index.js數組

const SafeEventEmitter = require('safe-event-emitter')
const DuplexStream = require('readable-stream').Duplex //即一個可讀且可寫的流

module.exports = createStreamMiddleware

function createStreamMiddleware() {
  const idMap = {}
  const stream = new DuplexStream({
    objectMode: true, //輸入的是任意形式的數據,而不非是字符串和 (或 )
    read: readNoop, //下面定義的返回false的函數,會覆寫_read(),當要從別地流處讀出時調用
    write: processMessage,//會覆寫_write(),當要寫入別的流時調用
  })

  const events = new SafeEventEmitter()

  const middleware = (req, res, next, end) => {
    // write req to stream
    stream.push(req)//從req流中讀出,就會去調用readNoop函數
    // register request on id map
    idMap[req.id] = { req, res, next, end }//並在數組中將req根據其id記錄下來
  }

  return { events, middleware, stream }

  function readNoop () {
    return false
  }

  function processMessage (res, encoding, cb) {//當要寫出時調用
    let err
    try {
      const isNotification = !res.id //若是res.id有值,則isNotification爲false;不然爲true
      if (isNotification) {//res.id沒值或爲0
        processNotification(res)//觸發事件'notification'
      } else {
        processResponse(res)//res.id有值
      }
    } catch (_err) {
      err = _err
    }
    // continue processing stream
    cb(err)
  }

  function processResponse(res) {//將流中內容寫出到res流
    const context = idMap[res.id] //查看有沒有與相應的res.id相同的ID的流以前讀入過
    if (!context) throw new Error(`StreamMiddleware - Unknown response id ${res.id}`) //若是context爲空,則說明相應的id的流並無讀入過,這樣寫出的時候就不知道要怎麼寫出了,無end,因此會出錯
    delete idMap[res.id] //若是有讀入過,則寫出前先清空idMap中的相應內容
    // copy whole res onto original res
    Object.assign(context.res, res) //而後將如今要寫出到的res流覆寫context.res,並返回context.res
    // run callback on empty stack,
    // prevent internal stream-handler from catching errors
    setTimeout(context.end) //調用以前讀入時寫好的end函數來結束寫出操做
  }

  function processNotification(res) {//該事件的監聽會在inpage-provider處設置
    events.emit('notification', res)
  }

}BufferUint8Array
:用於將全部可枚舉屬性的值從一個或多個源對象複製到目標對象。它將返回目標對象。Object.assign(target, ...sources)

 

json-rpc-middleware-stream/engineStream.jside

const DuplexStream = require('readable-stream').Duplex

module.exports = createEngineStream

function createEngineStream({ engine }) {//engineRpcEngine
  if (!engine) throw new Error('Missing engine parameter!')
  const stream = new DuplexStream({ objectMode: true, read, write })
  // forward notifications
  if (engine.on) {
    engine.on('notification', (message) => {//監聽'notification'事件
      stream.push(message) //事件被觸發的話就將message數據讀入stream,調用read函數
    })
  }
  return stream

  function read () {
    return false
  }
  function write (req, encoding, cb) {//當寫出時調用該函數
    engine.handle(req, (err, res) => {
      this.push(res)
    })
    cb()
  }
}

 

 

測試:函數

json-rpc-middleware-stream/test/index.jsoop

const test = require('tape')
const RpcEngine = require('json-rpc-engine')
const createJsonRpcStream = require('../index')
const createEngineStream = require('../engineStream')

test('middleware - raw test', (t) => {

  const jsonRpcConnection = createJsonRpcStream()
  const req = { id: 1, jsonrpc: '2.0', method: 'test' }
  const initRes = { id: 1, jsonrpc: '2.0' }
  const res = { id: 1, jsonrpc: '2.0', result: 'test' }

  // listen for incomming requests
  jsonRpcConnection.stream.on('data', (_req) => {//監聽data事件
    t.equal(req, _req, 'got the expected request')//說明觸發data事件傳來的
    jsonRpcConnection.stream.write(res)//將流中的與res.id相同的數據寫出
  })

  // run middleware, expect end fn to be called
  jsonRpcConnection.middleware(req, initRes, () => {//將req流寫入createJsonRpcStream
    t.fail('should not call next')
  }, (err) => {
    t.notOk(err, 'should not error')
    t.deepEqual(initRes, res, 'got the expected response')
    t.end()
  })

})

test('engine to stream - raw test', (t) => {

  const engine = new RpcEngine()
  engine.push((req, res, next, end) => {
    res.result = 'test'
    end()
  })

  const stream = createEngineStream({ engine })
  const req = { id: 1, jsonrpc: '2.0', method: 'test' }
  const res = { id: 1, jsonrpc: '2.0', result: 'test' }

  // listen for incomming requests
  stream.on('data', (_res) => {
    t.deepEqual(res, _res, 'got the expected response')
    t.end()
  })

  stream.on('error', (err) => {
    t.fail(error.message)
  })

  stream.write(req)

})


test('middleware and engine to stream', (t) => {//上面二者的結合

  // create guest
  const engineA = new RpcEngine()
  const jsonRpcConnection = createJsonRpcStream()
  engineA.push(jsonRpcConnection.middleware)

  // create host
  const engineB = new RpcEngine()
  engineB.push((req, res, next, end) => {
    res.result = 'test'
    end()
  })

  // connect both
  const clientSideStream = jsonRpcConnection.stream
  const hostSideStream = createEngineStream({ engine: engineB })
  clientSideStream
  .pipe(hostSideStream)
  .pipe(clientSideStream)

  // request and expected result
  const req = { id: 1, jsonrpc: '2.0', method: 'test' }
  const res = { id: 1, jsonrpc: '2.0', result: 'test' }

  engineA.handle(req, (err, _res) => {//req調用jsonRpcConnection.middleware讀入clientSideStream,而後hostSideStreamclientSideStream中讀入req數據,而後調用engineB的方法寫出,因此獲得的result: 'test'
    t.notOk(err, 'does not error')
    t.deepEqual(res, _res, 'got the expected response')
    t.end()
  })

})

test('server notification', (t) => {
  t.plan(1)

  const jsonRpcConnection = createJsonRpcStream()
  const notif = { jsonrpc: '2.0', method: 'test_notif' }//這裏沒有設置id,因此調用write因此會觸發processNotification函數,觸發'notification'事件

  jsonRpcConnection.events.once('notification', (message) => {
    t.equals(message.method, notif.method)
    t.end()
  })

  // receive notification
  jsonRpcConnection.stream.write(notif)
})


test('server notification in stream', (t) => {
  const engine = new RpcEngine()

  const stream = createEngineStream({ engine })
  const notif = { jsonrpc: '2.0', method: 'test_notif' }

  // listen for incomming requests
  stream.once('data', (_notif) => {
    t.deepEqual(notif, _notif, 'got the expected notification')
    t.end()
  })

  stream.on('error', (err) => {
    t.fail(error.message)
  })

  engine.emit('notification', notif)//將觸發createEngineStream中的'notification'事件,notif將被讀入stream,將觸發data事件
})
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息