一旦瞭解了基本知識,構建API就沒那麼複雜了。咱們向服務器發送HTTP請求,它作一些工做,而後返回請求的數據。這個過程很簡單。可是當請求須要完成超出其範圍的工做時會發生什麼呢?例如,當我提醒一個用戶,系統須要向受影響的全部用戶發送一個推送通知。在請求週期內處理這些通知將延遲最終的響應。隨着咱們的通知系統變得愈來愈複雜,很明顯咱們須要更多的等待時間。javascript
處理通知而後推送通知須要調用數據庫和外部api。該過程拆解以下:java
這6個步驟中的每個都至少有一個與之關聯的數據庫查詢。當須要將單個通知發送到單個用戶的設備時,這個過程能夠很是快地完成,可是若是須要更長的時間,那麼請求就有超時的風險。咱們必須將這個邏輯分離出來,以即可以在請求/響應週期以外處理它。node
任務隊列管理了一份須要在單獨進程中完成的工做列表。一個系統將工做添加到隊列的末尾,而另外一個系統將工做項從頂部彈出。咱們須要建立一個表示上述工做的任務對象,而後將其添加到任務隊列中。在咱們開始以前,我須要問幾個基本問題。redis
咱們已經在使用Redis做爲緩存系統,因此當我開始尋找構建隊列的方法時,Redis是一個顯而易見的選擇。它不只可以很好地處理這種模式,並且有不少在線資源討論它是如何構建的。對此還有許多其餘選項,好比若是你正在使用谷歌應用程序引擎(GAE),你應該研究谷歌雲任務隊列,它提供了更多內置功能。數據庫
我花了一點時間想弄明白。我不想每n毫秒輪詢一次Redis來查找新做業。我發現了兩種方法。第一個是Redis的發佈/訂閱系統。對於這個方法,我將有一個訂閱通道並在其上接收消息的函數。這些消息將提醒我準備運行一個新任務。第二種方法是使用一個簡單的Redis列表做爲隊列,使用阻塞列表pop原語(BLPOP),等待直到一個項目準備好並將其從隊列中移除。api
在這個方案的第一次迭代中,咱們使用了Pub/Sub模式,可是它增長了一層不須要的複雜性。此外,當系統擴展時,咱們必須作額外的工做來驗證消息沒有在多臺機器上處理。所以,咱們切換到List和BLPOP
方法。緩存
「嗯,咱們把任務對象插進去,嗯……」你可能會這麼想,可是隊列只支持添加字符串,因此咱們不能真正插入一個對象。咱們必須把關鍵值推到末端。這個問題困擾着我,主要是由於我不肯定「最好」的方法是什麼。鍵應該是數據庫的主ID,仍是對Redis中的某個對象的引用?咱們應該在哪裏畫出這條線呢?我決定將events主鍵ID發送到隊列,並容許任務決定如何處理它。例如,若是用戶爲一篇文章進行了upvote
,我將把vote操做的ID推到vote_queue中,一旦它從隊列中彈出,服務將知道如何處理它。服務器
好了,我已經描述這個問題,並回答了個人一些問題(但願這些問題也回答了你的一些問題),如今讓咱們看一下這將如何工做的,如圖:async
從圖中能夠看到,咱們有兩個服務在服務器上運行。TaskScheduler將建立一個新任務,將其添加到數據庫,而後將任務的ID推到任務隊列的末尾。TaskManager等待任務添加到隊列後適當地處理它。函數
TaskScheduler.js
是一個基本的例子,演示瞭如何將任務添加到數據庫中,而後將其推到任務隊列的末尾。一旦將其推入隊列,當TaskManager開始監聽時,它將開始處理。
/// TaskScheduler.js is an example of how one would schedule tasks on the task queue.
var redis = require('redis');
var redisClient = redis.createClient();
const TaskScheduler = async function(work){
// If you're using MySQL we would add the "Task" to the database.
let task = await Database.query("INSERT INTO Task ...");
let taskID = task.insertId;
await redisClient.rpush("task_queue", taskID);
}
複製代碼
TaskQueue.js
演示如何在NodeJS中使用async/await實現它的基本示例。
/// TaskQueue.js would be placed in your server and when it's launched
/// to begin listening for tasks. Or, it can be extracted out to a seperate service.
var redis = require("redis");
/// TaskManager for listening to the queue and running work.
const TaskManager = async function(redisClient){
while(true){
let task;
try{
task = await redisClient.blpopAsync("task_queue", 0);
} catch(error) {
// Redis connect could have closed. Handle those cases here.
process.exit(1);
}
try {
await HandleTask(task);
} catch (error) {
// Handling the task failed. Try rerunning it or adding it to a "Failure" queue.
}
}
}
/// Function that handles all the work for this task.
const HandleTask = async function(task){
// Do the work!
}
// Run the TaskManager function
(async function() {
// Initialize redis
let redisClient = redis.createClient();
await TaskManager(redisClient)
})()
複製代碼
因爲我給出的代碼只是一個基本的示例,因此還有不少地方須要改進。你可能想問,TaskManager應該放在哪裏?若是直接將其添加到服務器,則在高使用率期間可能會使系統過載,但這取決於你的任務執行的工做類型。在咱們的系統中,咱們將全部這些提取到一個新的微服務中,並使用一個簡單的API來檢查它的狀態。
一樣,在示例代碼中,咱們一次運行一個任務。這並不理想,由於長時間運行的任務可能得備份整個隊列。所以,咱們應該有一個運行任務池,根據須要添加和刪除這些任務。一旦池被填滿,while循環將等待一個新的空間。
本文所描述的方法並不太複雜,可是它將業務邏輯與應用程序邏輯解耦。有了這個小的更改,咱們就能夠開始迭代系統的性能,並構建更健壯的隊列和服務。咱們還能夠複製此方法來處理各類長時間運行的流程,如推薦系統、文本處理等。