從 v4.1 版本開始,EMQ X MQTT 服務器 提供了專門的多語言支持插件 emqx_extension_hook ,現已支持使用其餘編程語言來處理 EMQ X 中的鉤子事件,開發者可使用 Python 或者 Java 快速開發本身的插件,在官方功能的基礎上進行擴展,知足本身的業務場景。例如:html
注:消息(Message) 類鉤子,僅在企業版中支持。
Python 和 Java 驅動基於 Erlang/OTP-Port 進程間通訊實現,自己具備很是高的吞吐性能,本文以 Python 拓展爲例介紹 EMQ X 跨語言拓展使用方式。java
經過 pip 命令在本地安裝 SDK,確保使用 pip3 進行安裝:python
pip3 install emqx-extension-sdk
修改 emqx-extension-hook
插件配置,正確使用拓展:git
## Setup the supported drivers ## ## Value: python2 | python3 | java exhook.drivers = python3 ## Search path for scripts/library exhook.drivers.python3.path = data/extension/hooks.py ## Call timeout ## ## Value: Duration ##exhook.drivers.python3.call_timeout = 5s ## Initial module name ## Your filename or module name exhook.drivers.python3.init_module = hooks
在 emqx/data/extension
目錄下新建 hooks.py
文件,引入 SDK 編寫業務邏輯,示例程序以下:github
## data/extension/hooks.py from emqx_extension.hooks import EmqxHookSdk, hooks_handler from emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T # 繼承 SDK HookSdk 類 class CustomHook(EmqxHookSdk): # 使用裝飾器註冊 hooks @hooks_handler() def on_client_connect(self, conninfo: EMQX_CLIENTINFO_PARSE_T = None, props: dict = None, state: list = None): print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecte') @hooks_handler() def on_client_connected(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, state: list = None): print( f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected') @hooks_handler() def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, pubsub: str, topic: str, result: bool, state: tuple) -> bool: print( f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}') # 用戶名爲空時,ACL 驗證不經過 if clientinfo.username == '': return False return True @hooks_handler() def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult, state) -> bool: print( f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate') # clientid 不爲空時,驗證經過 if clientinfo.clientid != '': return True return False # on_message_* 僅支持企業版 @hooks_handler() def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state): print( f'[Python SDK] [on_message_publish] {message.topic} {message.payload}') emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook') def init(): return emqx_hook.start() def deinit(): return
啓動 emqx_extension_hook
插件,若是配置錯誤或代碼編寫錯誤將沒法正常啓動。啓動後嘗試創建 MQTT 鏈接並觀察業務運行狀況。數據庫
./bin/emqx_ctl plugins load emqx_extension_hook
目前 EMQ X Python 拓展 SDK 是開源的,若是對可控性、性能要求更高,或須要使用 Python 2.7 版本的運行環境,歡迎貢獻代碼或基於原始示例進行開發:編程
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。原文連接:https://www.emqx.io/cn/blog/d...bash