EMQ X + ClickHouse 實現物聯網數據接入與分析

物聯網數據採集涉及到大量設備接入、海量的數據傳輸,EMQ X 物聯網消息中間件 與 ClickHouse 聯機分析 (OLAP) 數據庫的組合技術棧徹底可以勝任物聯網數據採集傳輸與存儲、分析處理業務。javascript

數據入庫後,每每須要其餘方式如數據可視化系統將數據按照規則統計、展示出來,實現數據的監控、指標統計等業務需求,以便充分發揮數據的價值,ClickHouse 搭配開源軟件 Grafana 能夠快速搭建物聯網數據分析可視化平臺。html

上述整套方案無需代碼開發,涉及的產品均能提供開源軟件、企業服務、雲端 SaaS 服務不一樣層次的交付模式,可以根據項目需求實現免費版或企業版私有化落地以及雲端部署。java

image-20200916112653512

方案介紹

EMQ X 簡介

EMQ X 是基於高併發的 Erlang/OTP 語言平臺開發,支持百萬級鏈接和分佈式集羣架構,發佈訂閱模式的開源 MQTT 消息服務器。EMQ X 內置了大量開箱即用的功能,其企業版 EMQ X Enterprise 支持經過規則引擎將物聯網消息數據存儲到 ClickHouse。node

ClickHouse 簡介

ClickHouse 是一個用於數據分析(OLAP)的列式數據庫管理系統(column-oriented DBMS),由俄羅斯搜索巨頭 Yandex 公司開源。目前國內很多大廠在使用,包括騰訊、今日頭條、攜程、快手、虎牙等,集羣規模多達數千節點。sql

  • 今日頭條 內部用 ClickHouse 來作用戶行爲分析,內部一共幾千個 ClickHouse 節點,單集羣最大 1200 節點,日增原始數據 300TB 左右。
  • 騰訊 內部用 ClickHouse 作遊戲數據分析,而且爲之創建了一整套監控運維體系。
  • 攜程 內部從 18 年 7 月份開始接入試用,目前 80% 的業務都跑在 ClickHouse 上。天天數據增量十多億,近百萬次查詢請求。
  • 快手 內部也在使用 ClickHouse,存儲總量大約 10PB, 天天新增 200TB, 90% 查詢小於 3S。

在國外,Yandex 內部有數百節點用於作用戶點擊行爲分析,優步、CloudFlare、Spotify 等頭部公司也在使用,更多用戶列表見 ClickHouse 官網-用戶列表docker

Grafana 簡介

Grafana 是一個跨平臺、開源的度量分析和可視化工具,能夠查詢處理各種數據源中的數據,進行可視化的展現。它能夠快速靈活建立的客戶端圖表,面板插件有許多不一樣方式的可視化指標和日誌,官方庫中具備豐富的儀表盤插件,好比熱圖、折線圖、圖表等多種展現方式;支持 InfluxDB, OpenTSDB, Prometheus, Elasticsearch, CloudWatch 和 KairosDB 等數據源,支持數據項獨立/混合查詢展現;能夠建立自定義告警規則並通知到其餘消息處理服務或組件中。數據庫

Grafana 4.6+ 版本支持經過插件的形式安裝 Clickhouse 數據源,使用前須要在 Grafana 上額外安裝 ClickHouse 插件。macos

業務場景

本文模擬物聯網環境數據採集場景,假設現有必定數據的環境數據採集點,全部採集點數據均經過 MQTT 協議 傳輸至採集平臺(MQTT Publish),主題設計以下:npm

sensor/data

傳感器發送的數據格式爲 JSON,數據包括傳感器採集的溫度、溼度、噪聲音量、PM十、PM2.五、二氧化硫、二氧化氮、一氧化碳、傳感器 ID、區域、採集時間等數據。json

{
    "temperature": 30,
    "humidity" : 20,
    "volume": 44.5,
    "PM10": 23,
    "pm25": 61,
    "SO2": 14,
    "NO2": 4,
    "CO": 5,
    "id": "10-c6-1f-1a-1f-47",
    "area": 1,
    "ts": 1596157444170
}

如今須要實時存儲以便在後續任意時間查看數據,提出如下的需求:

  • 每一個設備按照每 5 秒鐘一次的頻率進行數據上報,數據庫需存儲每條數據以供後續回溯分析;
  • 經過 ClickHouse 存儲原始數據,配合 Grafana 進行數據分析並可視化展現。

