在平常開發過程當中,你可能會遇到併發控制的場景,好比控制請求併發數。那麼在 JavaScript 中如何實現併發控制呢?在回答這個問題以前,咱們來簡單介紹一下併發控制。javascript
假設有 6 個待辦任務要執行,而咱們但願限制同時執行的任務個數,即最多隻有 2 個任務能同時執行。當 正在執行任務列表 中的任何 1 個任務完成後,程序會自動從 待辦任務列表 中獲取新的待辦任務並把該任務添加到 正在執行任務列表 中。爲了讓你們可以更直觀地理解上述的過程,阿寶哥特地畫了如下 3 張圖:java
好的,介紹完併發控制以後,阿寶哥將以 Github 上 async-pool 這個庫來介紹一下異步任務併發控制的具體實現。git
async-pool:github.com/rxaviers/as…github
Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。面試
針對 async-pool 這個庫的具體應用,阿寶哥寫了 JavaScript 中如何實現大文件併發下載? 和 JavaScript 中如何實現大文件併發上傳? 兩篇文章,感興趣的小夥伴能夠了解一下。數組
async-pool 這個庫提供了 ES7 和 ES6 兩種不一樣版本的實現,在分析其具體實現以前,咱們來看一下它如何使用。promise
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
複製代碼
在以上代碼中,咱們使用 async-pool 這個庫提供的 asyncPool
函數來實現異步任務的併發控制。 asyncPool
函數的簽名以下所示:微信
function asyncPool(poolLimit, array, iteratorFn){ ... }
複製代碼
該函數接收 3 個參數:markdown
poolLimit
(數字類型):表示限制的併發數;array
(數組類型):表示任務數組;iteratorFn
(函數類型):表示迭代函數,用於實現對每一個任務項進行處理,該函數會返回一個 Promise 對象或異步函數。對於以上示例來講,在使用了 asyncPool
函數以後,對應的執行過程以下所示:併發
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
複製代碼
經過觀察以上的註釋信息,咱們能夠大體地瞭解 asyncPool
函數內部的控制流程。下面咱們先來分析 asyncPool
函數的 ES7 實現。
關注「全棧修仙之路」閱讀阿寶哥原創的 4 本免費電子書(累計下載 3萬+)及 50 幾篇 TS 系列教程。
async function asyncPool(poolLimit, array, iteratorFn) {
const ret = []; // 存儲全部的異步任務
const executing = []; // 存儲正在執行的異步任務
for (const item of array) {
// 調用iteratorFn函數建立異步任務
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p); // 保存新的異步任務
// 當poolLimit值小於或等於總任務個數時,進行併發控制
if (poolLimit <= array.length) {
// 當任務完成後,從正在執行的任務數組中移除已完成的任務
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e); // 保存正在執行的異步任務
if (executing.length >= poolLimit) {
await Promise.race(executing); // 等待較快的任務執行完成
}
}
}
return Promise.all(ret);
}
複製代碼
在以上代碼中,充分利用了 Promise.all
和 Promise.race
函數特色,再結合 ES7 中提供的 async await
特性,最終實現了併發控制的功能。利用 await Promise.race(executing);
這行語句,咱們會等待 正在執行任務列表 中較快的任務執行完成以後,纔會繼續執行下一次循環。
asyncPool ES7 實現相對比較簡單,接下來咱們來看一下不使用 async await
特性要如何實現一樣的功能。
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = []; // 存儲全部的異步任務
const executing = []; // 存儲正在執行的異步任務
const enqueue = function () {
if (i === array.length) {
return Promise.resolve();
}
const item = array[i++]; // 獲取新的任務項
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
let r = Promise.resolve();
// 當poolLimit值小於或等於總任務個數時,進行併發控制
if (poolLimit <= array.length) {
// 當任務完成後,從正在執行的任務數組中移除已完成的任務
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
}
// 正在執行任務列表 中較快的任務執行完成以後,纔會從array數組中獲取新的待辦任務
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
複製代碼
在 ES6 的實現版本中,經過內部封裝的 enqueue
函數來實現核心的控制邏輯。當 Promise.race(executing)
返回的 Promise
對象變成已完成狀態時,纔會調用 enqueue
函數,從 array
數組中獲取新的待辦任務。
在 asyncPool
這個庫的 ES7 和 ES6 的具體實現中,咱們都使用到了 Promise.all
和 Promise.race
函數。其中手寫 Promise.all
是一道常見的面試題。恰好趁着這個機會,阿寶哥跟你們一塊兒來手寫簡易版的 Promise.all
和 Promise.race
函數。
Promise.all(iterable)
方法會返回一個 promise 對象,當輸入的全部 promise 對象的狀態都變成 resolved
時,返回的 promise 對象就會以數組的形式,返回每一個 promise 對象 resolve 後的結果。當輸入的任何一個 promise 對象狀態變成 rejected
時,則返回的 promise 對象會 reject 對應的錯誤信息。
Promise.all = function (iterators) {
return new Promise((resolve, reject) => {
if (!iterators || iterators.length === 0) {
resolve([]);
} else {
let count = 0; // 計數器,用於判斷全部任務是否執行完成
let result = []; // 結果數組
for (let i = 0; i < iterators.length; i++) {
// 考慮到iterators[i]多是普通對象,則統一包裝爲Promise對象
Promise.resolve(iterators[i]).then(
(data) => {
result[i] = data; // 按順序保存對應的結果
// 當全部任務都執行完成後,再統一返回結果
if (++count === iterators.length) {
resolve(result);
}
},
(err) => {
reject(err); // 任何一個Promise對象執行失敗,則調用reject()方法
return;
}
);
}
}
});
};
複製代碼
須要注意的是對於 Promise.all
的標準實現來講,它的參數是一個可迭代對象,好比 Array、String 或 Set 等。
Promise.race(iterable)
方法會返回一個 promise 對象,一旦迭代器中的某個 promise 對象 resolved 或 rejected,返回的 promise 對象就會 resolve 或 reject 相應的值。
Promise.race = function (iterators) {
return new Promise((resolve, reject) => {
for (const iter of iterators) {
Promise.resolve(iter)
.then((res) => {
resolve(res);
})
.catch((e) => {
reject(e);
});
}
});
};
複製代碼
本文阿寶哥帶你們詳細分析了 async-pool 異步任務併發控制的具體實現,同時爲了讓你們可以更好地理解 async-pool 的核心代碼。最後阿寶哥還帶你們一塊兒手寫簡易版的 Promise.all
和 Promise.race
函數。其實除了 Promise.all
函數以外,還存在另外一個函數 —— Promise.allSettled
,該函數用於解決 Promise.all
存在的問題,感興趣的小夥伴能夠自行研究一下。
關注「全棧修仙之路」閱讀阿寶哥原創的 4 本免費電子書(累計下載 3萬+)及 11 篇 Vue 3 進階系列教程。想一塊兒學習 TS/Vue 3.0 的小夥伴能夠添加阿寶哥微信 —— semlinker。