異步編程解決方案

 

  • 事件發佈/訂閱模式
  • promise/deferrd模式
  • 流程控制模式

事件發佈/訂閱模式

事件監聽器模式是異步回調的事件化,又稱發佈訂閱/模式node

node核心模塊events

方法數據庫

  • addListener/on
  • once
  • removeListener
  • removeAllListeners
  • emit

簡單操做編程

let events = require('events')
let request = require('http').request
let emit = new events.EventEmitter()
emit.on('message', function (msg) {
    console.log(msg)
})
emit.once('message1', function (msg) {
    console.log(msg)
})
emit.emit('message','message')
emit.emit('message','message')
emit.emit('message1','message1')
emit.emit('message1','message1')

 


此示例中,由於message事件經過on來監聽,message1經過once來監聽,因此message事件的回調能夠執行屢次,message1事件的回調只能執行一次json

結果:api

message
message
message1

 

事件分發/訂閱也能夠用來作一個功能黑盒,分發者只須要觸發這個事件,不須要知道有多少訂閱者,這些訂閱者都在處理什麼業務。事件偵聽器模式也是一種鉤子機制,node中的封裝不少是一個黑盒,不使用事件監聽咱們很難知道內部狀態變化的中間數據。使用事件偵聽器能夠沒必要知道內部如何實現,只關注特定的事件點,好比下面的http請求promise

let request = require('http').request
let options = {
    host: 'www.google.com',
    port: 80,
    path: '/upload',
    method: 'POST'
}
let req = request(options, function(res) {
    console.log('STATUS', res.statusCode)
    console.log('header', JSON.stringify(res.headers))
    res.setEncoding('utf-8')
    res.on('data', function (chunck) {
        console.log('Chunck', chunck)
    })
    res.on('end', function() {
        console.log('end')
    })
})
req.on('error', function(err) {
    console.log(err)
})
req.write('data\n')
req.write('data\n')
req.end()

只須要監聽data  end  error事件就能夠完成功能,不須要知道request究竟如何實現緩存

細節:併發

  1. 若是對一個事件設置了10個以上的監聽器,會有警告。可經過emitter.setMaxListeners(0)將限制去掉
  2. 異常處理,若是有監聽error事件,則會執行error中的代碼不然將會拋出錯誤,整個進程結束

利用隊列解決雪崩問題

雪崩問題:在高併發高訪問量的狀況下緩存失效的情境下,大量請求同時發向數據庫,數據庫沒法承受如此多的查詢請求,進而影響網站總體性能異步

var proxy = new events.EventEmitter();
var status = "ready";
var select = function (callback) {
  proxy.once("selected", callback);
  if (status === "ready") {
    status = "pending";
    db.select("SQL", function (results) {
      proxy.emit("selected", results);
      status = "ready";
    });
  }
};

 


短期內,若是執行多個select,不會執行屢次查詢請求,而是會用這次的callback來監聽select事件,只執行一次查詢請求,當這個請求返回時,觸發select事件,執行以前的全部callback,因爲用的once來監聽,因此每一個callback只會執行一次。async

多異步之間的協做方案

const fs = require('fs')
const after = function (times = 0, callback) {
    let time = 0,datas = {}
    return function (key, value) {
        datas[key] = value
        if(++ time >= times) {
            return callback(datas)
        }
    }
}
const done = after(2, function(data) {
    console.log(data)
})
const readFile = function(path,key, callback) {
    fs.readFile(path, 'utf-8', function(err, data) {
        if(err) {
            throw(err)
        }
        callback(key,data)
    })
}
readFile('../a.json', 'a', done)
readFile('../b.json', 'b', done)

 


經過總數和當前執行的次數來計數並執行最終的calback

 

const done = after(2, function(data) {
    console.log(data)
})
let emit = new events.EventEmitter()
emit.on('done', done)
emit.on('done', other)
function readFile(path, key, emit, name) {
    fs.readFile(path, 'utf-8', function(err, data) {
        if(err) {
            throw err
        }
        emit.emit(name, key, data)
    })
}
readFile('../a.json', 'a', emit, 'done')
readFile('../b.json', 'b', emit, 'done')

 

經過事件監聽來實現多對多異步

 

