OpenTSDB 是可擴展的分佈式時序數據庫,底層依賴 HBase 並充分發揮了HBase的分佈式列存儲特性,支持數百萬每秒的讀寫。git
面對大規模快速增加的物聯網傳感器採集、交易記錄等數據,時間序列數據累計速度很是快,時序數據庫經過提升效率來處理這種大規模數據,並帶來性能的提高,包括:更高的容納率(Ingest Rates)、更快的大規模查詢以及更好的數據壓縮。github
讀者能夠參考 OpenTSDB 官方文檔 (http://opentsdb.net) 或 Docker (https://hub.docker.com/r/pete... 來下載安裝 OpenTSDB 服務器,本文使用 OpenTSDB 2.4.0 版本。sql
該場景須要將 EMQ X 指定主題下且知足條件的消息存儲到 OpenTSDB 數據庫。爲了便於後續分析檢索,消息內容須要進行拆分存儲。docker
該場景下客戶端上報數據以下:shell
Payload:數據庫
{ "metric": "cpu", "tags": { "host": "serverA" }, "value":12 }
啓動 OpenTSDB Server 並開放 4242 端口。json
$ docker pull petergrace/opentsdb-docker $ docker run -d --name opentsdb -p 4242:4242 petergrace/opentsdb-docker
打開 EMQ X Dashboard,進入左側菜單的 資源 頁面,點擊 新建 按鈕,選擇 OpenTSDB 資源類型並完成相關配置進行資源建立。性能優化
進入左側菜單的 規則 頁面,點擊 新建 按鈕,進行規則建立。這裏選擇觸發事件 message.publish,即在 EMQ X 收到 PUBLISH 消息時觸發該規則進行數據處理。服務器
選定觸發事件後,咱們可在界面上看到可選字段及示例 SQL:socket
規則引擎使用 SQL 語句過濾和處理數據。例如前文提到的場景中咱們須要將 payload
中的字段提取出來使用,則能夠經過 payload.<fieldName>
實現。同時咱們僅僅指望處理 stat/cpu
主題,那麼能夠在 WHERE 子句中使用主題通配符 =~
對 topic
進行篩選:topic =~ 'stat/cpu'
, 最終咱們獲得 SQL 以下:
SELECT payload.metric as metric, payload.tags as tags, payload.value as value FROM "message.publish" WHERE topic =~ 'stat/cpu'
藉助 SQL 測試功能,咱們能夠快速確認剛剛填寫的 SQL 語句可否達成咱們的目的。首先填寫用於測試的 payload 等數據以下:
而後點擊 測試 按鈕,咱們獲得如下數據輸出:
{ "metric": "cpu", "tags": { "host": "serverA" }, "value": 12 }
測試輸出與預期相符,咱們能夠進行後續步驟。
SQL 條件輸入輸出無誤後,咱們繼續添加相應動做,配置寫入 SQL 語句,將篩選結果存儲到 OpenTSDB。
點擊響應動做中的 添加 按鈕,選擇 保存數據到 OpenTSDB 動做,選取剛剛建立的 OpenTSDB
資源並完成剩餘參數設置。OpenTSDB 動做用到的幾個參數分別爲:
這裏咱們所有使用默認配置,點擊 新建 按鈕完成規則建立。
咱們成功建立了一條規則,包含一個處理動做,動做指望效果以下:
stat/cpu
主題上報消息時,該消息將命中 SQL,規則列表中 已命中 數字增長 1;切換到 工具 --> Websocket 頁面,使用任意信息客戶端鏈接到 EMQ X,鏈接成功後在 消息 卡片中發送以下消息:
Payload:
{ "metric": "cpu", "tags": { "host": "serverA" }, "value":12 }
點擊 發送 按鈕,發送成功後能夠看到當前規則已命中次數已經變爲了 1。
而後經過 Postman 向 OpenTSDB 發送查詢請求,當咱們獲得以下應答時說明新的 data point 已經添加成功:
至此,咱們經過規則引擎實現了使用規則引擎存儲消息到 OpenTSDB 數據庫的業務開發。
更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔。