TDengine 是濤思數據(北京濤思數據科技有限公司)推出的一款開源的專爲物聯網、車聯網、工業互聯網、IT 運維等設計和優化的大數據平臺。除核心的快 10 倍以上的時序數據庫功能外,還提供緩存、數據訂閱、流式計算等功能,最大程度減小研發和運維的複雜度。ios
TDengine 做爲時序處理引擎,能夠徹底不用 Kafka、HDFS/HBase/Spark、Redis 等軟件,大幅簡化大數據平臺的設計,下降研發成本和運營成本。由於須要集成的開源組件少,於是系統能夠更加健壯,也更容易保證數據的一致性。git
TDEngine 提供社區版、企業版和雲服務版,安裝/使用教程詳見 TDEngine 使用文檔 https://www.taosdata.com/cn/products/github
本文以經過 MQTT 協議接入 EMQ X 的智能門鎖爲例進行說明。sql
智能門鎖已經成爲了智能家居的重點關注產品,爲了保證用戶更安全的開鎖體驗,智能門鎖一般能夠實現指紋開鎖、密碼開鎖、IC卡開鎖、鑰匙開鎖、遠程開鎖等功能。智能門鎖每一個業務環節都涉及到操做敏感指令和狀態數據的發送、傳輸,這些數據在應當存儲起來以備後續審計使用。數據庫
智能門鎖下發指令與上報數據經過 MQTT 協議經 EMQ X 傳輸,可選在 EMQ X 上使用規則引擎篩選或設置消費客戶端處理,將知足條件的數據寫入 TDEngine 數據平臺,整個數據流轉流程以下:npm
該場景中擬設智能門鎖經過 lock/:id/control_receipt
主題( id 爲門鎖鏈接客戶端的 clientid,同門鎖 id) 上報操做回執與狀態信息,數據格式爲以下 JSON 消息:json
{ "id": "51dc0c50f55d11e9a4fec59e26b058d5", // 門鎖 id "longitude": 102.8622543, // 當前位置經度 "latitude": 24.8614503, // 當前位置緯度 "command": "unlock", // 指令 "LockState": 0, // 門鎖狀態 "LockType": 0, // 開鎖方式 "KeyNickName": "", // 鑰匙暱稱 "KeyID": "c944c8d0f55e11e9a4fec59e26b058d5", // 鑰匙 ID "ErrorCode": 0, // 執行故障代碼 "pid": "84a2e10f55d11e9a4fec59e26b058d5", // 下發的指令 ID "alarm": "", // 當前告警信息 "ts": 1570838400000 // 執行時間 }
儘管 TDEngine 是關係型數據庫模型,但要求每一個採集設備單獨建表,所以咱們按照門鎖 id 每一個門鎖建表一張,同時浮點數據壓縮比相對整型數據壓縮比不好,經度緯度一般精確到小數點後 7 位,所以將經度緯度增大 1E7 倍轉爲長整型存儲:axios
建立數據庫的語句爲:緩存
create database db cache 8192 ablocks 2 tblocks 1000 tables 10000; use db;
建立超級表的SQL語句爲:安全
create table lock( ts timestamp, id nchar(50), pid nchar(50), longitude bigint, latitude bigint, command nchar(50), LockState smallint, LockType smallint, KeyNickName nchar(255), KeyID nchar(255), ErrorCode smallint, alarm nchar(255) ) tags(card int, model binary(10));
TDEngine 是關係型數據庫模型,但要求每一個採集設備單獨建表,以門鎖 id 做爲採集表表名,例如 id 爲 51dc0c50f55d11e9a4fec59e26b058d5,那麼建立數據表的語句爲:
-- 使用 using 指定其所屬 超級表 create table "v_51dc0c50f55d11e9a4fec59e26b058d5" using lock tags('51dc0c50f55d11e9a4fec59e26b058d5', 0);
在該數據模型下,以門鎖 id 51dc0c50f55d11e9a4fec59e26b058d5 爲例,寫入一條記錄到表 v_51dc0c50f55d11e9a4fec59e26b058d5 的 SQL 語句爲:
insert into v_51dc0c50f55d11e9a4fec59e26b058d5 values( 1570838400000, '51dc0c50f55d11e9a4fec59e26b058d5', 'e84a2e10f55d11e9a4fec59e26b058d5', 1028622543, 248614503, 'unlock', 0, 0, '', 'c944c8d0f55e11e9a4fec59e26b058d5', 0, '[]', );
實際使用中請先依次給每一個智能門鎖建表
目前 EMQ X 消息數據直接寫入 TDEngine 的功能還在規劃中,但得益於 TDEngine 提供了諸多鏈接器,咱們選用如下兩種方式完成數據寫入:
EMQ X Dashboard 中點擊 規則 主菜單,在 資源 頁面新建一個 WebHook 資源,用於向 TDEngine RESTful Connector 發送數據,新增請求頭:
{username}:{password}
通過 Base64 編碼以後的字符串。有關 RESTful Connector 使用教程詳見:TDEngine RESTful Connector
點擊 測試鏈接,測試經過後點擊 肯定 按鈕完成建立。
資源建立完畢後咱們能夠進行規則建立,規則引擎 --> 規則 頁面中點擊 新建 按鈕進入規則建立頁面。
選擇 消息發佈 事件,處理傳感器消息上報(發佈)時的數據。根據 可用字段 提示,傳感器等信息能夠從 payload
中選取。
因爲須要將浮點值處理爲整型,咱們使用簡單計算功能,請留意 SQL 中的註釋項,最終整個 SQL 語句以下:
SELECT -- JSON 數據解碼 json_decode(payload) as p, -- 經緯度放大 10E7 倍存儲 p.longitude * 10000000 as p.longitude, p.latitude * 10000000 as p.latitude FROM "message.publish" WHERE -- 經過 topic 篩選數據源 topic =~ 'lock/+/control_receipt'
使用 SQL 測試功能,輸入原始上報數據與相關變量,獲得以下輸出結果:
{ "p": { "ErrorCode": 0, "KeyID": "c944c8d0f55e11e9a4fec59e26b058d5", "KeyNickName": "", "LockState": 0, "LockType": 0, "alarm": "", "command": "unlock", "id": "51dc0c50f55d11e9a4fec59e26b058d5", "latitude": 248614503, "longitude": 1028622543, "pid": "84a2e10f55d11e9a4fec59e26b058d5", "ts": 1570838400000 } }
從輸出結果看,經緯度浮點值已經轉爲整型,說明該步操做正確,能夠進行後續操做。
點擊建立頁面下方 添加動做 按鈕,在彈出的 新增動做 彈框裏動做類型選擇 發送數據到 Web 服務,使用資源 選擇上一步中建立的資源,消息內容模板 內容模板裏面,使用 ${}
語法提取 條件 SQL 篩選出來的數據,拼接寫入 SQL 語句以下:
insert into db.v_${p.id} values( ${p.ts}, '${p.id}', '${p.pid}', ${p.longitude}, ${p.latitude}, '${p.command}', ${p.LockState}, ${p.LockType}, '${p.KeyNickName}', '${p.KeyID}', ${p.ErrorCode}, '${p.alarm}', );
點擊 建立 完成規則的建立,智能門鎖上報數據時數據將寫入到 DBEngine,整個工做和業務流程以下:
message.publish
事件觸發規則引擎 ,開始按照條件 SQL 中的 where
條件匹配 topic
和 payload
數據字段TDEngine 提供多種語言平臺適用的 SDK,程序能夠經過訂閱 MQTT 主題或消費消息中間件數據獲取智能門鎖上報到 EMQ X 的數據,隨後將數據拼接成寫入 SQL 最終寫入到 TDEngine 中。
本文使用訂閱 MQTT 主題的方式獲取智能門鎖上報數據。考慮到消息量可能增加到單個訂閱客戶端沒法承受的數據量,咱們使用 共享訂閱 的方式來消費數據。
在共享訂閱中,訂閱同一個主題的客戶端會輪流的收到這個主題下的消息,也就是說同一個消息不會發送到多個訂閱者,從而實現訂閱端的多個節點之間的負載均衡。
該示例使用 Node.js 平臺,藉助 TDEngine 的 RESTful Connector 實現數據寫入操做。
使用方式:安裝 Node.js、安裝 npm、安裝依賴、修改相應參數並運行執行
// index.js const mqtt = require("mqtt"); const axios = require("axios"); /** * 經過 RESTful Connector 執行 TDEngine 操做 * @param {string} 須要執行的 sql */ function exec(sql = "") { return axios({ method: "post", url: "http://127.0.0.1:6020/rest/sql", auth: { username: "root", password: "taosdata" }, data: sql }); } // MQTT 處理訂閱消息回調 async function handleMessage(topic, message) { try { // JSON 轉對象 const p = JSON.parse(message.toString()); // 處理浮點數據 p.longitude = p.longitude * 10e7; p.latitude = p.latitude * 10e7; const resp = await exec(` INSERT INTO db.v_${p.id} values( ${p.ts}, '${p.id}', '${p.pid}', ${p.longitude}, ${p.latitude}, '${p.command}', ${p.LockState}, ${p.LockType}, '${p.KeyNickName}', '${p.KeyID}', ${p.ErrorCode}, '${p.alarm}', );`); console.log(`Exec success:`, resp.data); } catch (e) { console.log( "exec insert error:", e.message, e.response ? e.response.data : "" ); } } function createConsumer(config = {}) { const client = mqtt.connect("mqtt://127.0.0.1:1883", config); client.on("connect", () => { // 使用共享訂閱 $share/ 前綴 client.subscribe("$share//lock/+/control_receipt", (err, granded = []) => { if (!err && granded[0].qos <= 2) { console.log("Consumer client ready"); } }); }); client.on("message", handleMessage); } // 建立 10 個共享訂閱消費者 for (let i = 0; i < 10; i++) { createConsumer(); }
經過 EMQ X Dashboard 內置的 MQTT 客戶端(WebSocket)能夠快速模擬測試規則可用性。打開 工具 -> WebSocket 頁面,輸入按照智能門鎖鏈接信息創建鏈接,在 發佈 功能裏面輸入上報主題、上報數據點擊發布進行模擬測試:
發佈主題:lock/${id}/control_receipt
Payload:
{ "id": "51dc0c50f55d11e9a4fec59e26b058d5", "longitude": 102.8622543, "latitude": 24.8614503, "command": "unlock", "LockState": 0, "LockType": 0, "KeyNickName": "", "KeyID": "c944c8d0f55e11e9a4fec59e26b058d5", "ErrorCode": 0, "pid": "84a2e10f55d11e9a4fec59e26b058d5", "alarm": "", "ts": 1570838400000 }
發佈屢次,在 規則引擎 列表裏,點擊 監控 圖標能夠快速查看當前規則執行數據,由下圖可見 4 條消息命中 3 次,成功 3 次:
在 TDEngine 控制檯查看 db.v_51dc0c50f55d11e9a4fec59e26b058d5
中的數據,此時有 3 條數據:
use db; select count(*) from v_51dc0c50f55d11e9a4fec59e26b058d5; taos> select count(*) from v_51dc0c50f55d11e9a4fec59e26b058d5; count(*) | ====================== 3| Query OK, 1 row(s) in set (0.000612s)
刪除該條規則,啓動 TDEngine SDK 寫入代碼,重複該上述測試操做,能夠看到程序打印日誌以下:
{ status: 'succ', head: [ 'affected_rows' ], data: [ [ 1 ] ], rows: 1 } { status: 'succ', head: [ 'affected_rows' ], data: [ [ 1 ] ], rows: 1 } { status: 'succ', head: [ 'affected_rows' ], data: [ [ 1 ] ], rows: 1 }
至此,寫入 EMQ X 數據到 TDEngine 的整個功能已開發/配置完成。
更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔。