輕量級 JS 任務調度工具,容許並行數控制,超時,重試,錯誤抓取,運行狀態統計等,同時支持多種調度模式,包括當即調度、按頻率調度等。npm
爲何說任務執行隊列對 JS 來講比其它熱門語言更爲重要?因爲 JS 是事件驅動型的動態腳本語言,咱們最多見到的就是在代碼裏各類各樣的異步回調,NodeJs 標榜的輕量高效也是基於 Javascript 非阻塞 I/O 模型而談的,能夠說,基於回調的編程思惟是 JS 的一大特點,但有時它會出問題,例如在 for 循環中執行異步任務,因爲每一個任務都是瞬間返回的,也就是說,循環會很快遍歷完,但任務倒是堆積的,輕則服務繁忙,重則直接把服務壓垮,如何輕巧地控制任務並行量不少時候是一個痛點。編程
npm i bobolink
建立一個默認配置的Bobolinkpromise
按照須要建立一個Bobolink實例(Bobolink實例之間互不影響, 因此能夠多種場景使用多個Bobolink,甚至能夠經過多個Bobolink合從而應對一些複雜場景)異步
const Bobolink = require('bobolink'); // 每一個Bobolink實例都有一個大的隊列用於存聽任務,因此能夠很放心地將任務扔給它,適當的時機下Bobolink會很可靠地調度這些任務。 let q = new Bobolink();
put單個任務函數
因爲Promise的執行代碼在建立的時刻就已經被執行(then和catch內的代碼則經過回調執行),因此簡單把Promise扔進Bobolink是不可行的工具
// 下面的打印序是 1 2 3 new Promise(resolve => { console.log(1); resolve(3) }).then(res => { console.log(res); }) console.log(2)
經過將Promise扔進一個function能夠達到延期執行的效果ui
function p() { // 返回promise任務 return new Promise(resolve => { console.log(1); resolve(3) }).then(res => { console.log(res); }) } console.log(2);
此時p必須等待調用纔會執行內部的Promise代碼,且p返回的是該Promise,值即可以繼續傳遞。 每一個放置到Bobolink的Promise任務都應該以這種方式封裝spa
function p() { return new Promise(resolve => { console.log(1); resolve(2) }).then(res => { console.log(res); return 3; }) } // 因爲隊列很空閒, 能夠當即調度本任務, // 因此很快就成功打印出了1, 以後的then則須要等待合適的時機回調, // 若是Promise及其上面的全部then都執行完了, 最終會傳遞到put.then q.put(p).then(task => { // 打印最終值3 console.log(task.res) });
固然,若是在put的時候,隊列執行中的任務數已經到達最大並行量,則須要等待有任務執行完成時騰出空間,而且排在當前任務以前的任務已經都被調度完了纔會獲得執行。隊列
put一組任務事件
Bobolink容許同時put多個任務,且put.then會在該組任務都被執行完畢時才被調用
function getP(flag) { return function p() { return new Promise(resolve => { resolve(flag) }); } } q.put([getP(1), getP(2), getP(3)]).then(tasks => { // 打印每一個任務的返回值, 按放入順序一一對應 for (let i = 0; i < tasks.length; i++) { console.log(tasks[i].res); } })
配置
目前支持的參數以下:
let q = new Bobolink({ // 最大並行數,最小爲1 concurrency: 5, // 任務超時時間ms,0不超時 timeout: 15000, // 任務失敗重試次數,0不重試 retry: 0, // 是否優先處理失敗重試的任務,爲true則失敗的任務會被放置到隊列頭 retryPrior: false, // 是否優先處理新任務,爲true則新任務會被放置到隊列頭 newPrior: false, // 最大可排隊的任務數, -1爲無限制, 超過最大限制時添加任務將返回錯誤'bobolink_exceeded_maximum_task_number' max: -1, // 指定任務的調度模式,僅在初始化時設置有效 scheduling: { // 默認爲'immediately',任務將在隊列空閒時當即獲得調度。 // 你也能夠將它設置爲'frequency', 而且指定countPerSecond, Bobolink將嚴格地按照設定的頻率去調度任務。 enable: 'frequency', frequency: { // 每秒須要調度的任務數,僅在任務隊列有空閒時纔會真正調度。 countPerSecond: 10000 } }, // 任務失敗的handler函數,若是設置了重試,同個任務失敗屢次會執行catch屢次 catch: (err) => { } });
參數能夠在運行期更改, 對後續生效
q.setOptions({ concurrency: 5, timeout: 15000, retry: 0, retryPrior: false, newPrior: false, catch: null });
任務運行狀態
使用Bobolink執行的Promise任務全部錯誤會被catch幷包裝,因此只存在put.then而不存在put.catch(除非put.then自身出錯)。任務執行以後獲取到的響應有一些有用的值能夠用於服務統計
taskRes = { // 執行是否遇到錯誤, 判斷任務是否執行成功的判斷依據是err === undefined, err爲任何其它值都表明了運行失敗。 // 任務出錯時, 若是不重試, 那麼catch到的錯誤會直接放入err, 超時時err爲'bobolink_timeout' // 若是重試, 且在最大重試次數以後依然錯誤的話, 會將最後一次的錯誤放入err // 若是重試, 且在重試期間成功的話, 被認爲是成功的, 因此err爲空 err: undefined, // 執行Promise返回的結果 res: Object, // 從任務放入隊列到該任務最後一次被調度, 所通過的時間(ms) waittingTime: 20, // 該任務最後一次運行的時間(ms) runTime: 1, // 該任務出錯重試的次數 retry: 2 }
插隊
除了隊列控制參數newPrior和retryPrior以外,也容許在put的時候指定當前任務是否優先處理
Bobolink.ptototype.put(tasks, prior)
默認狀況下,任務是放入隊尾的,但若是指定了prior爲true,則會被放置到隊頭,put任務組時會維持組任務本來的順序,並整個放入隊頭。
更多
q.options:獲取當前隊列的配置。
q.queueTaskSize:獲取隊列排隊中的任務數。
q.runningTaskCount:獲取隊列執行中的任務數。