使用 Java 開發 EMQ X MQTT 服務器插件

從 v4.1 版本開始,EMQ X MQTT 服務器 提供了專門的多語言支持插件 emqx_extension_hook ,現已支持使用其餘編程語言來處理 EMQ X 中的鉤子事件,開發者可使用 Python 或者 Java 快速開發本身的插件,在官方功能的基礎上進行擴展,知足本身的業務場景。例如:html

  • 驗證某客戶端的登陸權限:客戶端鏈接時觸發對應函數,經過參數獲取客戶端信息後經過讀取數據庫、比對等操做斷定是否有登陸權限
  • 記錄客戶端在線狀態與上下線歷史:客戶端狀態變更時觸發對應函數,經過參數獲取客戶端信息,改寫數據庫中客戶端在線狀態
  • 校驗某客戶端的 PUB/SUB 的操做權限:發佈/訂閱時觸發對應函數,經過參數獲取客戶端信息與當前主題,斷定客戶端是否有對應的操做權限
  • 處理會話 (Sessions) 和 消息 (Message) 事件,實現訂閱關係與消息處理/存儲:消息發佈、狀態變更時觸發對應函數,獲取當前客戶端信息、消息狀態與消息內容,轉發到 Kafka 或數據庫進行存儲。
注:消息(Message) 類鉤子,僅在企業版中支持。

Python 和 Java 驅動基於 Erlang/OTP-Port 進程間通訊實現,自己具備很是高的吞吐性能,本文以 Java 拓展爲例介紹 EMQ X 跨語言拓展使用方式。java

img

Java 拓展使用示例

要求

  • EMQ X 所在服務器需安裝 JDK 1.8 以上版本

開始使用

  1. 建立 Java 項目
  2. 下載 io.emqx.extension.jarerlport.jar 文件
  3. 添加SDK io.emqx.extension.jarerlport.jar 到項目依賴
  4. 複製 examples/SampleHandler.java到您的項目中
  5. 根據 SDK SampleHandler.java 中的示例編寫業務代碼,確保可以成功編譯

部署

編譯全部源代碼後,須要將 sdk 和代碼文件部署到 EMQ X 中:git

  1. 複製 io.emqx.extension.jaremqx/data/extension 目錄
  2. 將編譯後的 .class 文件,例如 SampleHandler.class 複製到 emqx/data/extension 目錄
  3. 修改 emqx/etc/plugins/emqx_extension_hook.conf 配置文件:
exhook.drivers = java
## Search path for scripts or library
exhook.drivers.java.path = data/extension/
exhook.drivers.java.init_module = SampleHandler

啓動 emqx_extension_hook 插件,若是配置錯誤或 Java 代碼編寫錯誤將沒法正常啓動。啓動後嘗試創建 MQTT 鏈接並觀察業務運行狀況。github

示例

如下爲 SampleHandler.java 示例程序, 該程序繼承自 SDK 中的 DefaultCommunicationHandler 類。該示例代碼演示瞭如何掛載 EMQ X 系統中全部的鉤子:數據庫

import emqx.extension.java.handler.*;
import emqx.extension.java.handler.codec.*;
import emqx.extension.java.handler.ActionOptionConfig.Keys;

public class SampleHandler extends DefaultCommunicationHandler {
    
    @Override
    public ActionOptionConfig getActionOption() {
        ActionOptionConfig option = new ActionOptionConfig();
        option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
        option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
        option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
        option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");
        
        return option;
    }
    
    // Clients
    @Override
    public void onClientConnect(ConnInfo connInfo, Property[] props) {
        System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props);
    }

    @Override
    public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
        System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props);
    }

    @Override
    public void onClientConnected(ClientInfo clientInfo) {
        System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // 斷定認證結果,返回 true 或 false 
    @Override
    public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
        System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult);

        return true;
    }

    // 斷定 ACL 檢查結果,返回 true 或 false 
    @Override
    public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
        System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result);

        return true;
    }

    @Override
    public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
    }

    @Override
    public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
    }

    // Sessions
    @Override
    public void onSessionCreated(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
        System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    @Override
    public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
        System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    @Override
    public void onSessionResumed(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionDiscarded(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo);
    }
    
    @Override
    public void onSessionTakeovered(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Messages
    @Override
    public Message onMessagePublish(Message message) {
        System.err.printf("[Java] onMessagePublish: message: %s\n", message);
        
        return message;
    }

    @Override
    public void onMessageDropped(Message message, Reason reason) {
        System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason);
    }

    @Override
    public void onMessageDelivered(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message);
    }

    @Override
    public void onMessageAcked(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message);
    }
}

SampleHandler 主要包含兩部分:編程

  1. 重載了 getActionOption 方法。該方法對消息(Message)相關的鉤子進行配置,指定了須要生效的主題列表。bash

    配置項 對應鉤子
    MESSAGE_PUBLISH_TOPICS message_publish
    MESSAGE_DELIVERED_TOPICS message_delivered
    MESSAGE_ACKED_TOPICS message_acked
    MESSAGE_DROPPED_TOPICS message_dropped
  2. 重載了 on<hookName> 方法,這些方法是實際處理鉤子事件的回調函數,函數命名方式爲各個鉤子名稱變體後前面加 on 前綴,變體方式爲鉤子名稱去掉下劃線後使用駱駝拼寫法(CamelCase),例如,鉤子client_connect對應的函數名爲onClientConnect。 EMQ X 客戶端產生的事件,例如:鏈接、發佈、訂閱等,都會最終分發到這些鉤子事件回調函數上,而後回調函數可對各屬性及狀態進行相關操做。 示例程序中僅對各參數進行了打印輸出。若是隻關心部分鉤子事件,只需對這部分鉤子事件的回調函數進行重載便可,不須要重載全部回調函數。

各回調函數的執行時機和支持的鉤子列表與 EMQ X 內置的鉤子徹底一致,參見:Hooks - EMQ X服務器

在實現本身的擴展程序時,最簡單的方式也是繼承 DefaultCommunicationHandler 父類,該類對各鉤子與回調函數的綁定進行了封裝,並進一步封裝了回調函數涉及到的參數數據結構,以方便快速上手使用。數據結構

進階開發

若是對 Java 擴展程序的可控性要求更高,DefaultCommunicationHandler 類已沒法知足需求時,能夠經過實現 CommunicationHandler 接口,從更底層控制代碼邏輯,編寫更靈活的擴展程序。編程語言

package emqx.extension.java.handler;

public interface CommunicationHandler {
    
    public Object init();
    
    public void deinit();
}
  • init() 方法:用於初始化,聲明擴展須要掛載哪些鉤子,以及掛載的配置
  • deinit() 方法:用於註銷。

詳細數據格式說明,參見 設計文檔

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/develop-emqx-plugin-using-java

相關文章
相關標籤/搜索