EMQ X 規則引擎系列(十)—— 存儲消息到 OpenTSDB 數據庫

OpenTSDB 介紹

OpenTSDB 是可擴展的分佈式時序數據庫,底層依賴 HBase 並充分發揮了HBase的分佈式列存儲特性,支持數百萬每秒的讀寫。git

面對大規模快速增加的物聯網傳感器採集、交易記錄等數據,時間序列數據累計速度很是快,時序數據庫經過提升效率來處理這種大規模數據,並帶來性能的提高,包括:更高的容納率(Ingest Rates)、更快的大規模查詢以及更好的數據壓縮。github

安裝與驗證 OpenTSDB 服務器

讀者能夠參考 OpenTSDB 官方文檔 (http://opentsdb.net) 或 Docker (https://hub.docker.com/r/pete... 來下載安裝 OpenTSDB 服務器,本文使用 OpenTSDB 2.4.0 版本。sql

場景介紹

該場景須要將 EMQ X 指定主題下且知足條件的消息存儲到 OpenTSDB 數據庫。爲了便於後續分析檢索,消息內容須要進行拆分存儲。docker

該場景下客戶端上報數據以下:shell

  • Topic:stat/cpu
  • Payload:數據庫

    {
      "metric": "cpu",
      "tags": {
        "host": "serverA"
      },
      "value":12
    }

準備工做

啓動 OpenTSDB Server

啓動 OpenTSDB Server 並開放 4242 端口。json

$ docker pull petergrace/opentsdb-docker

$ docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker

配置說明

建立資源

打開 EMQ X Dashboard,進入左側菜單的 資源 頁面,點擊 新建 按鈕,選擇 OpenTSDB 資源類型並完成相關配置進行資源建立。性能優化

image-20190725110536094.png

建立規則

進入左側菜單的 規則 頁面,點擊 新建 按鈕,進行規則建立。這裏選擇觸發事件 message.publish,即在 EMQ X 收到 PUBLISH 消息時觸發該規則進行數據處理。服務器

選定觸發事件後,咱們可在界面上看到可選字段及示例 SQL:socket

image-20190719112141128.png

篩選所需字段

規則引擎使用 SQL 語句過濾和處理數據。例如前文提到的場景中咱們須要將 payload 中的字段提取出來使用,則能夠經過 payload.<fieldName> 實現。同時咱們僅僅指望處理 stat/cpu 主題,那麼能夠在 WHERE 子句中使用主題通配符 =~topic 進行篩選:topic =~ 'stat/cpu', 最終咱們獲得 SQL 以下:

SELECT
  payload.metric as metric, payload.tags as tags, payload.value as value
FROM
  "message.publish"
WHERE
    topic =~ 'stat/cpu'

SQL 測試

藉助 SQL 測試功能,咱們能夠快速確認剛剛填寫的 SQL 語句可否達成咱們的目的。首先填寫用於測試的 payload 等數據以下:

image-20190725110913878.png

而後點擊 測試 按鈕,咱們獲得如下數據輸出:

{
  "metric": "cpu",
  "tags": {
    "host": "serverA"
  },
  "value": 12
}

測試輸出與預期相符,咱們能夠進行後續步驟。

添加響應動做,存儲消息到 OpenTSDB

SQL 條件輸入輸出無誤後,咱們繼續添加相應動做,配置寫入 SQL 語句,將篩選結果存儲到 OpenTSDB。

點擊響應動做中的 添加 按鈕,選擇 保存數據到 OpenTSDB 動做,選取剛剛建立的 OpenTSDB 資源並完成剩餘參數設置。OpenTSDB 動做用到的幾個參數分別爲:

  1. 詳細信息。是否須要 OpenTSDB Server 返回存儲失敗的 Data points 及失敗緣由,默認爲 false。
  2. 摘要信息。是否須要 OpenTSDB Server 返回 data point 存儲成功與失敗的數量,默認爲 true。
  3. 最大批處理數量。消息請求頻繁時容許驅動從隊列中一次讀取多少個 Data Points 合併爲一個 HTTP 請求,爲性能優化參數,默認爲 20。
  4. 是否同步調用。配置 OpenTSDB Server 是否等待全部數據都被寫入後才返回結果,默認爲 false。
  5. 同步調用超時時間。OpenTSDB Server 等待數據寫入的最大時間,默認爲 0,即永不超時。

這裏咱們所有使用默認配置,點擊 新建 按鈕完成規則建立。

image-20190725111158382.png

測試

預期結果

咱們成功建立了一條規則,包含一個處理動做,動做指望效果以下:

  1. 客戶端向 stat/cpu 主題上報消息時,該消息將命中 SQL,規則列表中 已命中 數字增長 1;
  2. OpenTSDB Server 中將增長一條數據,數據內容與消息內容一致。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 --> Websocket 頁面,使用任意信息客戶端鏈接到 EMQ X,鏈接成功後在 消息 卡片中發送以下消息:

  • Topic:stat/cpu
  • Payload:

    {
      "metric": "cpu",
      "tags": {
        "host": "serverA"
      },
      "value":12
    }

image-20190725112738414.png
點擊 發送 按鈕,發送成功後能夠看到當前規則已命中次數已經變爲了 1。

而後經過 Postman 向 OpenTSDB 發送查詢請求,當咱們獲得以下應答時說明新的 data point 已經添加成功:

image-20190725113422461.png

至此,咱們經過規則引擎實現了使用規則引擎存儲消息到 OpenTSDB 數據庫的業務開發。


更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔

相關文章
相關標籤/搜索