環境準備

本文所用各個組件均有 Docker 鏡像能夠快速搭建運行,爲方便開發,Grafana 使用 Docker 搭建,ClickHouse 使用文檔推薦方式安裝,EMQ X 採用安裝包或在線雲服務的形式集成使用。

相關資源與使用教程參照各自官網:

安裝 EMQ X

方式一:使用 EMQ X Cloud

EMQ 提供了 全託管的物聯網 MQTT 雲服務 - EMQ X Cloud,在 EMQ X Cloud 上,用戶僅需數分鐘便可建立高可用、獨享實例的 EMQ X 集羣,當即開始原型設計與應用開發而無需關注後續的運維工做。產品上線後,集羣可進行不停機擴容以應對業務增加帶來的容量擴張,保證可用性的同時最大化節省使用成本。

EMQ X Cloud 爲新註冊用戶提供 6 個月時長的免費試用,註冊帳號並登陸建立試用部署後,點擊部署詳情中的 EMQ X Dashboard 便可打開 EMQ X 管理控制檯。

使用 EMQ X Cloud 須要保證 ClickHouse 可以被經過公網地址訪問。

image-20200915150048492

方式二:私有部署安裝

若是您是 EMQ X 新手用戶,推薦經過 EMQ X 文檔 快速上手

訪問 EMQ 下載 頁面下載適合您操做系統的安裝包,本文截稿時 EMQ X 企業版本爲 v4.1.2,下載 zip 包的啓動步驟以下 :

## 解壓下載好的安裝包
unzip emqx-macosx-v4.1.2.zip
cd emqx

## 以 console 模式啓動 EMQ X 方便調試
./bin/emqx console

啓動成功後瀏覽器訪問 http://127.0.0.1:18083 訪問 EMQ X 管理控制檯 Dashboard,使用 admin public 默認用戶名密碼完成初次登陸。

安裝 ClickHouse

使用 ClickHouse 文檔 推薦的安裝方式安裝,本文僅作 Demo 演示,採用華爲雲 2 核 4GB 規格的雲服務器進行安裝使用:

sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.tech/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.tech/rpm/clickhouse.repo
sudo yum install clickhouse-server clickhouse-client

sudo /etc/init.d/clickhouse-server start
clickhouse-client

默認狀況下 ClickHouse 只監聽本地端口,若是須要遠程訪問須要修改配置文件

<!-- /etc/clickhouse-server/config.xml -->
<!-- 找到這一行,取消註釋 <listen_host>::</listen_host> 並修改成 -->
<listen_host>0.0.0.0</listen_host>

從新啓動:

service clickhouse-server restart

Grafana 安裝

使用如下命令經過 Docker 安裝並啓動 Grafana:

docker run -d --name=grafana -p 3000:3000 grafana/grafana

啓動成功後瀏覽器訪問 http://127.0.0.1:3000 訪問 Grafana 可視化面板,使用 admin admin 默認用戶名密碼完成初次登陸,登陸後按照提示修改密碼使用新密碼登陸進入主界面。

配置 EMQ X 存儲數據到 ClickHouse

EMQ X 企業版支持經過規則引擎將設備事件與消息數據寫入到各種數據庫與消息中間件中(包括 ClickHouse),參考 文檔

ClickHouse 建立數據庫與數據表

啓動 ClickHouse 並進入命令行:

sudo /etc/init.d/clickhouse-server start
clickhouse-client

建立 test 數據庫:

create database test;
use test;

建立 sensor_data 表,ClickHouse SQL 語法與常規關係數據庫有所差異,具體請參考 ClickHouse 文檔-SQL語法

Grafana 時序顯示時須要添加 DataTime 列與 Date 列
CREATE TABLE sensor_data (
     temperature Float32,
  humidity Float32,
  volume Float32,
  PM10 Float32,
  pm25 Float32,
  SO2 Float32,
  NO2 Float32,
  CO Float32,
  sensor_id String, 
  area Int16,
  coll_time DateTime,
  coll_date Date
) engine = Log;

