基於Node.js的優先異步隊列

原文地址javascript

前言

想不到我在平常開發中,居然遇到「優先異步隊列」的需求。github上有相似功能,而且集成redis的庫有KueBull等。可是,對於追求「簡、易、輕」的我來講,其實並不滿意。根據「二八原則」,我決定,本身來實現一個只有上述二者「兩成」功能的輕量級開源庫:priority-async-queuejava

開源庫的命名爲何這麼長呢?緣由是,我認爲開發者只需看一眼庫名就知道其功能,一點都不含糊。可是,爲了行文流暢,priority-async-queue下面統一簡稱爲「paq」。git

你可能不須要paq

按照 redux 做者的套路,首先,咱們須要明確的是,你可能不須要 paqgithub

只有遇到 N 個異步任務不能並行執行,而且只能順序執行時,你才須要使用 paq。redis

簡單來講,若是你須要執行的 N 個異步任務,並不存在資源爭奪和佔用、數據共享、嚴格的邏輯順序、優先權對比等,paq 多是不必的,用了反而下降執行效率、影響程序性能。redux

paq 設計思路

paq 的設計思路很是簡單,一共3個類:異步

  • Task 是描述每一個待執行(異步/同步)任務的執行邏輯以及配置參數。
  • PriorityQueue 是控制每一個待執行(異步/同步)任務的優先級隊列、具備隊列的基本屬性和操做。
  • AsyncQueue 是控制每一個待執行(異步/同步)任務能嚴格順序地執行的隊列。

下面是 paq 的程序流程圖:async

paq

paq 基本概念和API

1. addTask函數

addTask 是核心方法,它能夠建立一個任務,而且添加到 paq 隊列中。性能

paq.addTask([options, ]callback);
複製代碼

options 是一個可選對象,包含如下屬性:

{
  id: undefined,          // 任務id
  priority: 'normal',     // 任務權重,例如: low, normal, mid, high, urgent
  context: null,          // 執行任務的上下文
  start: (ctx, options) => {}, // 任務將要被執行的回調
  completed: (ctx, res) => {}, // 任務執行完成後的回調
  failed: (ctx, err) => {},    // 任務執行失敗後的回調
  remove: (ctx, options) => {} // 任務被刪除後的回調
}
複製代碼

callback 是一個描述執行任務的邏輯的函數,它包含兩個參數:ctxoptions

  • ctx 是任務所屬的paq實例。
  • options 是此任務的options參數的最終值。
paq.addTask((ctx, options) => {
  console.log(ctx === paq); // true
});
複製代碼

2. removeTask

removeTask 方法是根據任務 id 來刪除等待對列中的任務。

paq.removeTask(taskId);
複製代碼

若是成功刪除任務,它將返回true。不然,它將返回false。

3. pause

pause 方法是暫停 paq 繼續執行任務。

paq.pause();
複製代碼

注意: 可是,您沒法暫停當前正在執行的任務,由於沒法暫時檢測到異步任務的進度。

4. isPause

isPause 屬性,返回當前隊列是否處於暫停狀態。

paq.isPause; // return true or false.
複製代碼

5. resume

resume 方法,用來從新啓動 paq 隊列執行任務。

paq.resume();
複製代碼

6. clearTask

cleartTask 方法,用來清除 paq 等待隊列中全部的任務。

paq.clearTask();
複製代碼

paq 用法

1. 基本用法

只要向 paq 添加任務,該任務就會自動被執行。

const PAQ = require('priority-async-queue');
const paq = new PAQ();

paq.addTask((ctx, options) => {
  console.log('This is a simple task!');
});

// This is a simple task!
複製代碼

2. 同步任務

你能夠使用 paq 執行一系列同步任務,例如:

const syncTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask(() => {
      console.log('Step', i, 'sync');
      return i;
    });
  }
};

syncTask(3);

// Step 0 sync
// Step 1 sync
// Step 2 sync
複製代碼

3. 異步任務

你還能夠使用 paq 執行一系列的異步任務,例如:

const asyncTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask(() => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
};