let EventProxy = require('eventproxy')
let fs = require('fs')
let proxy = new EventProxy()
proxy.all('a', 'b', function(a, b) {
    console.log('a',a,'b',b)
})
function readFile(path, name) {
    fs.readFile(path, 'utf-8', function(err, data) {
        if(err) {
            throw err
        }
        proxy.emit(name, data)
    })
}
readFile('../a.json', 'a')
readFile('../b.json', 'b')
readFile('../a.json', 'a')
readFile('../b.json', 'b')

 

經過eventproxy來實現一對多異步

promise/deffered模式

用event實現一個request的promise

var Deferred = function() {
    this.promise = new Promise()
    this.status = 'unfullfiled'
}
Deferred.prototype.resolve = function(obj) {
    this.status = 'fullfiled'
    this.promise.emit('success', obj)
}
Deferred.prototype.reject = function(err) {
    this.status = 'failed'
    this.promise.emit('error', err)
}
Deferred.prototype.process = function(data) {
    this.promise.emit('process', data)
}

var promisify = function(res) {
    var deffer = new Deferred()
    let result = ''
    res.setEncoding('utf-8')
    res.on('data', function(chunck) {
        result += chunck
        deffer.process(chunck)
    })
    res.on('end', function() {
        deffer.resolve(result) 
    })
    res.on('error', function(err) {
        deffer.reject(err)
    })
    return deffer.promise
}

let option = {
    host: 'www.baidu.com',
    port: 80,
    method: 'POST',
    path: 'upload'
}
let req = http.request(option, function(res) {
    promisify(res).then(function(data) {
        console.log('success', data)
    }, function(err) {
        console.log('error', err)
    }, function(chunck) {
        console.log('Chunck', chunck)
    })
})
req.write('data')
req.write('data')
req.end()

 

defferd爲內部封裝promise爲外部調用

有一些庫來幫助定製化promise

流程控制庫

  1. 尾觸發和next(connect)
  2. 流程控制模塊async
    1. 無依賴串行

      let async = require('async')
      let fs = require('fs')
      async.series([
          function(callback) {
              fs.readFile('../a.json', 'utf-8', callback)
          },
          function(callback) {
              fs.readFile('../b.json', 'utf-8', callback)
          },
      ], function(err, datas) {
          console.log(datas)
      })

       


      至關於 

      let fs = require('fs')
      let callback = function(err, results) {
          console.log(results)
      }
      fs.readFile('../a.json', 'utf-8', function(err, content) {
          if(err) {
              return callback(err)
          }
          fs.readFile('../b.json', 'utf-8', function(err, data) {
              if(err) {
                  return callback(err)
              }
              callback(null, [content, data])
          })
      })

       callback傳遞由async完成

    2. 並行

      let fs = require('fs')
      let async = require('async')
      async.parallel([
          function(callback) {
              fs.readFile('../a.json', 'utf-8', callback)
          },
          function(callback) {
              fs.readFile('../b.json', 'utf-8', callback)
          },
      ], function(err, results) {
          console.log(results)
      })

       

      至關於

      let fs = require('fs')
      
      let counter = 2
      let results = []
      let failError = null
      let callback = function(err, data) {
          console.log(data)
      }
      
      let done = function(index, data) {
          results[index] = data
          if(-- counter === 0) {
              callback(null, results)
          }
      }
      
      let fail = function(err) {
          if(!failError) {
              failError = err
              callback(failError)
          }
      }
      
      fs.readFile('../a.json', 'utf-8', function(err, data) {
          if(err) {
              fail()
          }
          done(0, data)
      })
      fs.readFile('../b.json', 'utf-8', function(err, data) {
          if(err) {
              fail()
          }
          done(1, data)
      })
    3. 前一個輸出爲後一個輸入async.waterfall
    4. 自動化執行

      let async = require('async')
      let fs = require('fs')
      
      let dep = {
          b: function(callback) {
              fs.readFile('../b.json', 'utf-8', function(err, data) {
                  console.log(data)
                  callback()
              })
          },
          a: ['b', function(callback) {
              fs.readFile('../a.json', 'utf-8', function(err, data) {
                  console.log(data)
              })
          }]
      }
      
      async.auto(dep)

       使a在b執行完後執行

相關文章
相關標籤/搜索