-- ClickHouse 命令行中不支持建表語句換行,選用如下 SQL 執行:
CREATE TABLE sensor_data( temperature Float32, humidity Float32, volume Float32, PM10 Float32, pm25 Float32, SO2 Float32, NO2 Float32, CO Float32, sensor_id String, area Int16, coll_time DateTime, coll_date Date) engine = Log;

配置 EMQ X 規則引擎

打開 EMQ X Dashboared,進入 規則引擎 -> 規則 頁面,點擊 建立 按鈕進入建立頁面。

規則 SQL

規則 SQL 用於 EMQ X 消息以及事件篩選,如下 SQL 表示從 sensor/data 主題篩選出 payload 數據:

SELECT
  payload
FROM
  "sensor/data"

使用 SQL 測試功能,輸入測試數據進行篩選結果測試,測試有結果且輸出內容以下,代表 SQL 編寫正確:

測試數據(設備實際上報的數據):

{
    "temperature": 30,
    "humidity" : 20,
    "volume": 44.5,
    "PM10": 23,
    "pm25": 61,
    "SO2": 14,
    "NO2": 4,
    "CO": 5,
    "id": "10-c6-1f-1a-1f-47",
    "area": 1,
    "ts": 1596157444170
}

測試輸出:

{
  "payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm25\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}"
}

image-20200915163114173

響應動做

使用 EMQ X 企業版與 EMQ X Cloud 均支持經過規則引擎寫入數據到 ClickHouse,

配置響應動做須要兩個數據,一個是關聯資源,另外一個是 SQL 模板。

  • 關聯資源:建立一個 ClickHouse 資源,配置鏈接參數
  • SQL 模板:此處爲攜帶數據的 INSERT SQL,注意咱們應當在 SQL 中指定數據庫名
INSERT INTO test.sensor_data VALUES(
  ${payload.temperature},
  ${payload.humidity},
  ${payload.volume},
  ${payload.PM10},
  ${payload.pm25},
  ${payload.SO2},
  ${payload.NO2},
  ${payload.CO},
  '${payload.id}',
  ${payload.area},
  ${payload.ts}/1000,
  ${payload.ts}/1000
)

建立過程

點擊響應動做下的添加按鈕,在彈出框內選擇 保存數據到 ClickHouse,點擊 新建資源 新建一個 ClickHouse 資源。

資源類型選擇 ClickHouse,填入資源名稱,服務器地址與認證信息便可:

image-20200915164110500

在響應動做建立頁面選擇新建的資源,並填入 SQL 模板便可。

image-20200915163932584

生成模擬數據

如下腳本模擬了 10 個設備在過去 24 小時內、每隔 5 秒鐘上報一條模擬數據併發送到 EMQ X 的場景。

讀者安裝 Node.js ,按需修改配置參數後能夠經過如下命令啓動:

npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js

附:模擬生成數據併發送到 EMQ X 代碼,請根據集羣性能調整相關參數

// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')

const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10
const STEP = 5000 // 模擬採集時間間隔 ms
const AWAIT = 500 // 每次發送完後休眠時間,防止消息速率過快 ms
const CLIENT_POOL = []

startMock()


function sleep(timer = 100) {
  return new Promise(resolve => {
    setTimeout(resolve, timer)
  })
}

async function startMock() {
  const now = Date.now()
  for (let i = 0; i < CLIENT_NUM; i++) {
    const client = await createClient(`mock_client_${i}`)
    CLIENT_POOL.push(client)
  }
  // last 24h every 5s
  const last = 24 * 3600 * 1000
  for (let ts = now - last; ts <= now; ts += STEP) {
    for (const client of CLIENT_POOL) {
      const mockData = generateMockData()
      const data = {
        ...mockData,
        id: client.options.clientId,
        ts,
      }
      client.publish('sensor/data', JSON.stringify(data))
    }
    const dateStr = new Date(ts).toLocaleTimeString()
    console.log(`${dateStr} send success.`)
    await sleep(AWAIT)
  }
  console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}

/**
 * Init a virtual mqtt client
 * @param {string} clientId ClientID
 */
function createClient(clientId) {
  return new Promise((resolve, reject) => {
    const client = mqtt.connect(EMQX_SERVER, {
      clientId,
    })
    client.on('connect', () => {
      console.log(`client ${clientId} connected`)
      resolve(client)
    })
    client.on('reconnect', () => {
      console.log('reconnect')
    })
    client.on('error', (e) => {
      console.error(e)
      reject(e)
    })
  })
}

