JavaScript 異步數組

JavaScript 異步數組

吾輩的博客原文: https://blog.rxliuli.com/p/5e...

場景

吾輩是一隻在飛向太陽的螢火蟲

JavaScript 中的數組是一個至關泛用性的數據結構,能當數組,元組,隊列,棧進行操做,更好的是 JavaScript 提供了不少原生的高階函數,便於咱們對數組總體操做。
然而,JavaScript 中的高階函數仍有缺陷 -- 異步!當你把它們放在一塊兒使用時,就會感受到這種問題的所在。git

例如如今,有一組 id,咱們要根據 id 獲取到遠端服務器 id 對應的值,而後將之打印出來。那麼,咱們要怎麼作呢?github

const wait = ms => new Promise(resolve => setTimeout(resolve, ms))

async function get(id) {
  // 這裏只是爲了模擬每一個請求的時間多是不定的
  await wait(Math.random() * id * 100)
  return '內容: ' + id.toString()
}

const ids = [1, 2, 3, 4]

你或許會下意識地寫出下面的代碼數組

ids.forEach(async id => console.log(await get(id)))

事實上,控制檯輸出是無序的,而並不是想象中的 1, 2, 3, 4 依次輸出緩存

內容: 2 ​​​​​
內容: 3 ​​​​​
內容: 1 ​​​​​
內容: 4

這是爲何呢?緣由即是 JavaScript 中數組的高階函數並不會等待異步函數的返回!當你在網絡上搜索時,會發現不少人會說可使用 for-of, for-in 解決這個問題。服務器

;(async () => {
  for (let id of ids) {
    console.log(await get(id))
  }
})()

或者,使用 Promise.all 也是一種解決方案網絡

;(async () => {
  ;(await Promise.all(ids.map(get))).forEach(v => console.log(v))
})()

然而,第一種方式至關於丟棄了 Array 的全部高階函數,再次重返遠古 for 循環時代了。第二種則必定會執行全部的異步函數,即使你須要使用的是 find/findIndex/some/every 這些高階函數。那麼,有沒有更好的解決方案呢?數據結構

思考

既然原生的 Array 不支持完善的異步操做,那麼,爲何不禁咱們來實現一個呢?併發

實現思路:dom

  1. 建立異步數組類型 AsyncArray
  2. 內置一個數組保存當前異步操做數組的值
  3. 實現數組的高階函數並實現支持異步函數順序執行
  4. 獲取到內置的數組
class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    this._task = []
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
  }
}

new AsyncArray(...ids).forEach(async id => console.log(await get(id)))

打印結果確實有順序了,看似一切很美好?異步

然而,當咱們再實現一個 map 試一下

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
  }
  async map(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      res.push(await fn(arr[i], i, this))
    }
    return this
  }
}

調用一下

new AsyncArray(...ids).map(get).forEach(async res => console.log(res))
// 拋出錯誤
// (intermediate value).map(...).forEach is not a function

然而會有問題,實際上 map 返回的是 Promise,因此咱們還必須使用 await 進行等待

;(async () => {
  ;(await new AsyncArray(...ids).map(get)).forEach(async res =>
    console.log(res),
  )
})()

是否是感受超級蠢?吾輩也是這樣認爲的!

鏈式調用加延遲執行

咱們能夠嘗試使用鏈式調用加延遲執行修改這個 AsyncArray

/**
 * 保存高階函數傳入的異步操做
 */
class Action {
  constructor(type, args) {
    /**
     * @field 異步操做的類型
     * @type {string}
     */
    this.type = type
    /**
     * @field 異步操做的參數數組
     * @type {Function}
     */
    this.args = args
  }
}

/**
 * 全部的操做類型
 */
Action.Type = {
  forEach: 'forEach',
  map: 'map',
  filter: 'filter',
}

/**
 * 真正實現的異步數組
 */
