Node.js+Redis實現消息隊列的最佳實踐

問題來由

最近在開發一個小型前端監控項目,因爲技術棧中使用到了 Node + Redis 做爲消息隊列實現,所以這裏記錄下在 Node 中經過 Redis 來實現消息隊列時的 使用方法注意事項前端

什麼是消息隊列

消息隊列,是一種存放 消息 是隊列結構,能夠用來解決 分佈式系統通訊 從而解耦系統模塊異步任務處理請求消峯限流的問題。node

既然叫作隊列,那它通常是從一側推入消息,從另外一側消費消息;大概是以下的流程。redis

image.png

在個人需求當中,我用消息隊列來作異步的入庫處理。後端

image.png

我經過 Node 作了一個對外的日誌接收層(即圖中 Koa Server)用於接收上報日誌,當Koa Server接收完成會當即給用戶響應 OK,由於用戶是不必去感知後端日誌的入庫結果的。promise

所以 Koa Server 收到日誌後,將消息放入 Redis 消息隊列便可。另一端,我啓動了一個 消費 程序(即上圖中的日誌入庫模塊,它也是一個 Node 腳本)來對MQ消息進行讀取並進行入庫操做。bash

Redis 如何作消息隊列

消息隊列,其實有 2種類型。一種是基於 隊列模型 的,一種是基於 訂閱發佈模式的。網絡

對於 訂閱發佈模式 來講,是指的多個消費者均可以訂閱某一個 channel 的消息,當channel中來了消息,全部的訂閱者都會收到通知,而且全部的訂閱者均可以對同一個消息進行處理(消費)。數據結構

對於 隊列模型 來講,當消息入隊後,在另外一端只出隊一次,若是有多個消費者在等待這個隊列,那麼只有一個消費者能拿到這個消息進行處理。app

在 Redis 中,以上 2 種模型,分別經過 pub/sub 功能和 list 結構能夠來實現。異步

對於個人日誌接收場景來講,我指望的是不管我後端有多少個 入庫消費者,我但願同一條上報只能入庫一次。所以對我來講,我須要使用 隊列模型 來實現消息隊列,即便用 Redis 的 List 結構。

CLI 簡單實驗

咱們經過 redis-cli 來簡單實驗下 list 結構是如何當作消息隊列的。

首先,經過 lpush 命令往 redis 中某個隊列的左側推入一條消息:

lpush my_mq abc

這樣,咱們就往 my_mq 這個隊列推入了一條內容爲 abc 的消息。因爲此時並無消費者,因此這條消息依然存在於隊列當中。咱們甚至能夠再次往裏推入第2條 def 消息,並經過 llen 命令來查看當前隊列的長度。

image.png

接下來,咱們在另一個命令行窗口輸入:

rpop my_mq

意思是從 my_mq 隊列的右側拿出一條消息。結果:

image.png

阻塞模式實驗

Redis 的 List 結構,爲了方便你們當作消息隊列。提供了一種阻塞模式。 阻塞和非阻塞有什麼區別呢?

咱們用一個新命令行窗口,去執行 阻塞等待消息:

brpop my_mq 0

注意後面要加一個 超時時間,0就表示一直阻塞等待。而後,咱們看到 redis 命令行就阻塞在這裏了,處於等待消息的狀態:
image.png
而若是使用 rpop 非阻塞命令的話,則會返回空並直接退出等待:
image.png

所以,能夠發現,阻塞非阻塞模式,最大的區別:是在於當消息隊列爲空的時候,阻塞模式不會退出等待,而非阻塞模式則會直接返回空並退出等待。

brpop 正在等待的時候,咱們往隊列中 push 一個消息:

lpush my_mq 123

能夠看到,阻塞模式的消費端,收到了 123 這個消息,同時本身也退出了等待:

image.png

這說明:

  • 阻塞模式: 當隊列爲空時,(即沒有等到消息時),則一直阻塞着;等到一條消息就退出
  • 非阻塞模式:當隊列爲空(即沒有等到消息),也不阻塞,而是直接返回null退出

所以 redis 所謂的阻塞,是 當還未等到1條消息時,則阻塞等待;當等到1條消息,即馬上退出;它並不會循環阻塞---即等到消息後它就再也不阻塞監聽這個隊列了。 這將給咱們編寫 Node 代碼提供一些啓發。

Node 如何使用

到了重點了。咱們在 Node 中編碼來使用 redis 消息隊列,跟在 cli 界面使用的方式是同樣的。可是須要咱們考慮如何編寫 消費者 端的代碼,才能實現所謂的 持續監聽隊列。畢竟,咱們的 消費者 是須要常駐進程,持續監聽隊列消息的。並非說 收到一個消息就退出進程

所以,咱們須要編寫一個

  • 能常駐的Node進程,可以持續的等待 redis 隊列消息
  • 當收到1條消息,便由 Node 腳本處理;處理完要繼續等待隊列中下一條消息。如此循環往復。

首先,咱們能夠這樣編寫代碼來在 Node 中建立 redis 客戶端:

const redis = require('promise-redis-client')
let client = redis.createClient(...options)
client.on('error', err => {
    console.log('redis連接出錯')
})
client.on('ready', () => {
    console.log('redis ready')
})

爲了實現 當redis客戶端建立完畢,再開啓消息隊列監聽,咱們把上面的代碼,封裝成一個模塊,用 promise 方式導出:

// redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {
    return new Promise((resolve, reject) => {
        let client = redis.createClient(...options)
        client.on('error', err => {
            console.log('redis 鏈接出錯')
            reject(err)
        })
        client.on('ready', () => {
            console.log('redis ready')
            resolve(client)
        })
    })
}

