本章節咱們將學習如何使用MQ
庫.git
MQ
庫實現了各種消息代理中間件(Message Broker)的鏈接協議, 目前支持:redis
、mqtt
、stomp
協議.github
MQ
庫基於上述協議實現了: 生產者 -> 消費者
與訂閱 -> 發佈
模型, 能夠在不依賴其它服務的狀況下獨立完成任務.web
cf框架提供了多種MQ
的封裝, 當咱們須要使用的時候須要根據實際的協議進行選擇:redis
-- local MQ = require "MQ.mqtt" -- local MQ = require "MQ.redis" -- local MQ = require "MQ.stomp"
此方法將會建立一個的MQ對象實例.數據庫
opt
是一個table
類型的參數, 能夠傳遞以下值:json
以redis broker爲示例:瀏覽器
local MQ = require "MQ.redis" local mq = MQ:new { host = "localhost", port = 6379, -- db = 0, -- auth = "123456789", }
此方法用來訂閱一個指定pattern
. 當broker
將消息傳遞到cf後, function
將會被調用.bash
MQ
庫會爲function
注入一個table
類型的參數msg
, 此參數將在斷開鏈接的時候爲nil
.服務器
msg
根據採用的協議的不一樣msg
的內容也將有所不一樣. 具體內容以logging
庫的打印爲準.websocket
標準使用示例:
local Log = require("logging"):new() mq:on("/notice", function(msg) if not msg then return Log:ERROR("['/notice'] SUBSCRIBE ERROR: 鏈接已斷開.") end Log:DEBUG(msg) end)
開發者能夠同時訂閱多個parttern
.
此方法用來向指定pattern
發送消息. msg爲字符串類型的消息.
使用示例:
mq:emit('/notice', '{"code":200,"data":[1,2,3,4,5,6,7,8,9,10]}')
單個MQ
能夠一直複用emit, 內部會建立一個寫入隊列去完成消息的順序發送. (在多個實例中沒法保證消息前後)
此方法在做爲獨立運行服務端時候調用.
使用示例:
mq:start()
此方法能夠關閉再也不使用的MQ; 在任何狀況下MQ使用完畢後都須要調用此方法來釋放資源.
使用示例:
mq:close()
爲了演示更加直觀, 這裏僅使用redis做爲broker中專消息.
咱們模擬100個生產者向redis的/queue
投遞消息, 同時定義了一個消費者訂閱/queue
持續進行消費
代碼以下:
local cf = require "cf" local json = require "json" local Log = require("logging"):new() local MQ = require "MQ.redis" cf.fork(function () local consumer = MQ:new { host = "localhost", port = 6379 } local count = 0 consumer:on("/queue", function (msg) if not msg then Log:ERROR("[/queue]鏈接失敗", "已經消費了"..count.."個消息") return end count = count + 1 Log:DEBUG("開始消費:", msg, "已經消費了"..count.."個消息") end) consumer:start() -- Websoket內部無需使用這個方法 end) for i = 1, 100 do cf.fork(function() local producer = MQ:new { host = "localhost", port = 6379 } producer:emit("/queue", json.encode({ code = 200, data = { id = math.random(1, 1 << 32) }, })) producer:close() end) end
輸出以下:
[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3912595079}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了1個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2938696189}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了2個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3499397173}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了3個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1711272453}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了4個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3968420025}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了5個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1887895479}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了6個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3687986737}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了7個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2823099353}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了8個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2528190121}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了9個消息 [2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":4107999865}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了10個消息 . .. ... .... ..... [2019-06-25 16:05:36,247] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3608578767}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了100個消息
爲了方便閱讀. 咱們這裏取出前10條與最後第100條而且將msg的數據結構打印出來方便閱讀.
消費者的處理方式採用同步非阻塞處理的(當前業務未處理完成是不會繼續處理下個消息的), 若是不想阻塞當前消息隊列事件循環能夠考慮自行fork
一個協程來處理.
用戶經過認證後接入到Server後訂閱本身專屬的頻道, 當有用戶專屬消息的時候任何服務均可以利用此方法進行業務消息推送.
咱們
代碼實現以下:
local cf = require "cf" local json = require "json" local Log = require("logging"):new() local MQ = require "MQ.redis" for uid = 1, 10 do cf.fork(function () local client = MQ:new { host = "localhost", port = 6379 } client:on("/user/"..uid.."/*", function (msg) if not msg then Log:ERROR("[/user/9257]鏈接失敗") return end Log:DEBUG("UID:["..uid.."]接收到推送消息", msg) end) client:start() -- Websoket內部無需使用這個方法 end) end local server = MQ:new { host = "localhost", port = 6379 } cf.at(1, function (...) server:emit("/user/"..math.random(1, 10).."/ad", json.encode({ code = 200, data = {} })) end) server:start()
運行後終端輸出以下所示:
^C[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin [2019-06-25 16:20:23,506] [@script/main.lua:18] [DEBUG] : UID:[9]接收到推送消息, {["source"]="/user/9/ad", ["pattern"]="/user/9/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:24,504] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:25,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:26,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:27,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:28,506] [@script/main.lua:18] [DEBUG] : UID:[2]接收到推送消息, {["source"]="/user/2/ad", ["pattern"]="/user/2/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:29,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:30,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:31,505] [@script/main.lua:18] [DEBUG] : UID:[3]接收到推送消息, {["source"]="/user/3/ad", ["pattern"]="/user/3/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:32,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:33,506] [@script/main.lua:18] [DEBUG] : UID:[5]接收到推送消息, {["source"]="/user/5/ad", ["pattern"]="/user/5/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:34,503] [@script/main.lua:18] [DEBUG] : UID:[7]接收到推送消息, {["source"]="/user/7/ad", ["pattern"]="/user/7/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:35,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:36,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} [2019-06-25 16:20:37,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"} ^C[candy@MacBookPro:~/Documents/core_framework] $
這裏咱們能夠看到, 由消息發佈到/user/9527/*
下的topic
的時候, 咱們能夠經過一次通配符
訂閱就能夠接收到全部下屬路由消息.
在各類領域內, 消息推送已經成爲了一種最多見的業務. 咱們如今來嘗試利用MQ實現消息推送業務.
首先, 咱們將script/main.lua
的文件寫入以下代碼:
-- main.lua local cf = require "cf" local json = require "json" local Log = require("logging"):new() local MQ = require "MQ.redis" for i = 1, 3 do cf.fork(function () local uid = math.random(1, 1 << 32) local client_mq = MQ:new { host = "localhost", -- 主機名 port = 6379, -- 端口號 -- db = nil, -- 默認數據庫 -- auth = nil, -- 密碼 } client_mq:on("/system/notice", function (msg) if not msg then Log:ERROR("['/system/notice'] SUBSCRIBE ERROR: 鏈接已斷開.") return end Log:DEBUG("UID:["..uid.."]接收到消息: ", msg) end) client_mq:start() end) end local server_mq = MQ:new { host = "localhost", -- 主機名 port = 6379, -- 端口號 -- db = nil, -- 默認數據庫 -- auth = nil, -- 密碼 } cf.at(3, function (args) server_mq:emit("/system/notice", json.encode({ code = 200, msg = "系統即將關閉" })) end) server_mq:start()
這裏咱們用啓動了3個協程來模擬用戶訂閱消息, 而且每一個協程都使用不一樣的UID來打印. 而後再啓動一個定時器模擬每三秒的消息推送業務.
打開終端運行./cfadmin
後, 輸出以下:
[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin [2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"} [candy@MacBookPro:~/Documents/core_framework] $
從終端的輸出內容中能夠看到, 咱們確實每隔3秒就收到了一次消息推送.
首先, 咱們須要創建一套基於httpd
庫的Websocket
路由. 讓咱們打開script/main.lua
文件並將下面的代碼寫入進去.
local httpd require "httpd" local app = httpd:new("Web") app:ws('/ws', require "ws") app:listen("0.0.0.0", 8080) app:run()
Websocket
必須在創建與客戶端的鏈接完成的同時利用MQ
庫訂閱/chat
. 每當客戶端發送消息過來觸發on_message
的時候, 都將會消息直接發佈到/chat
內部經過中轉後實現推送聊天.
而後咱們利用前面章節所學的Websocket指南
, 編寫一段簡單的Websocket路由處理代碼. 因爲示例代碼沒有UID生成機制. 爲了方便調試, 咱們隨機生成32位整數做爲惟一ID標識符.
script/ws.lua
具體代碼以下所示:
local MQ = require "MQ.redis" local class = require "class" local websocket = class("websocket") function websocket:ctor (opt) self.ws = opt.ws self.id = math.random(1, 1 << 32) end function websocket:on_open () self.mq = MQ:new { host = 'localhost', port = 6379 } self.mq:on("/chat", function (msg) if not msg then return end self.ws:send(msg.payload) end) end function websocket:on_message (data, typ) if self.mq then self.mq:emit("/chat", data) end print("客戶端["..self.id.."]發送了消息:["..data.."]") end function websocket:on_error (error) end function websocket:on_close () if self.mq then self.mq:close() self.mq = nil end end return websocket
注意: 咱們須要記住當客戶端鏈接斷開的時候記得關閉訂閱回收資源. 啓動./cfadmin
, 查看是否正常運行.
讓咱們下載客戶端工具, 而且安裝到咱們的Chrome
瀏覽器上. 提取碼:cgwr
如今, 咱們運行客戶端工具在地址欄輸入localhost:8080/ws
鏈接咱們剛剛啓動的Websocket Server, 而後開始向服務器發送消息.
若是從終端中和客戶端看到相似的輸出內容, 說明咱們的示例編寫完成.
[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin [2019/06/25 20:11:59] [INFO] httpd正在監聽: 0.0.0.0:8080 [2019/06/25 20:11:59] [INFO] httpd正在運行Web Server服務... [2019/06/25 20:12:01] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000095/Sec [2019/06/25 20:12:17] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000080/Sec 客戶端[1693861773]發送了消息:[hello! 我是2] 客戶端[1693861773]發送了消息:[hello! 我是2] 客戶端[1693861773]發送了消息:[hello! 我是2] 客戶端[1693861773]發送了消息:[hello! 我是2] 客戶端[1693861773]發送了消息:[hello! 我是2] 客戶端[1693861773]發送了消息:[hello! 我是2] 客戶端[1693861773]發送了消息:[hello! 我是2] [2019/06/25 20:12:23] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000052/Sec 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[3363385555]發送了消息:[hello! 我是1] 客戶端[1693861773]發送了消息:[hello! 我是2]
上述代碼僅用redis
協議進行模擬, 其它協議請參考Wiki.
至此Lua Web開發指南已經編寫完畢. 軟件開發領域內不單單須要師傅領進門, 我的修行也是一種能力的體現.
cf框架都內置庫很是的多, 維護框架都同時還要編寫使用教程. 做者不可能一個一個介紹徹底. cf框架已經有了專屬的QQ討論社區: 727531854, 點擊加羣.
目前內部就做者一我的在裏面. 若是您也對它比較感興趣, 歡迎您到羣裏來一塊兒交流技術.