EMQ X 規則引擎系列(三)存儲消息到 InfluxDB 時序數據庫

前言

InfluxDB 是一個用於存儲和分析時間序列數據的開源數據庫,內置 HTTP API,類 SQL 語句的支持和無結構的特性對使用者而言都很是友好。它強大的數據吞吐能力以及穩定的性能表現使其很是適合 IoT 領域。html

經過 EMQ X 消息引擎,咱們能夠自定義 Template 文件,而後將 Json 格式的 MQTT 消息轉換爲 Measurement 寫入 InfluxDB:git

場景介紹

該場景須要將 EMQ X 指定主題下且知足條件的消息存儲到 InfluxDB 時序數據庫。爲了便於後續分析檢索,消息內容須要進行拆分存儲。github

該場景下客戶端上報數據以下:sql

  • Topic:data/sensordocker

  • Payload:shell

    {
      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5
      }
    }
    複製代碼

準備工做

數據庫安裝及初始化

建立 db 數據庫並開放 8089 UDP 端口。數據庫

$ docker pull influxdb
 $ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git
 $ cd influx_udp
 $ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest
複製代碼

配置說明

建立資源

打開 EMQ X Dashboard,進入左側菜單的 資源 頁面,點擊 新建 按鈕,選擇 InfluxDB 資源類型並完成相關配置進行資源建立。json

建立規則

進入左側菜單的 規則 頁面,點擊 新建 按鈕,進行規則建立。觸發事件 選擇 message.publish,即在 EMQ X 收到 PUBLISH 消息時觸發該規則進行數據處理。bash

選定觸發事件後,咱們可在界面上看到可選字段及示例 SQL:socket

篩選所需字段

規則引擎使用 SQL 語句過濾和處理數據。例如前文提到的場景中咱們須要將 payload 中的字段提取出來使用,則能夠經過 payload.<fieldName> 實現。同時咱們僅僅指望處理 data/sensor 主題,那麼能夠在 WHERE 子句中使用主題通配符 =~topic 進行篩選:topic =~ 'data/sensor', 最終咱們獲得 SQL 以下:

SELECT
  payload.location as location,
  payload.data.temperature as temperature,
  payload.data.humidity as humidity,
  payload.data.pm2_5 as pm2_5
FROM
  "message.publish"
WHERE
	topic =~ 'data/sensor'
複製代碼

SQL 測試

藉助 SQL 測試功能,咱們能夠快速確認剛剛填寫的 SQL 語句是否能達到咱們的目的。首先填寫用於測試的 payload 等數據以下:

而後點擊 測試 按鈕,獲得如下輸出結果,與預期相符。

{
  "humidity": 46.4,
  "location": "bedroom",
  "pm2_5": 0.5,
  "temperature": 25
}
複製代碼

添加響應動做,存儲消息到 InfluxDB

SQL 條件輸入輸出無誤後,咱們繼續添加響應動做,配置寫入 SQL 語句,將篩選結果存儲到 InfluxDB。

點擊響應動做中的 添加 按鈕,選擇動做 保存數據到 InfluxDB,選取剛剛建立的 InfluxDB 資源,再按照實際需求將 ${fieldName} 填寫到 Field Keys, Tag KeysTimestamp Key 中,Measurement 表示將數據寫入 InfluxDB 時使用的 Measurement,最後點擊 新建 按鈕完成規則建立。

測試

預期結果

咱們成功建立了一條規則,包含一個處理動做,動做指望效果以下:

  1. 客戶端向 data/sensor 主題上報消息時,該消息將命中規則,規則列表中 已命中 數字將會增長 1;
  2. InfluxDB 的 db 數據庫中將會增長一條數據,數據內容與處理後的消息內容一致。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 --> Websocket 頁面,使用任意 Client ID 鏈接到 EMQ X,鏈接成功後在 消息 卡片中發送以下消息:

  • Topic:data/sensor

  • Payload:

    {
      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5
      }
    }
    複製代碼

點擊 發送 按鈕,發送成功後能夠看到當前規則已命中次數已經變爲 1。

而後檢查 InfluxDB,新的 data point 是否添加成功:

$ docker exec -it influxdb influx

> use db
Using database db
> select * from "sensor_data"
name: sensor_data
time                humidity location pm2_5 temperature
----                -------- -------- ----- -----------
1561535778444457348 46.4     bedroom  0.5   25
複製代碼

至此,咱們經過規則引擎實現了存儲消息到 InfluxDB 數據庫的業務開發。

在閱讀該教程以前,假定你已經瞭解 MQTTEMQ X 的簡單知識。


更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔

相關文章
相關標籤/搜索