EMQ X 持久化插件系列(二)- InfluxDB 數據存儲

InfluxDB 是一個由 InfluxData 開發的開源時序型數據庫。 它由 Go 寫成,着力於高性能地查詢與存儲時序型數據,相比上一期中介紹的 OpenTSDB 數據庫 InfluxDB 較爲輕量,在 InfluxData 官方給出的各項指標基準測試用 InfluxDB 都強於 OpenTSDB。git

面對大規模快速增加的物聯網傳感器採集、交易記錄等數據,時間序列數據累計速度很是快,時序數據庫經過提升效率來處理這種大規模數據,並帶來性能的提高,包括:更高的容納率(Ingest Rates)、更快的大規模查詢(儘管有一些比其餘數據庫支持更多的查詢)以及更好的數據壓縮。github

本文以 CentOS 7.2 系統中的實際例子來講明如何經過 InfluxDB 來存儲相關的信息。docker

安裝與驗證 InfluxDB 服務器

讀者能夠參考 InfluxDB 官方文檔(https://docs.influxdata.com/i... 或 Docker (https://hub.docker.com/_/infl... 來下載安裝 InfluxDB 服務器,本文使用 InfluxDB 1.7 版本。數據庫

配置 EMQ X 服務器

經過 RPM 方式安裝的 EMQ X,InfluxDB 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_influxdb.conf,考慮到功能定位,InfluxDB 插件僅支持消息存儲功能。json

配置鏈接地址與鏈接池大小:bash

## InfluxDB UDP Server
## 僅使用 UDP 接入
backend.influxdb.pool1.server = 127.0.0.1:8089

## InfluxDB Pool Size
backend.influxdb.pool1.pool_size = 5

## Whether or not set timestamp when encoding InfluxDB line
backend.influxdb.pool1.set_timestamp = trues

InfluxDB Backend 消息存儲規則參數: 服務器

經過 topic 過濾器,設置須要存儲消息的主題,pool 參數區別多個數據源:數據結構

## Store Publish Message
backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

啓動該插件,啓動插件的方式有 命令行控制檯兩種方式,用戶能夠任選其一。性能

消息模板

因爲 MQTT Message 沒法直接寫入 InfluxDB, InfluxDB Backend 提供了 emqx_backend_influxdb.tmpl 模板文件將 MQTT Message 轉換爲可寫入 InfluxDB 的 DataPoint。學習

消息模板功能須要重啓 EMQ X 才能應用更改。

tmpl 文件位於 data/templates/emqx_backend_influxdb_example.tmpl,使用 json 格式, 用戶能夠爲不一樣 Topic 定義不一樣的 Template, 相似:

{
    "timestamp": <Where is value of timestamp>
        "measurement": <Where is value of measurement>,
    "tags": {
        <Tag Key>: <Where is value of tag>
    },
        "fields": {
        <Field Key>: <Where is value of field>
    }
}

其中, measurement 與 fields 爲必選項, tags 與 timestamp 爲可選項。<Where is value of> 支持經過佔位符如 $key 提取變量名爲 key 的變量,支持的變量以下:

  • qos: 消息 QoS
  • form: 發佈者信息
  • topic: 發佈主題
  • timestamp: 時間戳
  • payload.*: JSON 消息體內任意變量,如 { "data": [{ "temp": 1 }] } 使用 ["$payload", "data", "temp"] 能夠提取出 1

本示例設定模板以下:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "from": "$from"
        },
        "fields": {
            "temperature": ["$payload", "data", "$0", "temp"]
        },
        "timestamp": "$timestamp"
    }
}

當 Topic 爲 "sample" 的 MQTT Message 擁有如下 Payload 時:

{
  "data": [
    {
      "temp": 1,
      "host": "serverA",
      "region": "hangzhou"
    },
    {
      "temp": 2,
      "host": "serverB",
      "region": "ningbo"
    }
  ]
}

Backend 會將 MQTT Message 轉換爲:

[
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverA",
      "qos": "0",
      "region": "hangzhou"
    },
    "fields": {
      "temperature": "1"
    },
    "timestamp": "1560743513626681000"
  },
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverB",
      "qos": "0",
      "region": "ningbo"
    },
    "fields": {
      "temperature": "2"
    },
    "timestamp": "1560743513626681000"
  }
]

使用示例

EMQ X 管理控制檯 WebSocket 頁面中,向 sample 主題發佈如上格式消息消息,消息將解析存儲到 InfluxDB udp 數據庫對應的 measurement 中。

總結

讀者在理解了 InfluxDB 中所存儲的數據結構,學習使用消息模板配置寫入消息字段格式後能夠結合 InfluxDB 拓展相關應用。


更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔

圖片描述

相關文章
相關標籤/搜索