EMQ X+TDengine 搭建 MQTT 物聯網可視化平臺

物聯網數據採集涉及到大量設備接入、海量的時序數據傳輸,EMQ X MQTT 服務器 與 TDengine 大數據平臺的組合技術棧徹底可以勝任場景中的海量時間序列監測數據的傳輸、存儲和計算。javascript

數據入庫後,每每須要其餘方式如數據可視化系統將數據按照規則統計、展示出來,實現數據的監控、指標統計等業務需求,以便充分發揮數據的價值,TDengine 搭配開源軟件 Grafana 能夠快速搭建物聯網數據可視化平臺。java

上述整套方案無需代碼開發,涉及的產品均能提供開源軟件、企業服務、雲端 SaaS 服務不一樣層次的交付模式,可以根據項目需求實現免費版或企業版私有化落地以及雲端部署。node

image20200804111913555.png

方案介紹

EMQ X 簡介

EMQ X 是基於高併發的 Erlang/OTP 語言平臺開發,支持百萬級鏈接和分佈式集羣架構,發佈訂閱模式的開源 MQTT 消息服務器。EMQ X 內置了大量開箱即用的功能,其 開源版 EMQ X Broker企業版 EMQ X Enterprise 均支持經過規則引擎將設備消息存儲到 TDengine。sql

TDengine 是什麼

TDengine 是濤思數據專爲物聯網、車聯網、工業互聯網、IT 運維等設計和優化的大數據平臺。除核心的快 10 倍以上的時序數據庫功能外,還提供緩存、數據訂閱、流式計算等功能,最大程度減小研發和運維的複雜度,且核心代碼,包括集羣功能所有開源。docker

TDengine 提供社區版、企業版和雲服務版,安裝/使用教程詳見 TDengine 使用文檔數據庫

Grafana 簡介

Grafana 是一個跨平臺、開源的度量分析和可視化工具,能夠查詢處理各種數據源中的數據,進行可視化的展現。它能夠快速靈活建立的客戶端圖表,面板插件有許多不一樣方式的可視化指標和日誌,官方庫中具備豐富的儀表盤插件,好比熱圖、折線圖、圖表等多種展現方式;支持 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch和 KairosDB 等數據源,支持數據項獨立/混合查詢展現;能夠建立自定義告警規則並通知到其餘消息處理服務或組件中。macos

業務場景

本文模擬物聯網環境數據採集場景,假設現有必定數據的環境數據採集點,全部採集點數據均經過 MQTT 協議 傳輸至採集平臺(MQTT Publish),主題設計以下:npm

sensor/data

傳感器發送的數據格式爲 JSON,數據包括傳感器採集的溫度、溼度、噪聲音量、PM十、PM2.五、二氧化硫、二氧化氮、一氧化碳、傳感器 ID、區域、採集時間等數據。json

{
    "temperature": 30,
    "humidity" : 20,
    "volume": 44.5,
    "PM10": 23,
    "pm25": 61,
    "SO2": 14,
    "NO2": 4,
    "CO": 5,
    "id": "10-c6-1f-1a-1f-47",
    "area": 1,
    "ts": 1596157444170
}

如今須要實時存儲以便在後續任意時間查看數據,提出如下的需求:瀏覽器

  • 每一個設備按照每 5 秒鐘一次的頻率進行數據上報,數據庫需存儲每條數據以供後續回溯分析;
  • 經過可視化系統查看 任意區域、任意時間區間內 的指標數據,如平均值、最大值、最小值。

環境準備

本文所用各個組件均有 Docker 鏡像,除 EMQ X 須要修改少數配置爲了便於操做使用下載安裝外,TDengine 與 Grafana 均使用 Docker 搭建。

安裝包資源與使用教程參照各自官網:

安裝 EMQ X

若是您是 EMQ X 新手用戶,推薦經過 EMQ X 文檔 快速上手

訪問 EMQ X 下載 頁面下載適合您操做系統的安裝包,本文截稿時 EMQ X 開源版最新版本爲 v4.1.2,下載 zip 包的啓動步驟以下 :

## 解壓下載好的安裝包
unzip emqx-macosx-v4.1.1.zip
cd emqx

## 以 console 模式啓動 EMQ X 方便調試
./bin/emqx console

啓動成功後瀏覽器訪問 http://127.0.0.1:18083 訪問 EMQ X 管理控制檯 Dashboard,使用 admin public 默認用戶名密碼完成初次登陸。

