# EMQ X 持久化插件系列(一)- 消息存儲到 OpenTSDB 數據庫

OpenTSDB 是可擴展的分佈式時序數據庫,底層依賴 HBase 並充分發揮了HBase的分佈式列存儲特性,支持數百萬每秒的讀寫。html

面對大規模快速增加的物聯網傳感器採集、交易記錄等數據,時間序列數據累計速度很是快,時序數據庫經過提升效率來處理這種大規模數據,並帶來性能的提高,包括:更高的容納率(Ingest Rates)、更快的大規模查詢(儘管有一些比其餘數據庫支持更多的查詢)以及更好的數據壓縮。git

本文以 CentOS 7.2 系統中的實際例子來講明如何經過 OpenTSDB 來存儲相關的信息。github

安裝與驗證 OpenTSDB 服務器

讀者能夠參考 OpenTSDB 官方文檔Docker 來下載安裝 OpenTSDB 服務器,本文使用 OpenTSDB 2.4.0 版本。docker

配置 EMQ X 服務器

經過 RPM 方式安裝的 EMQ X,OpenTSDB 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_opentsdb.conf,考慮到功能定位,OpenTSDB 插件僅支持消息存儲功能。更多 backend 插件詳見 EMQ X 數據持久化數據庫

配置鏈接地址與鏈接池大小、batch 策略:json

## OpenTSDB Server 接入地址
backend.opentsdb.pool1.server = 127.0.0.1:4242

## 鏈接池大小
backend.opentsdb.pool1.pool_size = 8


## Max batch size of put 最大批量寫條數
backend.opentsdb.pool1.max_batch_size = 20

## 經過 topic 過濾器存儲所有消息
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

**OpenTSDB Backend 消息存儲規則參數: **bash

經過 topic 過濾器,設置須要存儲消息的主題,pool 參數區別多個數據源:服務器

## Store Publish Message
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

啓動該插件:數據結構

./bin/emqx_ctl plugins load emqx_backend_opentsdb

消息模板

因爲 MQTT Message 沒法直接寫入 OpenTSDB, OpenTSDB Backend 提供了 emqx_backend_opentsdb.tmpl 模板文件將 MQTT Message 轉換爲可寫入 OpenTSDB 的 DataPoint。分佈式

消息模板功能須要重啓 EMQ X 才能應用更改。

tmpl 文件位於 data/templates/emqx_backend_opentsdb_example.tmpl,使用 json 格式, 用戶能夠爲不一樣 Topic 定義不一樣的 Template, 相似:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "from": "$from"
        },
        "value": ["$payload", "data", "$0", "temp"],
        "timestamp": "$timestamp"
    }
}

其中, measurement 與 fields 爲必選項, tags 與 timestamp 爲可選項。<Where is value of> 支持經過佔位符如 $key 提取變量名爲 key 的變量,支持的變量以下:

  • qos: 消息 QoS
  • form: 發佈者信息
  • topic: 發佈主題
  • timestamp: 時間戳
  • payload.*: JSON 消息體內任意變量,如 { "data": [{ "temp": 1 }] } 使用 ["$payload", "data", "temp"] 能夠提取出 1

本示例設定模板以下:

{
    "sample": {
        "measurement": "$topic",
        "tags": {
            "host": ["$payload", "data", "$0", "host"],
            "region": ["$payload", "data", "$0", "region"],
            "qos": "$qos",
            "from": "$from"
        },
        "value": ["$payload", "data", "$0", "temp"],
        "timestamp": "$timestamp"
    }
}

當 Topic 爲」sample」 的 MQTT Message 擁有如下 Payload 時:

{
  "data": [
    {
      "temp": 1,
      "host": "serverA",
      "region": "hangzhou"
    },
    {
      "temp": 2,
      "host": "serverB",
      "region": "ningbo"
    }
  ]
}

Backend 會將 MQTT Message 轉換爲:

[
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverA",
      "qos": "0",
      "region": "hangzhou"
    },
    "value": "1",
    "timestamp": "1560743513626681000"
  },
  {
    "measurement": "sample",
    "tags": {
      "from": "mqttjs_ebcc36079a",
      "host": "serverB",
      "qos": "0",
      "region": "ningbo"
    },
    "value": "2",
    "timestamp": "1560743513626681000"
  }
]

使用示例

EMQ X 管理控制檯 WebSocket 頁面中,向 sample 主題發佈如上格式消息消息,消息將解析存儲到 OpenTSDB udp 數據庫對應的 measurement 中。

總結

讀者在理解了 OpenTSDB 中所存儲的數據結構,學習使用消息模板配置寫入消息字段格式後能夠結合 OpenTSDB 拓展相關應用。


更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔

相關文章
相關標籤/搜索