最近在開發一個小型前端監控項目,因爲技術棧中使用到了 Node + Redis 做爲消息隊列實現,所以這裏記錄下在 Node 中經過 Redis 來實現消息隊列時的 使用方法
和 注意事項
前端
消息隊列,是一種存放 消息
是隊列結構,能夠用來解決 分佈式系統通訊 從而解耦系統模塊
、異步任務處理
、請求消峯限流
的問題。node
既然叫作隊列,那它通常是從一側推入消息,從另外一側消費消息;大概是以下的流程。redis
在個人需求當中,我用消息隊列來作異步的入庫處理。後端
我經過 Node 作了一個對外的日誌接收層(即圖中 Koa Server)用於接收上報日誌,當Koa Server接收完成會當即給用戶響應 OK,由於用戶是不必去感知後端日誌的入庫結果的。promise
所以 Koa Server 收到日誌後,將消息放入 Redis 消息隊列便可。另一端,我啓動了一個 消費
程序(即上圖中的日誌入庫模塊,它也是一個 Node 腳本)來對MQ消息進行讀取並進行入庫操做。bash
消息隊列,其實有 2種類型。一種是基於 隊列模型
的,一種是基於 訂閱發佈模式
的。網絡
對於 訂閱發佈模式
來講,是指的多個消費者均可以訂閱某一個 channel 的消息,當channel中來了消息,全部的訂閱者都會收到通知,而且全部的訂閱者均可以對同一個消息進行處理(消費)。數據結構
對於 隊列模型
來講,當消息入隊後,在另外一端只出隊一次,若是有多個消費者在等待這個隊列,那麼只有一個消費者能拿到這個消息進行處理。app
在 Redis 中,以上 2 種模型,分別經過 pub/sub
功能和 list
結構能夠來實現。異步
對於個人日誌接收場景來講,我指望的是不管我後端有多少個 入庫消費者
,我但願同一條上報只能入庫一次。所以對我來講,我須要使用 隊列模型
來實現消息隊列,即便用 Redis 的 List 結構。
咱們經過 redis-cli 來簡單實驗下 list 結構是如何當作消息隊列的。
首先,經過 lpush 命令往 redis 中某個隊列的左側推入一條消息:
lpush my_mq abc
這樣,咱們就往 my_mq
這個隊列推入了一條內容爲 abc
的消息。因爲此時並無消費者,因此這條消息依然存在於隊列當中。咱們甚至能夠再次往裏推入第2條 def
消息,並經過 llen
命令來查看當前隊列的長度。
接下來,咱們在另一個命令行窗口輸入:
rpop my_mq
意思是從 my_mq
隊列的右側拿出一條消息。結果:
Redis 的 List 結構,爲了方便你們當作消息隊列。提供了一種阻塞模式。 阻塞和非阻塞有什麼區別呢?
咱們用一個新命令行窗口,去執行 阻塞等待消息
:
brpop my_mq 0
注意後面要加一個 超時時間
,0就表示一直阻塞等待。而後,咱們看到 redis 命令行就阻塞在這裏了,處於等待消息的狀態:
而若是使用 rpop
非阻塞命令的話,則會返回空並直接退出等待:
所以,能夠發現,阻塞非阻塞模式,最大的區別:是在於當消息隊列爲空的時候,阻塞模式不會退出等待,而非阻塞模式則會直接返回空並退出等待。
當 brpop
正在等待的時候,咱們往隊列中 push 一個消息:
lpush my_mq 123
能夠看到,阻塞模式的消費端,收到了 123 這個消息,同時本身也退出了等待:
這說明:
所以 redis 所謂的阻塞,是 當還未等到1條消息時,則阻塞等待;當等到1條消息,即馬上退出
;它並不會循環阻塞---即等到消息後它就再也不阻塞監聽這個隊列了。 這將給咱們編寫 Node 代碼提供一些啓發。
到了重點了。咱們在 Node 中編碼來使用 redis 消息隊列,跟在 cli 界面使用的方式是同樣的。可是須要咱們考慮如何編寫 消費者
端的代碼,才能實現所謂的 持續監聽隊列
。畢竟,咱們的 消費者
是須要常駐進程,持續監聽隊列消息的。並非說 收到一個消息就退出進程
。
所以,咱們須要編寫一個
首先,咱們能夠這樣編寫代碼來在 Node 中建立 redis 客戶端:
const redis = require('promise-redis-client') let client = redis.createClient(...options) client.on('error', err => { console.log('redis連接出錯') }) client.on('ready', () => { console.log('redis ready') })
爲了實現 當redis客戶端建立完畢,再開啓消息隊列監聽
,咱們把上面的代碼,封裝成一個模塊,用 promise 方式導出:
// redis.js const redis = require('promise-redis-client') exports.createClient = function() { return new Promise((resolve, reject) => { let client = redis.createClient(...options) client.on('error', err => { console.log('redis 鏈接出錯') reject(err) }) client.on('ready', () => { console.log('redis ready') resolve(client) }) }) }
OK,接下來,咱們能夠去 app.js
中編寫隊列的消費者代碼。爲了更優雅的使用 async/await,咱們能夠這樣來編寫一個 startWait 函數:
async function startWaitMsg(client) { ... }
而後,在 client ready 的時候,去啓動它:
const { createClient } = require('./redis.js') const c = createClient() client.then(async c => { await startWaitMsg(c) })
最難的地方在於,startWaitMsg 函數該如何編寫。因爲咱們使用了 promise 版本的 redis庫。所以,咱們能夠像這樣去讀取一個消息:
async function startWaitMsg(client) { await client.rpop('my_mq') }
但這樣寫的話,redis返回消息後,node繼續日後執行,最終 startWaitMsg 函數就執行結束了。儘管整個 Node 進程會由於 redis 鏈接未斷開而不會退出,但 node 此時已經沒法再次去執行 client.rpop 這句代碼了,也所以沒法再次從消息隊列中獲取新來的消息。
咱們想到,可使用循環來實現 持續監聽隊列
。因而,把代碼改爲:
async function startWaitMsg(client) { while(true) { await client.rpop('my_mq') } }
如此便實現了 持續執行 rpop 指令
。然而,若是你在 rpop 代碼後面加一行日誌打印的話,會觀察到 client.rpop 在持續打印 null。
這是由於,rpop 指令是 非阻塞的
,所以當隊列沒有消息,他便返回一個 null,由此觸發你的 while 循環在不斷執行。這會致使咱們程序佔用過多的 cpu時間片,且對 redis 網絡IO有過多的不必的消耗。
整個while循環不停的執行,只有執行rpop這一行的時候會短暫釋放一下EventLoop給其餘代碼,這對腳本性能影響也會較大。國家提倡節能減排,這顯然不是最優雅的。
讓咱們來用上 redis 隊列的阻塞模式試試。
async function startWaitMsg(c) { while(true) { const res = await c.brpop('my_mq', 0) console.log('收到消息', res) } }
經過 brpop 指令,可讓 brpop 代碼阻塞在這裏。這裏所謂的 阻塞
並非對 Node 程序的阻塞,而是 redis 客戶端自身的阻塞。實際上對 Node 進程來講,不管是 rpop
仍是 brpop
都是 非阻塞
的異步 IO操做,只是在消息隊列爲空時 rpop
底層會馬上返回null,從而node進程會 resolve一個空,而 brpop
會在底層redis阻塞等待消息,消息到達後再給 Node 進程通知 resolve。
所以,brpop 對 Node 來講,能夠避免本身實現隊列的內容輪詢,能夠在等待IO回調期間將cpu留給其餘任務。從而大大減小 Node 進程的 CPU 消耗。
在代碼運行過程當中,出現了一個新的問題: redis 客戶端會在某些狀況下斷開鏈接(可能因爲網絡等緣由)。而經過分析日誌發現:一旦發生鏈接異常,咱們的消費者腳本就沒法繼續接收新的消息了(個人日誌入庫功能失效)。
通過分析,發現問題緣由依然在於咱們的 while 語句 和 brpop 的配合問題。
當 redis client 對象發生鏈接異常時,會向當前正在等待的 brpop
代碼拋出一個 reject 異常。咱們回看上述代碼的 startWait 函數:
async function startWaitMsg(c) { while(true) { const res = await c.brpop('my_mq', 0) console.log('收到消息', res) } }
若是 await brpop 這一行拋出 reject 異常,因爲咱們未捕獲該異常,則異常會拋出 startWaitMsg 函數,結果就是 while 循環被退出了。
事實上,當鏈接出現問題,咱們須要對 client 進行重連。不過,這個重連機制,redisclient 會自動進行,所以咱們的代碼要作的僅僅只須要保證while循環能在異常時恢復
。因而,咱們在發生異常時,continue 一下:
async function startWaitMsg(c) { while(true) { let res = null try { res = await c.brpop('my_mq', 0) console.log('收到消息', res) } catch(err) { console.log('brpop 出錯,從新brpop') continue } // ... 消息處理任務 } }
因爲 redis 客戶端內部的重連過程不會再觸發 reject (只是斷開鏈接的時候觸發一次),所以 continue 以後的 brpop 又會從新 "阻塞" 等待,由此,咱們的 消費者
即可以正常活着了。
const redis = require('promise-redis-client') exports.createClient = function() { return new Promise((resolve, reject) => { let client = redis.createClient(...options) client.on('error', err => { console.log('redis 鏈接出錯') reject(err) }) client.on('ready', () => { console.log('redis ready') resolve(client) }) }) }
app.js
const { createClient } = require('./redis.js') const c = createClient() client.then(async c => { await startWaitMsg(c) // 啓動消息監聽 }) async function startWaitMsg(c) { while(true) { let res = null try { res = await c.brpop('my_mq', 0) console.log('收到消息', res) } catch(err) { console.log('brpop 出錯,從新brpop') continue } // ... 消息處理任務 } }
隊列模式
的消息隊列
brpop
阻塞指令的使用,能夠避免 cpu 空轉來監聽隊列