從 v4.1 版本開始,EMQ X MQTT 服務器 提供了專門的多語言支持插件 emqx_extension_hook ,現已支持使用其餘編程語言來處理 EMQ X 中的鉤子事件,開發者可使用 Python 或者 Java 快速開發本身的插件,在官方功能的基礎上進行擴展,知足本身的業務場景。例如:html
注:消息(Message) 類鉤子,僅在企業版中支持。
Python 和 Java 驅動基於 Erlang/OTP-Port 進程間通訊實現,自己具備很是高的吞吐性能,本文以 Java 拓展爲例介紹 EMQ X 跨語言拓展使用方式。java
io.emqx.extension.jar
和 erlport.jar
到項目依賴examples/SampleHandler.java
到您的項目中SampleHandler.java
中的示例編寫業務代碼,確保可以成功編譯編譯全部源代碼後,須要將 sdk
和代碼文件部署到 EMQ X 中:git
io.emqx.extension.jar
到 emqx/data/extension
目錄.class
文件,例如 SampleHandler.class
複製到 emqx/data/extension
目錄emqx/etc/plugins/emqx_extension_hook.conf
配置文件:exhook.drivers = java ## Search path for scripts or library exhook.drivers.java.path = data/extension/ exhook.drivers.java.init_module = SampleHandler
啓動 emqx_extension_hook
插件,若是配置錯誤或 Java 代碼編寫錯誤將沒法正常啓動。啓動後嘗試創建 MQTT 鏈接並觀察業務運行狀況。github
如下爲 SampleHandler.java 示例程序, 該程序繼承自 SDK 中的 DefaultCommunicationHandler
類。該示例代碼演示瞭如何掛載 EMQ X 系統中全部的鉤子:數據庫
import emqx.extension.java.handler.*; import emqx.extension.java.handler.codec.*; import emqx.extension.java.handler.ActionOptionConfig.Keys; public class SampleHandler extends DefaultCommunicationHandler { @Override public ActionOptionConfig getActionOption() { ActionOptionConfig option = new ActionOptionConfig(); option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#"); option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#"); option.set(Keys.MESSAGE_ACKED_TOPICS, "#"); option.set(Keys.MESSAGE_DROPPED_TOPICS, "#"); return option; } // Clients @Override public void onClientConnect(ConnInfo connInfo, Property[] props) { System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props); } @Override public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) { System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props); } @Override public void onClientConnected(ClientInfo clientInfo) { System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo); } @Override public void onClientDisconnected(ClientInfo clientInfo, Reason reason) { System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason); } // 斷定認證結果,返回 true 或 false @Override public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) { System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult); return true; } // 斷定 ACL 檢查結果,返回 true 或 false @Override public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) { System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result); return true; } @Override public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) { System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props); } @Override public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) { System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props); } // Sessions @Override public void onSessionCreated(ClientInfo clientInfo) { System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo); } @Override public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) { System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic); } @Override public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) { System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic); } @Override public void onSessionResumed(ClientInfo clientInfo) { System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo); } @Override public void onSessionDiscarded(ClientInfo clientInfo) { System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo); } @Override public void onSessionTakeovered(ClientInfo clientInfo) { System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo); } @Override public void onSessionTerminated(ClientInfo clientInfo, Reason reason) { System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason); } // Messages @Override public Message onMessagePublish(Message message) { System.err.printf("[Java] onMessagePublish: message: %s\n", message); return message; } @Override public void onMessageDropped(Message message, Reason reason) { System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason); } @Override public void onMessageDelivered(ClientInfo clientInfo, Message message) { System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message); } @Override public void onMessageAcked(ClientInfo clientInfo, Message message) { System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message); } }
SampleHandler
主要包含兩部分:編程
重載了 getActionOption
方法。該方法對消息(Message)相關的鉤子進行配置,指定了須要生效的主題列表。bash
配置項 | 對應鉤子 |
---|---|
MESSAGE_PUBLISH_TOPICS | message_publish |
MESSAGE_DELIVERED_TOPICS | message_delivered |
MESSAGE_ACKED_TOPICS | message_acked |
MESSAGE_DROPPED_TOPICS | message_dropped |
on<hookName>
方法,這些方法是實際處理鉤子事件的回調函數,函數命名方式爲各個鉤子名稱變體後前面加 on
前綴,變體方式爲鉤子名稱去掉下劃線後使用駱駝拼寫法(CamelCase),例如,鉤子client_connect對應的函數名爲onClientConnect。 EMQ X 客戶端產生的事件,例如:鏈接、發佈、訂閱等,都會最終分發到這些鉤子事件回調函數上,而後回調函數可對各屬性及狀態進行相關操做。 示例程序中僅對各參數進行了打印輸出。若是隻關心部分鉤子事件,只需對這部分鉤子事件的回調函數進行重載便可,不須要重載全部回調函數。各回調函數的執行時機和支持的鉤子列表與 EMQ X 內置的鉤子徹底一致,參見:Hooks - EMQ X服務器
在實現本身的擴展程序時,最簡單的方式也是繼承 DefaultCommunicationHandler
父類,該類對各鉤子與回調函數的綁定進行了封裝,並進一步封裝了回調函數涉及到的參數數據結構,以方便快速上手使用。數據結構
若是對 Java 擴展程序的可控性要求更高,DefaultCommunicationHandler
類已沒法知足需求時,能夠經過實現 CommunicationHandler
接口,從更底層控制代碼邏輯,編寫更靈活的擴展程序。編程語言
package emqx.extension.java.handler; public interface CommunicationHandler { public Object init(); public void deinit(); }
init()
方法:用於初始化,聲明擴展須要掛載哪些鉤子,以及掛載的配置deinit()
方法:用於註銷。詳細數據格式說明,參見 設計文檔。
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。原文連接:https://www.emqx.io/cn/blog/develop-emqx-plugin-using-java