如何對 Promise 限流:實現一個 Promise.map

本文連接 shanyue.tech/code/Promis…javascript

實現

假設有一個 Promise 爲 get 和一個待請求數組爲 list,使用它們進行請求數據。可是爲了不 IO 過大,須要限定三個併發數量html

function get (i) {
  console.log('In ', i)
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(i * 1000) 
      console.log('Out', i, 'Out')
    }, i * 1000)
  })
}

const list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
複製代碼

寫一段可以實現功能的鬆散的代碼是很簡單的,不過對提供 API 的設計思路也是至關重要的。簡單實現以下,使用 count 維護一個併發數量的計數器便可java

// 併發數量計數
let count = 0
function run () {
  if (count < 3 && list.length) {
    count+=1
    get(list.shift()).then(() => {
      count-=1 
      run()
    })
  }
}

// 限定三個併發數量
run()
run()
run()
複製代碼

代碼

Promise.map(
    Iterable<any>|Promise<Iterable<any>> input,
    function(any item, int index, int length) mapper,
    [Object {concurrency: int=Infinity} options]
) -> Promise
複製代碼

設計成 Bluebird 的 API,是比較模塊化,也是易於使用的。代碼的關鍵在於維護一個隊列,當超過限定數量的 Promise 時,則交與隊列維護。代碼以下git

class Limit {
  constructor (n) {
    this.limit = n
    this.count = 0
    this.queue = []
  }

  enqueue (fn) {
    // 關鍵代碼: fn, resolve, reject 統一管理
    return new Promise((resolve, reject) => {
      this.queue.push({ fn, resolve, reject })
    })
  }

  dequeue () {
    if (this.count < this.limit && this.queue.length) {
      // 等到 Promise 計數器小於閾值時,則出隊執行
      const { fn, resolve, reject } = this.queue.shift()
      this.run(fn).then(resolve).catch(reject)
    }
  }

  // async/await 簡化錯誤處理
  async run (fn) {
    this.count++
    // 維護一個計數器
    const value = await fn()
    this.count--
    // 執行完,看看隊列有東西沒
    this.dequeue()
    return value
  }

  build (fn) {
    if (this.count < this.limit) {
      // 若是沒有到達閾值,直接執行
      return this.run(fn)
    } else {
      // 若是超出閾值,則先扔到隊列中,等待有空閒時執行
      return this.enqueue(fn)
    }
  }
}

Promise.map = function (list, fn, { concurrency }) {
  const limit = new Limit(concurrency)
  return Promise.all(list.map((...args) => {
    return limit.build(() => fn(...args))
  }))
}
複製代碼

參考

Bluebird.map

Bluebird.map(list, x => {
  return get(x)
}, {
  concurrency: 3
})
複製代碼

主要是參考 concurrency 的實現github

featurist/promise-limit


歡迎關注個人公衆號山月行,在這裏記錄着個人技術成長,歡迎交流api

歡迎關注公衆號山月行,在這裏記錄個人技術成長,歡迎交流
相關文章
相關標籤/搜索