OK,接下來,咱們能夠去 app.js 中編寫隊列的消費者代碼。爲了更優雅的使用 async/await,咱們能夠這樣來編寫一個 startWait 函數:

async function startWaitMsg(client) {
    ...
}

而後,在 client ready 的時候,去啓動它:

const { createClient } = require('./redis.js')
const c = createClient()
client.then(async c => {
    await startWaitMsg(c)
})

最難的地方在於,startWaitMsg 函數該如何編寫。因爲咱們使用了 promise 版本的 redis庫。所以,咱們能夠像這樣去讀取一個消息:

async function startWaitMsg(client) {
    await client.rpop('my_mq')
}

但這樣寫的話,redis返回消息後,node繼續日後執行,最終 startWaitMsg 函數就執行結束了。儘管整個 Node 進程會由於 redis 鏈接未斷開而不會退出,但 node 此時已經沒法再次去執行 client.rpop 這句代碼了,也所以沒法再次從消息隊列中獲取新來的消息。

循環實現持續等待

咱們想到,可使用循環來實現 持續監聽隊列。因而,把代碼改爲:

async function startWaitMsg(client) {
    while(true) {
      await client.rpop('my_mq')
    }
}

如此便實現了 持續執行 rpop 指令。然而,若是你在 rpop 代碼後面加一行日誌打印的話,會觀察到 client.rpop 在持續打印 null。
image.png

這是由於,rpop 指令是 非阻塞的,所以當隊列沒有消息,他便返回一個 null,由此觸發你的 while 循環在不斷執行。這會致使咱們程序佔用過多的 cpu時間片,且對 redis 網絡IO有過多的不必的消耗。

整個while循環不停的執行,只有執行rpop這一行的時候會短暫釋放一下EventLoop給其餘代碼,這對腳本性能影響也會較大。國家提倡節能減排,這顯然不是最優雅的。

使用阻塞模式

讓咱們來用上 redis 隊列的阻塞模式試試。

async function startWaitMsg(c) {
    while(true) {
        const res = await c.brpop('my_mq', 0)
        console.log('收到消息', res)
    }
}

經過 brpop 指令,可讓 brpop 代碼阻塞在這裏。這裏所謂的 阻塞 並非對 Node 程序的阻塞,而是 redis 客戶端自身的阻塞。實際上對 Node 進程來講,不管是 rpop 仍是 brpop 都是 非阻塞 的異步 IO操做,只是在消息隊列爲空時 rpop 底層會馬上返回null,從而node進程會 resolve一個空,而 brpop 會在底層redis阻塞等待消息,消息到達後再給 Node 進程通知 resolve。

所以,brpop 對 Node 來講,能夠避免本身實現隊列的內容輪詢,能夠在等待IO回調期間將cpu留給其餘任務。從而大大減小 Node 進程的 CPU 消耗。

redis斷開沒法繼續消費的問題

在代碼運行過程當中,出現了一個新的問題: redis 客戶端會在某些狀況下斷開鏈接(可能因爲網絡等緣由)。而經過分析日誌發現:一旦發生鏈接異常,咱們的消費者腳本就沒法繼續接收新的消息了(個人日誌入庫功能失效)。

通過分析,發現問題緣由依然在於咱們的 while 語句 和 brpop 的配合問題。

當 redis client 對象發生鏈接異常時,會向當前正在等待的 brpop 代碼拋出一個 reject 異常。咱們回看上述代碼的 startWait 函數:

async function startWaitMsg(c) {
    while(true) {
        const res = await c.brpop('my_mq', 0)
        console.log('收到消息', res)
    }
}

若是 await brpop 這一行拋出 reject 異常,因爲咱們未捕獲該異常,則異常會拋出 startWaitMsg 函數,結果就是 while 循環被退出了。

思考如何解決

事實上,當鏈接出現問題,咱們須要對 client 進行重連。不過,這個重連機制,redisclient 會自動進行,所以咱們的代碼要作的僅僅只須要保證while循環能在異常時恢復。因而,咱們在發生異常時,continue 一下:

async function startWaitMsg(c) {
    while(true) {
        let res = null
        try {
            res = await c.brpop('my_mq', 0)
            console.log('收到消息', res)
        }
        catch(err) {
            console.log('brpop 出錯,從新brpop')
            continue
        }
        // ... 消息處理任務
    }
}

因爲 redis 客戶端內部的重連過程不會再觸發 reject (只是斷開鏈接的時候觸發一次),所以 continue 以後的 brpop 又會從新 "阻塞" 等待,由此,咱們的 消費者 即可以正常活着了。

最終代碼

  • 客戶端鏈接代碼文件:redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {
    return new Promise((resolve, reject) => {
        let client = redis.createClient(...options)
        client.on('error', err => {
            console.log('redis 鏈接出錯')
            reject(err)
        })
        client.on('ready', () => {
            console.log('redis ready')
            resolve(client)
        })
    })
}

app.js

const { createClient } = require('./redis.js')

const c = createClient()
client.then(async c => {
    await startWaitMsg(c) // 啓動消息監聽
})

async function startWaitMsg(c) {
    while(true) {
        let res = null
        try {
            res = await c.brpop('my_mq', 0)
            console.log('收到消息', res)
        }
        catch(err) {
            console.log('brpop 出錯,從新brpop')
            continue
        }
        // ... 消息處理任務
    }
}

總結

  • redis 的 list 數據結構,能夠用做實現隊列模式消息隊列
  • Node 中能夠經過 while(true) 實現隊列的持續循環監聽
  • 經過 brpop 阻塞指令的使用,能夠避免 cpu 空轉來監聽隊列
  • Node 中要注意 redis 鏈接斷開時的錯誤處理,以免因出錯致使沒法從新監聽隊列
相關文章
相關標籤/搜索