安裝 TDengine

爲了方便測試使用經過 Docker 進行安裝(需映射網絡端口),也可使用安裝包的方式進行安裝:

## 拉取並啓動容器
docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest

## 啓動後檢查容器運行狀態
docker ps -a

安裝 Grafana

使用如下命令經過 Docker 安裝並啓動 Grafana:

docker run -d --name=grafana -p 3000:3000 grafana/grafana

啓動成功後瀏覽器訪問 http://127.0.0.1:3000 訪問 Grafana 可視化面板,使用 admin admin 默認用戶名密碼完成初次登陸,登陸後按照提示修改密碼使用新密碼登陸進入主界面:

配置 EMQ X 存儲數據到 TDengine

TDengine 建立數據庫與數據表

進入TDengine Docker 容器:

docker exec -it tdengine bash

建立 test 數據庫:

taos
create database test;

建立 sensor_data 表,關於 TDengine 數據結構以及 SQL 命令參見 TAOS SQL

use test;
CREATE TABLE sensor_data (
  ts timestamp,
     temperature float,
  humidity float,
  volume float,
  PM10 float,
  pm25 float,
  SO2 float,
  NO2 float,
  CO float,
  sensor_id NCHAR(255), 
  area TINYINT,
  coll_time timestamp
);

配置 EMQ X 規則引擎

打開 EMQ X Dashboared,進入 規則引擎 -> 規則 頁面,點擊 建立 按鈕進入建立頁面。

規則 SQL

規則 SQL 用於 EMQ X 消息以及事件篩選,如下 SQL 表示從 sensor/data 主題篩選出 payload 數據:

SELECT
  payload
FROM
  "sensor/data"

使用 SQL 測試功能 ,輸入測試數據進行篩選結果測試,測試有結果且輸出內容以下,標明 SQL 編寫正確:

{
  "payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}"
}

image20200731104046137.png

響應動做

爲支持各類不一樣類型平臺的開發,TDengine 提供符合 REST 設計標準的 API。經過 RESTful Connector 提供了最簡單的鏈接方式,即便用 HTTP 請求攜帶認證信息與要執行的 SQL 操做 TDengine。

使用 EMQ X 開源版中的 發送到 Web 服務 便可經過 RESTful Connector 寫入數據到 TDengine。即將到來的 EMQ X 企業版 4.1.1 版本將提供原生更高性能的寫入 Connector。

發送到 Web 服務須要兩個數據,一個是關聯資源,另外一個是消息內容模板。

  • 關聯資源:HTTP 服務器配置信息,此處爲 TDengine 的 RESTful Connector
  • 消息內容模板:此處爲攜帶數據的 INSERT SQL,注意咱們應當在 SQL 中指定數據庫名,字符類型也要用單引號括起來, 消息內容模板爲:
INSERT INTO test.sensor_data VALUES(
  now,
  ${payload.temperature},
  ${payload.humidity},
  ${payload.volume},
  ${payload.PM10},
  ${payload.pm25},
  ${payload.SO2},
  ${payload.NO2},
  ${payload.CO},
  '${payload.id}',
  ${payload.area},
  ${payload.ts}
)

image20200731145609393.png

建立過程

點擊響應動做下的 添加 按鈕,在彈出框內選擇 發送數據到 Web 服務,點擊 新建資源 新建一個 WebHook 資源。

image20200731104403456.png

資源類型選擇 Webhook,請求 URL 填寫 http://127.0.0.1:6041/rest/sql,請求方法選擇 POST, 還需添加 Authorization 請求頭做爲認證信息

Authorization 的值爲 Basic + TDengine 的 {username}:{password} 通過 Base64 編碼以後的字符串, 例如 root:taosdata 編碼後爲 cm9vdDp0YW9zZGF0YQ==,實際填入的值爲:Basic cm9vdDp0YW9zZGF0YQ==

在響應動做建立頁面選擇新建的資源,並填入消息模板內容便可。

image20200804110459517.png

生成模擬數據

如下腳本模擬了 10000 個設備在過去 24 小時內、每隔 5 秒鐘上報一條模擬數據併發送到 EMQ X 的場景。

  • 總數據量: 24 3600 / 5 10000 = 1.72 億條
  • 消息 TPS: 2000

讀者安裝 Node.js ,按需修改配置參數後能夠經過如下命令啓動:

npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js

附:模擬生成數據併發送到 EMQ X 代碼,請根據集羣性能調整相關參數

// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')

const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10000
const STEP = 5000 // 模擬採集時間間隔 ms
const AWAIT = 5000 // 每次發送完後休眠時間,防止消息速率過快 ms
const CLIENT_POOL = []

startMock()


function sleep(timer = 100) {
  return new Promise(resolve => {
    setTimeout(resolve, timer)
  })
}

async function startMock() {
  const now = Date.now()
  for (let i = 0; i < CLIENT_NUM; i++) {
    const client = await createClient(`mock_client_${i}`)
    CLIENT_POOL.push(client)
  }
  // last 24h every 5s
  const last = 24 * 3600 * 1000
  for (let ts = now - last; ts <= now; ts += STEP) {
    for (const client of CLIENT_POOL) {
      const mockData = generateMockData()
      const data = {
        ...mockData,
        id: client.clientId,
        area: 0,
        ts,
      }
      client.publish('sensor/data', JSON.stringify(data))
    }
    const dateStr = new Date(ts).toLocaleTimeString()
    console.log(`${dateStr} send success.`)
    await sleep(AWAIT)
  }
  console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}

/**
 * Init a virtual mqtt client
 * @param {string} clientId ClientID
 */
function createClient(clientId) {
  return new Promise((resolve, reject) => {
    const client = mqtt.connect(EMQX_SERVER, {
      clientId,
    })
    client.on('connect', () => {
      console.log(`client ${clientId} connected`)
      resolve(client)
    })
    client.on('reconnect', () => {
      console.log('reconnect')
    })
    client.on('error', (e) => {
      console.error(e)
      reject(e)
    })
  })
}

/**
* Generate mock data
*/
function generateMockData() {
 return {
   "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
   "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
   "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
   "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
   "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
   "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
   "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
   "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
   "area": Mock.Random.integer(0, 20),
   "ts": 1596157444170,
 }
}

可視化配置

組件安裝完成,模擬數據寫入成功後,按照 Grafana 可視化界面的操做指引,完成業務所需數據可視化配置。

添加數據源(Add data source)

添加數據源,即顯示的數據源信息。選取 TDengine 類型數據源,輸入鏈接參數進行配置,默認狀況下,關鍵配置信息以下:

image20200804110612868.png

添加儀表盤(New Dashboard)

添加好數據源後,添加須要顯示的數據儀表盤信息。儀表盤爲多個可視化面板的集合,點擊 New Dashboard 後,選擇 + Query 經過查詢來添加數據面板。

建立面板須要四個步驟,分別是 Queries(查詢)Visualization(可視化)General(圖表配置)Alert(告警) ,建立時間

平均值面板

使用 Grafana 的可視化查詢構建工具,查詢出全部設備的平均值。

如下 SQL 按照指定時間段(&dollar;form &dollar;to)、指定時間間隔(&dollar;interval),查詢出數據中關鍵指標的平均值:

select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO)  from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

Visualization 默認不作更改, General 裏面修改面板名稱爲 歷史平均值,若是須要對業務進行監控告警,能夠在 Alert 裏編排告警規則,此處僅作可視化展現,不使用此功能。

image20200803091833280.png

完成建立後,點擊左上角返回按鈕,該 Dashboard 裏成功添加一個數據面板。點擊頂部導航欄 保存 圖標,輸入 Dashboard 名稱完成 Dashboard 的建立。

最大值、最小值面板

繼續點擊 Dashboard 的 Add panel 按鈕,添加最大值、最小值圖表。操做步驟同添加平均值,僅對查詢中 SELECT 統計方法字段作出調整,調整爲 AVG 函數爲 MAXMIN

select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO)  from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

image20200803093314019.png

儀表盤效果

保存儀表盤,拖拽調整每一個數據面板大小、位置,最終獲得一個視覺效果較好的數據儀表盤。儀表盤右上角能夠選擇時間區間、自動刷新時間,此時設備持續發送數據採集數據,儀表盤數據值會有所變更,實現了比較好的可視化效果。

總結

至此咱們藉助 EMQ X + TDengine 完成了物聯網數據傳輸、存儲、展示整個流程的系統搭建,讀者能夠了解到 EMQ X 豐富的拓展能力與 TDengine 完備的大數據平臺特性在物聯網數據採集中的應用。深刻學習掌握 Grafana 的其餘功能後,用戶能夠定製出更完善的數據可視化乃至監控告警系統。

image20200803093438116.png

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/e...

相關文章
相關標籤/搜索