使用 Python 開發 EMQ X MQTT 服務器插件

從 v4.1 版本開始,EMQ X MQTT 服務器 提供了專門的多語言支持插件 emqx_extension_hook ,現已支持使用其餘編程語言來處理 EMQ X 中的鉤子事件,開發者可使用 Python 或者 Java 快速開發本身的插件,在官方功能的基礎上進行擴展,知足本身的業務場景。例如:html

  • 驗證某客戶端的登陸權限:客戶端鏈接時觸發對應函數,經過參數獲取客戶端信息後經過讀取數據庫、比對等操做斷定是否有登陸權限
  • 記錄客戶端在線狀態與上下線歷史:客戶端狀態變更時觸發對應函數,經過參數獲取客戶端信息,改寫數據庫中客戶端在線狀態
  • 校驗某客戶端的 PUB/SUB 的操做權限:發佈/訂閱時觸發對應函數,經過參數獲取客戶端信息與當前主題,斷定客戶端是否有對應的操做權限
  • 處理會話 (Sessions) 和 消息 (Message) 事件,實現訂閱關係與消息處理/存儲:消息發佈、狀態變更時觸發對應函數,獲取當前客戶端信息、消息狀態與消息內容,轉發到 Kafka 或數據庫進行存儲。
注:消息(Message) 類鉤子,僅在企業版中支持。

Python 和 Java 驅動基於 Erlang/OTP-Port 進程間通訊實現,自己具備很是高的吞吐性能,本文以 Python 拓展爲例介紹 EMQ X 跨語言拓展使用方式。java

img

Python 拓展使用示例

要求

  • EMQ X 所在服務器需安裝 Python 3.6 以上版本

使用步驟

  1. 經過 pip 安裝 Python SDK
  2. 調整 EMQ X 配置,確保相關配置項正確指向 Python 項目
  3. 引入 SDK 編寫代碼

Python 插件安裝

經過 pip 命令在本地安裝 SDK,確保使用 pip3 進行安裝python

pip3 install emqx-extension-sdk

修改配置

修改 emqx-extension-hook 插件配置,正確使用拓展:git

## Setup the supported drivers
##
## Value: python2 | python3 | java
exhook.drivers = python3

## Search path for scripts/library
exhook.drivers.python3.path = data/extension/hooks.py

## Call timeout
##
## Value: Duration
##exhook.drivers.python3.call_timeout = 5s

## Initial module name
## Your filename or module name
exhook.drivers.python3.init_module = hooks

編寫代碼

emqx/data/extension 目錄下新建 hooks.py 文件,引入 SDK 編寫業務邏輯,示例程序以下:github

## data/extension/hooks.py

from emqx_extension.hooks import EmqxHookSdk, hooks_handler
from emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T


# 繼承 SDK HookSdk 類
class CustomHook(EmqxHookSdk):

      # 使用裝飾器註冊 hooks
    @hooks_handler()
    def on_client_connect(self,
                          conninfo: EMQX_CLIENTINFO_PARSE_T = None,
                          props: dict = None,
                          state: list = None):
        print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecte')

    @hooks_handler()
    def on_client_connected(self,
                            clientinfo: EMQX_CLIENTINFO_PARSE_T,
                            state: list = None):
        print(
            f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected')

    @hooks_handler()
    def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T,
                            pubsub: str,
                            topic: str,
                            result: bool,
                            state: tuple) -> bool:
        print(
            f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}')
        # 用戶名爲空時,ACL 驗證不經過
        if clientinfo.username == '':
            return False
        return True

    @hooks_handler()
    def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult,
                               state) -> bool:
        print(
            f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate')
        # clientid 不爲空時,驗證經過
        if clientinfo.clientid != '':
            return True
        return False

    # on_message_* 僅支持企業版
    @hooks_handler()
    def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state):
        print(
            f'[Python SDK] [on_message_publish] {message.topic} {message.payload}')


emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook')


def init():
    return emqx_hook.start()


def deinit():
    return

啓動

啓動 emqx_extension_hook 插件,若是配置錯誤或代碼編寫錯誤將沒法正常啓動。啓動後嘗試創建 MQTT 鏈接並觀察業務運行狀況。數據庫

./bin/emqx_ctl plugins load emqx_extension_hook

進階開發

目前 EMQ X Python 拓展 SDK 是開源的,若是對可控性、性能要求更高,或須要使用 Python 2.7 版本的運行環境,歡迎貢獻代碼或基於原始示例進行開發:編程

  • 代碼倉庫:emqx-extension-python-sdk
  • Python 原始示例,可以使用該示例自行封裝:[emqx-extension-hook main.py
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/d...bash

相關文章
相關標籤/搜索