/**
* Generate mock data
*/
function generateMockData() {
 return {
   "temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
   "humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
   "volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
   "PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
   "pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
   "SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
   "NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
   "CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
   "area": Mock.Random.integer(0, 100),
 }
}

可視化配置

組件安裝完成,模擬數據寫入成功後,按照 Grafana 可視化界面的操做指引,完成業務所需數據可視化配置。

首選須要安裝 Grafana ClickHouse 數據源插件:查看插件安裝步驟

添加數據源 (Add data source)

添加數據源,即顯示的數據源信息。選取 ClickHouse 類型數據源,輸入鏈接參數進行配置,默認狀況下,關鍵配置信息以下:

image-20200916110233266

添加儀表盤 (New Dashboard)

添加好數據源後,添加須要顯示的數據儀表盤信息。儀表盤爲多個可視化面板的集合,點擊 New Dashboard 後,選擇 + Query 經過查詢來添加數據面板。

平均值面板

使用 Grafana 的可視化查詢構建工具,查詢出全部設備的平均值。

ClickHouse 插件生成 SQL 時自動填充了一些變量,Grafana 查詢時能夠識別這些變量:

  • $timeSeries:指定的 DateTime 列以及一些轉換邏輯,以確保數據採用 Grafana 能夠在顯示中使用的格式
  • $table: 數據庫表名
  • $timeFilter:自動生成的時間序列過濾條件

咱們按照須要,新增兩個 AVG 處理後的字段便可:

SELECT
    $timeSeries as t,
    avg(temperature) as temperature,
    avg(humidity) as humidity
FROM $table

WHERE $timeFilter

GROUP BY t

ORDER BY t

對於折線圖等帶有時間序列的圖表,Grafana 須要一個 DateTime 列來選擇時間序列。咱們必須輸入時間序列,而且該列必須是 DateTime 或 Timestamp 數據類型。

點擊下圖紅框中的 編輯 按鈕,進入表名、時間列配置:

image-20200916110544930

選擇數據庫、數據表,若是數據表內有 DateTime 與 Date 字段,能夠在 Column:DateTime 與 Column:Date 中識別選擇出來。

  • Column:Date:用於 Grafana 拖拽時間範圍的時候過濾數據
  • Column:DateTime:用於時序顯示時做爲時間數據

<img src="https://static.emqx.net/images/07b9a092530b50bfa314447a189f8d4b.png" alt="image-20200916111101870" style="zoom:67%;" />

完成後再次點擊編輯按鈕,點擊圖標右上角選擇一個時間範圍,確保時間範圍內有數據,點擊 刷新 圖標刷新一下數據,便可看到渲染出來的平均值面板。

image-20200916111420196

完成建立後,點擊左上角返回按鈕,該 Dashboard 裏成功添加一個數據面板。點擊頂部導航欄保存圖標,輸入 Dashboard 名稱完成 Dashboard 的建立。

最大值面板

繼續點擊 Dashboard 的 Add panel 按鈕,添加最大值、最小值圖表。操做步驟同添加平均值,僅對查詢中 SELECT 統計方法字段作出調整,調整爲 AVG 函數爲 MAX

SELECT
    $timeSeries as t,
    max(temperature) as temperature,
    max(humidity) as humidity
FROM $table

WHERE $timeFilter

GROUP BY t

ORDER BY t

儀表盤效果

保存儀表盤,拖拽調整每一個數據面板大小、位置,最終獲得一個視覺效果較好的數據儀表盤。儀表盤右上角能夠選擇時間區間、自動刷新時間,此時設備持續發送數據採集數據,儀表盤數據值會有所變更,實現了比較好的可視化效果。

image-20200916112334081

總結

至此咱們藉助 EMQ X + ClickHouse 完成了物聯網數據傳輸、存儲、分析展示整個流程的系統搭建,讀者能夠了解到 EMQ X 豐富的拓展能力與 ClickHouse 領先的數據處理分析能力在物聯網數據採集中的應用。深刻學習掌握 Grafana 的其餘功能後,用戶能夠定製出更完善的數據可視化分析乃至監控告警系統。

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/e...

相關文章
相關標籤/搜索