時序數據庫在工業物聯網中的應用

DolphinDB在工業物聯網的應用1. 工業物聯網的數據特色和痛點前端

工業物聯網的數據採集有着頻率高、設備多、維度高的特色,數據量很是大,對系統的吞吐量有很高的要求。同時工業物聯網每每須要系統可以實時處理數據,對系統預警,監控,甚至反控。很多系統還須要提供圖形化終端供操做工人實時監控設備的運行,這給整個系統帶來了更大的壓力。對於採集到的海量歷史數據,一般還須要進行離線的建模和分析。所以,工業物聯網的數據平臺有着很是苛刻的要求,既要有很是高的吞吐量,又要有較低的延時;既要可以實時處理流數據,又要可以處理海量的歷史數據;既要知足簡單的點查詢的要求,又要知足批量數據複雜分析的要求。node

傳統的事務型數據庫,好比SQL Server、Oracle和MySQL,沒法知足高吞吐量的數據寫入和海量數據的分析。即便數據量較小,能知足數據寫入的要求,也不能同時響應實時計算的請求。git

Hadoop生態提供了消息引擎、實時數據寫入、流數據計算、離線數據倉庫、離線數據計算等多個部件。這些大數據系統組合起來,能夠解決工業物聯網的數據平臺問題。但這樣的方案過於龐大和臃腫,實施和運維的成本很高。github

2.時序數據庫的工業物聯網解決方案數據庫

以DolphinDB爲例,DolphinDB database 做爲一個高性能的分佈式時序數據庫,爲工業物聯網的數據存儲和計算提供了一個強大的基礎平臺。編程

  • DolphinDB的分佈式數據庫能夠方便的支持水平擴展和垂直擴展,系統的吞吐量和支持的數據量能夠近乎無限的擴展。
  • DolphinDB的流計算引擎支持實時流計算處理。內置的聚合引擎能夠按指定的時間窗口大小和頻率來計算各類聚合指標。聚合既能夠是時間軸上(從高頻到低頻)的縱向聚合,也能夠是多個維度的橫向聚合。
  • DolphinDB的內存數據庫能夠支持數據的快速寫入,查詢和計算。例如聚合引擎的結果能夠輸出到一個內存表,接受前端BI(如Grafana)的的秒級輪詢指令。
  • DolphinDB集數據庫、分佈式計算和編程語言於一體,能夠在庫內快速的完成複雜的分佈式計算,例如迴歸和分類。這大大加快了海量歷史數據的離線分析和建模。
  • DolphinDB也實現了與部分開源或商業化的BI工具的接口。方便用戶可視化或監控設備數據。

3.案例綜述服務器

企業的生產車間內總共有1000個傳感設備,每一個設備每10ms採集一次數據,爲簡化demo腳本,假設採集的數據僅有三個維度,均爲溫度。須要完成的任務包括:app

  • 將採集到的原始數據存入數據庫。離線的數據建模須要用到這些歷史數據。
  • 實時計算每一個設備過去一分鐘的平均溫度指標,計算的頻率爲每兩秒鐘要進行一次。
  • 由於設備的操做工須要在最快的時間內掌握溫度變化,因此前端展現界面每秒查詢一次實時運算的結果並刷新溫度變化趨勢圖。

4.案例實施運維

4.1 系統的功能模塊設計編程語言

針對上述的案例,咱們首先要啓用DolphinDB的分佈式數據庫,建立一個命名爲iotDemoDB的分佈式數據庫用於保存採集的實時數據。數據庫按日期和設備兩個維度進行數據分區。日期採用值分區,設備採用範圍分區。往後清理過時數據,只要簡單的刪除舊的日期分區就可完成。

啓用流數據發佈和訂閱功能。訂閱高頻數據流作實時計算。createStreamingAggregator函數能建立一個指標聚合引擎,用於實時計算。咱們在案例裏指定計算窗口大小是1分鐘,每2秒鐘運算一次過往1分鐘的溫度均值,而後將運算結果保存到低頻數據表中,供前端輪詢。

部署前端Grafana平臺展現運算結果的趨勢圖,設置每1秒鐘輪詢一次DolphinDB Server,並刷新展現界面。

4.2 服務器部署

在本次demo裏,爲了使用分佈式數據庫,咱們須要使用一個單機多節點集羣,能夠參考單機多節點集羣部署指南。這裏咱們配置了1個controller+1個agent+4個datanode的集羣,下面列出主要的配置文件內容供參考:

cluster.nodes:

localSite,mode
localhost:8701:agent1,agent
localhost:8081:node1,datanode
localhost:8083:node2,datanode
localhost:8082:node3,datanode
localhost:8084:node4,datanode

