JavaScript 中如何實現併發控制?

1、併發控制簡介

在平常開發過程當中,你可能會遇到併發控制的場景,好比控制請求併發數。那麼在 JavaScript 中如何實現併發控制呢?在回答這個問題以前,咱們來簡單介紹一下併發控制。javascript

假設有 6 個待辦任務要執行,而咱們但願限制同時執行的任務個數,即最多隻有 2 個任務能同時執行。當 正在執行任務列表 中的任何 1 個任務完成後,程序會自動從 待辦任務列表 中獲取新的待辦任務並把該任務添加到 正在執行任務列表 中。爲了讓你們可以更直觀地理解上述的過程,阿寶哥特地畫了如下 3 張圖:java

1.1 階段一

1.2 階段二

1.3 階段三

好的,介紹完併發控制以後,阿寶哥將以 Github 上 async-pool 這個庫來介紹一下異步任務併發控制的具體實現。git

async-poolgithub.com/rxaviers/as…github

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。面試

針對 async-pool 這個庫的具體應用,阿寶哥寫了 JavaScript 中如何實現大文件併發下載?JavaScript 中如何實現大文件併發上傳? 兩篇文章,感興趣的小夥伴能夠了解一下。數組

2、併發控制的實現

async-pool 這個庫提供了 ES7 和 ES6 兩種不一樣版本的實現,在分析其具體實現以前,咱們來看一下它如何使用。promise

2.1 asyncPool 的使用

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
複製代碼

在以上代碼中,咱們使用 async-pool 這個庫提供的 asyncPool 函數來實現異步任務的併發控制。 asyncPool 函數的簽名以下所示:微信

function asyncPool(poolLimit, array, iteratorFn){ ... }
複製代碼

該函數接收 3 個參數:markdown

  • poolLimit(數字類型):表示限制的併發數;
  • array(數組類型):表示任務數組;
  • iteratorFn(函數類型):表示迭代函數,用於實現對每一個任務項進行處理,該函數會返回一個 Promise 對象或異步函數。

對於以上示例來講,在使用了 asyncPool 函數以後,對應的執行過程以下所示:併發

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
複製代碼

經過觀察以上的註釋信息,咱們能夠大體地瞭解 asyncPool 函數內部的控制流程。下面咱們先來分析 asyncPool 函數的 ES7 實現。

關注「全棧修仙之路」閱讀阿寶哥原創的 4 本免費電子書(累計下載 3萬+)及 50 幾篇 TS 系列教程。

2.2 asyncPool ES7 實現

async function asyncPool(poolLimit, array, iteratorFn) {
  const ret = []; // 存儲全部的異步任務
  const executing = []; // 存儲正在執行的異步任務
  for (const item of array) {
    // 調用iteratorFn函數建立異步任務
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p); // 保存新的異步任務

    // 當poolLimit值小於或等於總任務個數時,進行併發控制
    if (poolLimit <= array.length) {
      // 當任務完成後,從正在執行的任務數組中移除已完成的任務
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e); // 保存正在執行的異步任務
      if (executing.length >= poolLimit) {
        await Promise.race(executing); // 等待較快的任務執行完成
      }
    }
  }
  return Promise.all(ret);
}
複製代碼

在以上代碼中,充分利用了 Promise.allPromise.race 函數特色,再結合 ES7 中提供的 async await 特性,最終實現了併發控制的功能。利用 await Promise.race(executing); 這行語句,咱們會等待 正在執行任務列表 中較快的任務執行完成以後,纔會繼續執行下一次循環。

asyncPool ES7 實現相對比較簡單,接下來咱們來看一下不使用 async await 特性要如何實現一樣的功能。

2.3 asyncPool ES6 實現