asyncTask(3);

// Step 0 async
// Step 1 async
// Step 2 async
複製代碼

4. 混合任務

你甚至能夠使用 paq 執行一系列同步和異步交錯的任務,例如:

const mixTask = (n) => {
  asyncTask(n);
  syncTask(n);
  asyncTask(n);
};

mixTask(2);

// Step 0 async
// Step 1 async
// Step 0 sync
// Step 1 sync
// Step 0 async
// Step 1 async
複製代碼

5. 綁定執行上下文

有時若是你須要指定上下文來執行任務,例如:

const contextTask = (n) => {
  var testObj = {
    name: 'foo',
    sayName: (name) => {
      console.log(name);
    }
  };
  for (let i = 0; i < n; i++) {
    paq.addTask({ context: testObj }, function () {
      this.sayName(this.name + i);
    });
  }
};

contextTask(3);

// foo0
// foo1
// foo2
複製代碼

注意: this 在箭頭函數中並不存在,或者說它是指向其定義處的上下文。

6. 延遲執行

paq 還支持延遲執行任務,例如:

const delayTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({ delay: 1000 * i }, () => {
      console.log('Step', i, 'sync');
      return i;
    });
  }
};

delayTask(3);

// Step 0 sync
// Step 1 sync
// Step 2 sync
複製代碼

7. 優先權

若是須要執行的任務具備權重,例如:

const priorityTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({ priority: i === n - 1 ? 'high' : 'normal' }, () => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
};

priorityTask(5);

// Step 0 async
// Step 4 async
// Step 1 async
// Step 2 async
// Step 3 async
複製代碼

默認優先級映射以下:

{
  "low": 1,
  "normal": 0, // default
  "mid": -1,
  "high": -2,
  "urgent": -3
}
複製代碼

8. 回調函數

有時,你但願可以在任務的開始、完成、失敗、被刪除時作點事情,例如

const callbackTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({
      id: i,
      start: (ctx, options) => {
        console.log('start running task id is', options.id);
      },
      completed: (ctx, res) => {
        console.log('complete, result is', res);
      },
      failed: (ctx, err) => {
        console.log(err);
      }
    }, () => {
      if (i < n / 2) {
        throw new Error(i + ' is too small!');
      }
      return i;
    });
  }
};

callbackTask(5);

// start running task id is 0
// Error: 0 is too small!
// start running task id is 1
// Error: 1 is too small!
// start running task id is 2
// Error: 2 is too small!
// start running task id is 3
// complete, result is 3
// start running task id is 4
// complete, result is 4
複製代碼

9. 刪除任務

有時,你須要刪除一些任務,例如:

const removeTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({
      id: i,
      remove: (ctx, options) => {
        console.log('remove task id is', options.id);
      }
    }, () => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
  console.log(paq.removeTask(3));
  console.log(paq.removeTask(5));
};

removeTask(5);

// remove task id is 3
// true
// false
// Step 0 async
// Step 1 async
// Step 2 async
// Step 4 async
複製代碼

注意: 你必須在建立任務時分配id,並根據id刪除任務。

paq 事件

若是須要監視 paq 隊列的狀態,paq 提供如下事件偵聽器:

1. addTask

paq.on('addTask', (options) => {
  // Triggered when the queue adds a task.
});
複製代碼

2. startTask

paq.on('startTask', (options) => {
  // Triggered when a task in the queue is about to execute.
});
複製代碼

3. changeTask

paq.on('changeTask', (options) => {
  // Triggered when a task in the queue changes.
});
複製代碼

4. removeTask

paq.on('removeTask', (options) => {
  // Triggered when the queue remove a task.
});
複製代碼

5. completed

paq.on('completed', (options, result) => {
  // Triggered when the task execution in the queue is complete.
});
複製代碼

6. failed

paq.on('failed', (options, err) => {
  // Triggered when a task in the queue fails to execute.
});
複製代碼

最後,想補充的是:若遇到 Promise.allPromise.race 解決不了的需求,能夠考慮一下 paq

相關文章
相關標籤/搜索