class InnerAsyncArray {
  constructor(arr) {
    this._arr = arr
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
    this._arr = []
  }
  async map(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      res.push(await fn(arr[i], i, this))
    }
    this._arr = res
    return this
  }
  async filter(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      if (await fn(arr[i], i, this)) {
        res.push(arr[i])
      }
    }
    this._arr = res
    return this
  }
}

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存異步任務
     * @type {Action[]}
     */
    this._task = []
  }
  forEach(fn) {
    this._task.push(new Action(Action.Type.forEach, [fn]))
    return this
  }
  map(fn) {
    this._task.push(new Action(Action.Type.map, [fn]))
    return this
  }
  filter(fn) {
    this._task.push(new Action(Action.Type.filter, [fn]))
    return this
  }
  /**
   * 終結整個鏈式操做並返回結果
   */
  async value() {
    const arr = new InnerAsyncArray(this._arr)
    let result
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    return result
  }
}

使用一下

new AsyncArray(...ids)
  .filter(async i => i % 2 === 0)
  .map(get)
  .forEach(async res => console.log(res))
  .value()

能夠看到,確實符合預期了,然而每次都要調用 value(),終歸有些麻煩。

使用 then 以支持 await 自動結束

這裏使用 then() 替代它以使得可使用 await 自動計算結果

class AsyncArray {
  // 上面的其餘內容...
  /**
   * 終結整個鏈式操做並返回結果
   */
  async then(resolve) {
    const arr = new InnerAsyncArray(this._arr)
    let result
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    // 這裏使用 resolve(result) 是爲了兼容 await 的調用方式
    resolve(result)
    return result
  }
}

如今,可使用 await 結束此次鏈式調用了

await new AsyncArray(...ids).map(get).forEach(async res => console.log(res))

忽然之間,咱們發現了一個問題,爲何會這麼慢?一個個去進行異步操做太慢了,難道就不能一次性所有發送出去,而後有序的處理結果就行了嘛?

併發異步操做

咱們可使用 Promise.all 併發執行異步操做,而後對它們的結果進行有序地處理。

/**
 * 併發實現的異步數組
 */
class InnerAsyncArrayParallel {
  constructor(arr) {
    this._arr = arr
  }
  async _all(fn) {
    return Promise.all(this._arr.map(fn))
  }
  async forEach(fn) {
    await this._all(fn)
    this._arr = []
  }
  async map(fn) {
    this._arr = await this._all(fn)
    return this
  }
  async filter(fn) {
    const arr = await this._all(fn)
    this._arr = this._arr.filter((v, i) => arr[i])
    return this
  }
}

而後修改 AsyncArray,使用 _AsyncArrayParallel 便可

class AsyncArray {
  // 上面的其餘內容...
  /**
   * 終結整個鏈式操做並返回結果
   */
  async then(resolve) {
    const arr = new InnerAsyncArrayParallel(this._arr)
    let result = this._arr
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    // 這裏使用 resolve(result) 是爲了兼容 await 的調用方式
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

調用方式不變。固然,因爲使用 Promise.all 實現,也一樣受到它的限制 -- 異步操做實際上所有執行了。

串行/並行相互轉換

如今咱們的 _AsyncArray_AsyncArrayParallel 兩個類只能二選一,因此,咱們須要添加兩個函數用於互相轉換。

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存異步任務
     * @type {AsyncArrayAction[]}
     */
    this._task = []
    /**
     * 是否並行化
     */
    this._parallel = false
  }
  // 其餘內容...

