Node.js + Redis Sorted Set 任務隊列

需求:功能 A 須要調用第三方 API 獲取數據,而第三方 API 自身是異步處理方式,在調用後會返回數據與狀態 { data: "查詢結果", "status": "正在異步處理中" },這樣就須要間隔一段時間後再去調用第三方 API 獲取數據。爲了用戶在使用功能 A 時不會由於第三方 API 正在異步處理中而必須等待,將用戶請求加入任務隊列中,返回部分數據並關閉請求。而後定時從任務隊列裏中取出任務調用第三方 API,若返回狀態爲」異步處理中「,將該任務再次加入任務隊列,若返回狀態爲」已處理完畢「,將返回數據入庫。node

根據以上問題,想到使用 Node.js + Redis sorted set 來實現任務隊列。Node.js 實現自身應用 API 用來接受用戶請求,合併數據庫已存數據與 API 返回的部分數據返回給用戶,並將任務加入到任務隊列中。利用 Node.js child process 與 cron 定時從任務隊列中取出任務執行。redis

在設計任務隊列的過程當中須要考慮到的幾個問題數據庫

  1. 並行執行多個任務express

  2. 任務惟一性api

  3. 任務成功或失敗後的處理app

針對以上問題的解決方案dom

  1. 並行執行多個任務利用 Promise.all 來實現異步

  2. 任務惟一性利用 Redis sorted set 來實現。使用時間戳做爲分值能夠實現將 sorted set 做爲 list 來使用,在加入任務時判斷任務是否已經存在,在取出任務執行時將該任務分值設置爲 0,每次取出分值大於 0 的任務來執行,能夠避免重複執行任務。ui

  3. 執行任務成功後刪除任務,執行任務失敗後將任務分值更新爲當前時間時間戳,這樣就能夠將失敗的任務從新加入任務隊列尾部url

示例代碼

// remote_api.js 模擬第三方 API
'use strict';

const app = require('express')();

app.get('/', (req, res) => {
    setTimeout(() => {
        let arr = [200, 300];  // 200 表明成功,300 表明失敗須要從新請求
        res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
    }, 3000);
});

app.listen('9001', () => {
    console.log('API 服務監聽端口:9001');
});
// producer.js 自身應用 API,用來接受用戶請求並將任務加入任務隊列
'use strict';

const app = require('express')();
const redisClient = require('redis').createClient();

const QUEUE_NAME = 'queue:example';

function addTaskToQueue(taskName, callback) {
    // 先判斷任務是否已經存在,存在:跳過,不存在:加入任務隊列
    redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
        if (error) {
            console.log(error);
        } else {
            if (task) {
                console.log('任務已存在,不新增相同任務');
                callback(null, task);
            } else {
                redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
                    if (error) {
                        callback(error);
                    } else {
                        callback(null, result);
                    }
                });
            }
        }
    });
}

app.get('/', (req, res) => {
    let taskName = req.query['task-name'];
    addTaskToQueue(taskName, (error, result) => {
        if (error) {
            console.log(error);
        } else {
            res.status(200).send('正在查詢中......');
        }
    });
});

app.listen(9002, () => {
    console.log('生產者服務監聽端口:9002');
});
// consumer.js 定時獲取任務並執行
'use strict';

const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');

const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2;  // 並行執行任務數量

function getTasksFromQueue(callback) {
    // 獲取多個任務
    redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
        if (error) {
            callback(error);
        } else {
            // 將任務分值設置爲 0,表示正在處理
            if (tasks.length > 0) {
                let tmp = [];
                tasks.forEach((task) => {
                    tmp.push(0);
                    tmp.push(task);
                });
                redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
                    if (error) {
                        callback(error);
                    } else {
                        callback(null, tasks)
                    }
                });
            }
        }
    });
}

function addFailedTaskToQueue(taskName, callback) {
    redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
        if (error) {
            callback(error);
        } else {
            callback(null, result);
        }
    });
}

function removeSucceedTaskFromQueue(taskName, callback) {
    redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
        if (error) {
            callback(error);
        } else {
            callback(null, result);
        }
    })
}

function execTask(taskName) {
    return new Promise((resolve, reject) => {
        let requestOptions = {
            'url': 'http://127.0.0.1:9001',
            'method': 'GET',
            'timeout': 5000
        };
        request(requestOptions, (error, response, body) => {
            if (error) {
                resolve('failed');
                console.log(error);
                addFailedTaskToQueue(taskName, (error) => {
                    if (error) {
                        console.log(error);
                    } else {

                    }
                });
            } else {
                try {
                    body = typeof body !== 'object' ? JSON.parse(body) : body;
                } catch (error) {
                    resolve('failed');
                    console.log(error);
                    addFailedTaskToQueue(taskName, (error, result) => {
                        if (error) {
                            console.log(error);
                        } else {

                        }
                    });
                    return;
                }
                if (body.status !== 200) {
                    resolve('failed');
                    addFailedTaskToQueue(taskName, (error, result) => {
                        if (error) {
                            console.log(error);
                        } else {

                        }
                    });
                } else {
                    resolve('succeed');
                    removeSucceedTaskFromQueue(taskName, (error, result) => {
                        if (error) {
                            console.log(error);
                        } else {

                        }
                    });
                }
            }
        });
    });
}

// 定時,每隔 5 秒獲取新的任務來執行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
    console.log('獲取新任務');
    getTasksFromQueue((error, tasks) => {
        if (error) {
            console.log(error);
        } else {
            if (tasks.length > 0) {
                console.log(tasks);

                Promise.all(tasks.map(execTask))
                .then((results) => {
                    console.log(results);
                })
                .catch((error) => {
                    console.log(error);
                });
                
            }
        }
    });
});
相關文章
相關標籤/搜索