function asyncPool(poolLimit, array, iteratorFn) {
  let i = 0;
  const ret = []; // 存儲全部的異步任務
  const executing = []; // 存儲正在執行的異步任務
  const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve();
    }
    const item = array[i++]; // 獲取新的任務項
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);

    let r = Promise.resolve();

    // 當poolLimit值小於或等於總任務個數時,進行併發控制
    if (poolLimit <= array.length) {
      // 當任務完成後,從正在執行的任務數組中移除已完成的任務
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        r = Promise.race(executing); 
      }
    }
 
    // 正在執行任務列表 中較快的任務執行完成以後,纔會從array數組中獲取新的待辦任務
    return r.then(() => enqueue());
  };
  return enqueue().then(() => Promise.all(ret));
}
複製代碼

在 ES6 的實現版本中,經過內部封裝的 enqueue 函數來實現核心的控制邏輯。當 Promise.race(executing) 返回的 Promise 對象變成已完成狀態時,纔會調用 enqueue 函數,從 array 數組中獲取新的待辦任務。

3、阿寶哥有話說

asyncPool 這個庫的 ES7 和 ES6 的具體實現中,咱們都使用到了 Promise.allPromise.race 函數。其中手寫 Promise.all 是一道常見的面試題。恰好趁着這個機會,阿寶哥跟你們一塊兒來手寫簡易版的 Promise.allPromise.race 函數。

3.1 手寫 Promise.all

Promise.all(iterable) 方法會返回一個 promise 對象,當輸入的全部 promise 對象的狀態都變成 resolved 時,返回的 promise 對象就會以數組的形式,返回每一個 promise 對象 resolve 後的結果。當輸入的任何一個 promise 對象狀態變成 rejected 時,則返回的 promise 對象會 reject 對應的錯誤信息。

Promise.all = function (iterators) {
  return new Promise((resolve, reject) => {
    if (!iterators || iterators.length === 0) {
      resolve([]);
    } else {
      let count = 0; // 計數器,用於判斷全部任務是否執行完成
      let result = []; // 結果數組
      for (let i = 0; i < iterators.length; i++) {
        // 考慮到iterators[i]多是普通對象,則統一包裝爲Promise對象
        Promise.resolve(iterators[i]).then(
          (data) => {
            result[i] = data; // 按順序保存對應的結果
            // 當全部任務都執行完成後,再統一返回結果
            if (++count === iterators.length) {
              resolve(result);
            }
          },
          (err) => {
            reject(err); // 任何一個Promise對象執行失敗,則調用reject()方法
            return;
          }
        );
      }
    }
  });
};
複製代碼

須要注意的是對於 Promise.all 的標準實現來講,它的參數是一個可迭代對象,好比 Array、String 或 Set 等。

3.2 手寫 Promise.race

Promise.race(iterable) 方法會返回一個 promise 對象,一旦迭代器中的某個 promise 對象 resolvedrejected,返回的 promise 對象就會 resolve 或 reject 相應的值。

Promise.race = function (iterators) {
  return new Promise((resolve, reject) => {
    for (const iter of iterators) {
      Promise.resolve(iter)
        .then((res) => {
          resolve(res);
        })
        .catch((e) => {
          reject(e);
        });
    }
  });
};
複製代碼

本文阿寶哥帶你們詳細分析了 async-pool 異步任務併發控制的具體實現,同時爲了讓你們可以更好地理解 async-pool 的核心代碼。最後阿寶哥還帶你們一塊兒手寫簡易版的 Promise.allPromise.race 函數。其實除了 Promise.all 函數以外,還存在另外一個函數 —— Promise.allSettled,該函數用於解決 Promise.all 存在的問題,感興趣的小夥伴能夠自行研究一下。

關注「全棧修仙之路」閱讀阿寶哥原創的 4 本免費電子書(累計下載 3萬+)及 11 篇 Vue 3 進階系列教程。想一塊兒學習 TS/Vue 3.0 的小夥伴能夠添加阿寶哥微信 —— semlinker

4、參考資源

相關文章
相關標籤/搜索