  parallel() {
    this._parallel = true
    return this
  }
  serial() {
    this._parallel = false
    return this
  }
  async then() {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      result = await arr[task.type](...task.args)
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

如今,咱們能夠在真正執行以前在任意位置對其進行轉換了

await new AsyncArray(...ids)
  .parallel()
  .filter(async i => i % 2 === 0)
  .map(get)
  .forEach(async res => console.log(res))

併發執行多個異步操做

然而,上面的代碼有一些隱藏的問題

  1. await 以後返回值不是一個數組

    ;(async () => {
      const asyncArray = new AsyncArray(...ids)
      console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    })()
  2. 上面的 map, filter 調用在 await 以後仍會影響到下面的調用

    ;(async () => {
      const asyncArray = new AsyncArray(...ids)
      console.log(await asyncArray.map(i => i * 2)) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
      console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 4, 6, 8 ] }
    })()
  3. 併發調用的順序不能肯定,會影響到內部數組,致使結果不能肯定

    ;(async () => {
      const asyncArray = new AsyncArray(...ids)
      ;(async () => {
        console.log(
          await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
        ) // InnerAsyncArray { _arr: [ 2, 6 ] }
      })()
      ;(async () => {
        console.log(await asyncArray) // InnerAsyncArray { _arr: [ 2, 6 ] }
      })()
    })()

先解決第一個問題,這裏只須要判斷一下是否爲終結操做(forEach),是的話就直接返回結果,不然繼續下一次循環

class AsyncArray {
  // 其餘內容...

  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是終結操做就返回數組的值
        if (resolve) {
          resolve(temp)
        }
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

如今,第一個問題簡單解決

;(async () => {
  const asyncArray = new AsyncArray(...ids)
  console.log(await asyncArray.map(i => i * 2)) // [ 2, 4, 6, 8 ]
})()

第2、第三個問題看起來彷佛是同一個問題?其實咱們能夠按照常規思惟解決第一個問題。既然 await 以後仍然會影響到下面的調用,那就在 then 中把 _task 清空好了,修改 then 函數

class AsyncArray {
  // 其餘內容...

  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是終結操做就返回數組的值
        if (resolve) {
          resolve(temp)
        }
        this._task = []
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    this._task = []
    return result
  }
}

如今,第一個問題解決了,但第二個問題不會解決。究其緣由,仍是異步事件隊列的問題,雖然 async-await 可以讓咱們以同步的方式寫異步的代碼,但千萬不可忘記它們本質上仍是異步的!