因爲DolphinDB系統默認是不啓用Streaming模塊功能的,因此咱們須要經過在cluster.cfg裏作顯式配置來啓用它,由於本次demo裏使用的數據量不大,爲了不demo複雜化,因此這裏只啓用了node1來作數據訂閱。

cluster.cfg:

maxMemSize=2
workerNum=4
persistenceDir=dbcache
maxSubConnections=4
node1.subPort=8085
maxPubConnections=4

實際生產環境下,建議使用多物理機集羣,能夠參考多物理機集羣部署指南

4.3 實現步驟

首先咱們定義一個sensorTemp流數據表用於接收實時採集的溫度數據,咱們使用enableTablePersistence函數對sensorTemp表作持久化,內存中保留的最大數據量是100萬行。

share streamTable(1000000:0,`hardwareId`ts`temp1`temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) as sensorTemp
enableTablePersistence(sensorTemp, true, false, 1000000)

經過訂閱流數據表sensorTmp,把採集的數據準實時的批量保存到分佈式數據庫中。分佈式表使用日期和設備編號兩個分區維度。在物聯網大數據場景下,常常要清除過期的數據,這樣分區的模式能夠簡單的經過刪除指定日期分區就能夠快速的清理過時數據。subscribeTable函數最後兩個參數控制數據保存的頻率,只有訂閱數據達到100萬或時間間隔達到10秒才批量將數據寫入分佈式數據庫。

db1 = database("",VALUE,2018.08.14..2018.12.20)
db2 = database("",RANGE,0..10*100)
db = database("dfs://iotDemoDB",COMPO,[db1,db2])
dfsTable = db.createPartitionedTable(sensorTemp,"sensorTemp",`ts`hardwareId)
subscribeTable(, "sensorTemp", "save_to_db", -1, append!{dfsTable}, true, 1000000, 10)

在對流數據作分佈式保存數據庫的同時,系統使用createStreamAggregator函數建立一個指標聚合引擎, 用於實時計算。函數第一個參數指定了窗口大小爲60秒,第二個參數指定每2秒鐘作一次求均值運算,第三個參數是運算的元代碼,能夠由用戶本身指定計算函數,任何系統支持的或用戶自定義的聚合函數這裏都能支持,經過指定分組字段hardwareId,函數會將流數據按設備分紅1000個隊列進行均值運算,每一個設備都會按各自的窗口計算獲得對應的平均溫度。最後經過subscribeTable訂閱流數據,在有新數據進來時觸發實時計算,並將運算結果保存到一個新的數據流表sensorTempAvg中。

createStreamAggregator 參數說明:窗口時間,運算間隔時間,聚合運算元代碼,原始數據輸入表,運算結果輸出表,時序字段,分組字段,觸發GC記錄數閾值。

share streamTable(1000000:0, `time`hardwareId`tempavg1`tempavg2`tempavg3, [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE]) as sensorTempAvg
metrics = createStreamAggregator(60000,2000,<[avg(temp1),avg(temp2),avg(temp3)]>,sensorTemp,sensorTempAvg,`ts,`hardwareId,2000)
subscribeTable(, "sensorTemp", "metric_engine", -1, append!{metrics},true)

在DolphinDB Server端在對高頻數據流作保存、分析的時候,Grafana前端程序每秒鐘會輪詢實時運算的結果,並刷新平均溫度的趨勢圖。DolphinDB提供了Grafana_DolphinDB的datasource插件,關於Grafana的安裝以及DolphinDB的插件配置請參考Grafana配置教程

在完成grafana的基本配置以後,新增一個Graph Panel, 在Metrics tab裏輸入:

select gmtime(time) as time, tempavg1, tempavg2, tempavg3 from sensorTempAvg where hardwareId = 1

這段腳本是選出1號設備實時運算獲得的平均溫度表。

8cb3647ad8787ac3bb57c127166fda36.png

最後,啓動數據模擬生成程序,生成模擬溫度數據並寫入流數據表。

數據規模: 1000 個設備,以每一個點3個維度、10ms的頻率生成數據,以每一個維度8個Byte ( Double類型 ) 計算,數據流速是 24Mbps,持續100秒。

def writeData(){
	hardwareNumber = 1000
	for (i in 0:10000) {
		data = table(take(1..hardwareNumber,hardwareNumber) as hardwareId ,take(now(),hardwareNumber) as ts,rand(20..41,hardwareNumber) as temp1,rand(30..71,hardwareNumber) as temp2,rand(70..151,hardwareNumber) as temp3)
		sensorTemp.append!(data)
		sleep(10)
	}
}
submitJob("simulateData", "simulate sensor data", writeData)

點擊這裏下載完整的demo腳本。

相關文章
相關標籤/搜索