原文地址javascript
想不到我在平常開發中,居然遇到「優先異步隊列」的需求。github上有相似功能,而且集成redis的庫有Kue、Bull等。可是,對於追求「簡、易、輕」的我來講,其實並不滿意。根據「二八原則」,我決定,本身來實現一個只有上述二者「兩成」功能的輕量級開源庫:priority-async-queue。java
開源庫的命名爲何這麼長呢?緣由是,我認爲開發者只需看一眼庫名就知道其功能,一點都不含糊。可是,爲了行文流暢,priority-async-queue
下面統一簡稱爲「paq」。git
paq
按照 redux 做者的套路,首先,咱們須要明確的是,你可能不須要 paq。github
只有遇到 N 個異步任務不能並行執行,而且只能順序執行時,你才須要使用 paq。redis
簡單來講,若是你須要執行的 N 個異步任務,並不存在資源爭奪和佔用、數據共享、嚴格的邏輯順序、優先權對比等,paq 多是不必的,用了反而下降執行效率、影響程序性能。redux
paq 的設計思路很是簡單,一共3個類:異步
Task
是描述每一個待執行(異步/同步)任務的執行邏輯以及配置參數。PriorityQueue
是控制每一個待執行(異步/同步)任務的優先級隊列、具備隊列的基本屬性和操做。AsyncQueue
是控制每一個待執行(異步/同步)任務能嚴格順序地執行的隊列。下面是 paq 的程序流程圖:async
1. addTask函數
addTask 是核心方法,它能夠建立一個任務,而且添加到 paq 隊列中。性能
paq.addTask([options, ]callback);
複製代碼
options
是一個可選對象,包含如下屬性:
{
id: undefined, // 任務id
priority: 'normal', // 任務權重,例如: low, normal, mid, high, urgent
context: null, // 執行任務的上下文
start: (ctx, options) => {}, // 任務將要被執行的回調
completed: (ctx, res) => {}, // 任務執行完成後的回調
failed: (ctx, err) => {}, // 任務執行失敗後的回調
remove: (ctx, options) => {} // 任務被刪除後的回調
}
複製代碼
callback
是一個描述執行任務的邏輯的函數,它包含兩個參數:ctx
和 options
:
ctx
是任務所屬的paq實例。options
是此任務的options參數的最終值。paq.addTask((ctx, options) => {
console.log(ctx === paq); // true
});
複製代碼
2. removeTask
removeTask 方法是根據任務 id 來刪除等待對列中的任務。
paq.removeTask(taskId);
複製代碼
若是成功刪除任務,它將返回true。不然,它將返回false。
3. pause
pause 方法是暫停 paq 繼續執行任務。
paq.pause();
複製代碼
注意: 可是,您沒法暫停當前正在執行的任務,由於沒法暫時檢測到異步任務的進度。
4. isPause
isPause 屬性,返回當前隊列是否處於暫停狀態。
paq.isPause; // return true or false.
複製代碼
5. resume
resume 方法,用來從新啓動 paq 隊列執行任務。
paq.resume();
複製代碼
6. clearTask
cleartTask 方法,用來清除 paq 等待隊列中全部的任務。
paq.clearTask();
複製代碼
1. 基本用法
只要向 paq 添加任務,該任務就會自動被執行。
const PAQ = require('priority-async-queue');
const paq = new PAQ();
paq.addTask((ctx, options) => {
console.log('This is a simple task!');
});
// This is a simple task!
複製代碼
2. 同步任務
你能夠使用 paq 執行一系列同步任務,例如:
const syncTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask(() => {
console.log('Step', i, 'sync');
return i;
});
}
};
syncTask(3);
// Step 0 sync
// Step 1 sync
// Step 2 sync
複製代碼
3. 異步任務
你還能夠使用 paq 執行一系列的異步任務,例如:
const asyncTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask(() => {
return new Promise(resolve => {
setTimeout(() => {
console.log('Step', i, 'async');
resolve(i);
}, i * 1000);
});
});
}
};
asyncTask(3);
// Step 0 async
// Step 1 async
// Step 2 async
複製代碼
4. 混合任務
你甚至能夠使用 paq 執行一系列同步和異步交錯的任務,例如:
const mixTask = (n) => {
asyncTask(n);
syncTask(n);
asyncTask(n);
};
mixTask(2);
// Step 0 async
// Step 1 async
// Step 0 sync
// Step 1 sync
// Step 0 async
// Step 1 async
複製代碼
5. 綁定執行上下文
有時若是你須要指定上下文來執行任務,例如:
const contextTask = (n) => {
var testObj = {
name: 'foo',
sayName: (name) => {
console.log(name);
}
};
for (let i = 0; i < n; i++) {
paq.addTask({ context: testObj }, function () {
this.sayName(this.name + i);
});
}
};
contextTask(3);
// foo0
// foo1
// foo2
複製代碼
注意: this
在箭頭函數中並不存在,或者說它是指向其定義處的上下文。
6. 延遲執行
paq 還支持延遲執行任務,例如:
const delayTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({ delay: 1000 * i }, () => {
console.log('Step', i, 'sync');
return i;
});
}
};
delayTask(3);
// Step 0 sync
// Step 1 sync
// Step 2 sync
複製代碼
7. 優先權
若是須要執行的任務具備權重,例如:
const priorityTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({ priority: i === n - 1 ? 'high' : 'normal' }, () => {
return new Promise(resolve => {
setTimeout(() => {
console.log('Step', i, 'async');
resolve(i);
}, i * 1000);
});
});
}
};
priorityTask(5);
// Step 0 async
// Step 4 async
// Step 1 async
// Step 2 async
// Step 3 async
複製代碼
默認優先級映射以下:
{
"low": 1,
"normal": 0, // default
"mid": -1,
"high": -2,
"urgent": -3
}
複製代碼
8. 回調函數
有時,你但願可以在任務的開始、完成、失敗、被刪除時作點事情,例如
const callbackTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({
id: i,
start: (ctx, options) => {
console.log('start running task id is', options.id);
},
completed: (ctx, res) => {
console.log('complete, result is', res);
},
failed: (ctx, err) => {
console.log(err);
}
}, () => {
if (i < n / 2) {
throw new Error(i + ' is too small!');
}
return i;
});
}
};
callbackTask(5);
// start running task id is 0
// Error: 0 is too small!
// start running task id is 1
// Error: 1 is too small!
// start running task id is 2
// Error: 2 is too small!
// start running task id is 3
// complete, result is 3
// start running task id is 4
// complete, result is 4
複製代碼
9. 刪除任務
有時,你須要刪除一些任務,例如:
const removeTask = (n) => {
for (let i = 0; i < n; i++) {
paq.addTask({
id: i,
remove: (ctx, options) => {
console.log('remove task id is', options.id);
}
}, () => {
return new Promise(resolve => {
setTimeout(() => {
console.log('Step', i, 'async');
resolve(i);
}, i * 1000);
});
});
}
console.log(paq.removeTask(3));
console.log(paq.removeTask(5));
};
removeTask(5);
// remove task id is 3
// true
// false
// Step 0 async
// Step 1 async
// Step 2 async
// Step 4 async
複製代碼
注意: 你必須在建立任務時分配id,並根據id刪除任務。
若是須要監視 paq 隊列的狀態,paq 提供如下事件偵聽器:
1. addTask
paq.on('addTask', (options) => {
// Triggered when the queue adds a task.
});
複製代碼
2. startTask
paq.on('startTask', (options) => {
// Triggered when a task in the queue is about to execute.
});
複製代碼
3. changeTask
paq.on('changeTask', (options) => {
// Triggered when a task in the queue changes.
});
複製代碼
4. removeTask
paq.on('removeTask', (options) => {
// Triggered when the queue remove a task.
});
複製代碼
5. completed
paq.on('completed', (options, result) => {
// Triggered when the task execution in the queue is complete.
});
複製代碼
6. failed
paq.on('failed', (options, err) => {
// Triggered when a task in the queue fails to execute.
});
複製代碼
最後,想補充的是:若遇到 Promise.all
和 Promise.race
解決不了的需求,能夠考慮一下 paq。