;(async () => {
  await Promise.all([
    (async () => {
      console.log(
        await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
      ) // [ 2, 6 ]
    })(),
    (async () => {
      console.log(await asyncArray) // [ 2, 6 ]
    })(),
  ])
  console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()

能夠看到,在使用 await 進行等待以後就如同預期的 _task 被清空了。然而,併發執行的沒有等待的 await asyncArray 卻有奇怪的問題,由於它是在 _task 清空以前執行的。

而且,這帶來一個反作用: 沒法緩存操做了

;(async () => {
  const asyncArray = new AsyncArray(...ids).map(i => i * 2)
  console.log(await asyncArray) // [ 2, 4, 6, 8 ]
  console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()

使用不可變數據

爲了解決直接修改內部數組形成的問題,咱們可使用不可變數據解決這個問題。試想:若是咱們每次操做都返回一個新的 AsyncArray,他們之間沒有關聯,這樣又如何呢?

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存異步任務
     * @type {Action[]}
     */
    this._task = []
    /**
     * 是否並行化
     */
    this._parallel = false
  }
  forEach(fn) {
    return this._addTask(Action.Type.forEach, [fn])
  }
  map(fn) {
    return this._addTask(Action.Type.map, [fn])
  }
  filter(fn) {
    return this._addTask(Action.Type.filter, [fn])
  }
  parallel() {
    this._parallel = true
    return this
  }
  serial() {
    this._parallel = false
    return this
  }
  _addTask(type, args) {
    const result = new AsyncArray(...this._arr)
    result._task = [...this._task, new Action(type, args)]
    result._parallel = this._parallel
    return result
  }
  /**
   * 終結整個鏈式操做並返回結果
   */
  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是終結操做就返回數組的值
        if (resolve) {
          resolve(temp)
        }
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

再次測試上面的那第三個問題,發現已經一切正常了呢

  • 併發調用的順序不能肯定,但不會影響內部數組了,結果是肯定的
;(async () => {
  const asyncArray = new AsyncArray(...ids)
  await Promise.all([
    (async () => {
      console.log(
        await asyncArray.filter(async i => i % 2 === 1).map(async i => i * 2),
      ) // [ 2, 6 ]
    })(),
    (async () => {
      console.log(await asyncArray) // [ 1, 2, 3, 4 ]
    })(),
  ])
  console.log(await asyncArray) // [ 1, 2, 3, 4 ]
})()
  • 操做能夠被緩存
;(async () => {
  const asyncArray = new AsyncArray(...ids).map(i => i * 2)
  console.log(await asyncArray) // [ 2, 4, 6, 8 ]
  console.log(await asyncArray) // [ 2, 4, 6, 8 ]
})()

完整代碼

下面吾輩把完整的代碼貼出來

/**
 * 保存高階函數傳入的異步操做
 */
class Action {
  constructor(type, args) {
    /**
     * @field 異步操做的類型
     * @type {string}
     */
    this.type = type
    /**
     * @field 異步操做的參數數組
     * @type {Function}
     */
    this.args = args
  }
}

/**
 * 全部的操做類型
 */
Action.Type = {
  forEach: 'forEach',
  map: 'map',
  filter: 'filter',
}

/**
 * 真正實現的異步數組
 */
class InnerAsyncArray {
  constructor(arr) {
    this._arr = arr
  }
  async forEach(fn) {
    const arr = this._arr
    for (let i = 0, len = arr.length; i < len; i++) {
      await fn(arr[i], i, this)
    }
    this._arr = []
  }
  async map(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      res.push(await fn(arr[i], i, this))
    }
    this._arr = res
    return this
  }
  async filter(fn) {
    const arr = this._arr
    const res = []
    for (let i = 0, len = arr.length; i < len; i++) {
      if (await fn(arr[i], i, this)) {
        res.push(arr[i])
      }
    }
    this._arr = res
    return this
  }
}

class InnerAsyncArrayParallel {
  constructor(arr) {
    this._arr = arr
  }
  async _all(fn) {
    return Promise.all(this._arr.map(fn))
  }
  async forEach(fn) {
    await this._all(fn)
    this._arr = []
  }
  async map(fn) {
    this._arr = await this._all(fn)
    return this
  }
  async filter(fn) {
    const arr = await this._all(fn)
    this._arr = this._arr.filter((v, i) => arr[i])
    return this
  }
}

class AsyncArray {
  constructor(...args) {
    this._arr = Array.from(args)
    /**
     * @field 保存異步任務
     * @type {Action[]}
     */
    this._task = []
    /**
     * 是否並行化
     */
    this._parallel = false
  }
  forEach(fn) {
    return this._addTask(Action.Type.forEach, [fn])
  }
  map(fn) {
    return this._addTask(Action.Type.map, [fn])
  }
  filter(fn) {
    return this._addTask(Action.Type.filter, [fn])
  }
  parallel() {
    this._parallel = true
    return this
  }
  serial() {
    this._parallel = false
    return this
  }
  _addTask(type, args) {
    const result = new AsyncArray(...this._arr)
    result._task = [...this._task, new Action(type, args)]
    result._parallel = this._parallel
    return result
  }
  /**
   * 終結整個鏈式操做並返回結果
   */
  async then(resolve, reject) {
    const arr = this._parallel
      ? new InnerAsyncArrayParallel(this._arr)
      : new InnerAsyncArray(this._arr)
    let result = this._arr
    for (let task of this._task) {
      const temp = await arr[task.type](...task.args)
      if (
        temp instanceof InnerAsyncArray ||
        temp instanceof InnerAsyncArrayParallel
      ) {
        result = temp._arr
      } else {
        // 若是已是終結操做就返回數組的值
        if (resolve) {
          resolve(temp)
        }
        return temp
      }
    }
    if (resolve) {
      resolve(result)
    }
    return result
  }
}

總結

那麼,關於 JavaScript 中如何封裝一個可使用異步操做高階函數的數組就先到這裏了,完整的 JavaScript 異步數組請參考吾輩的 AsyncArray(使用 TypeScript 編寫)。

相關文章
相關標籤/搜索