1. 工業物聯網的數據特色和痛點前端
工業物聯網的數據採集有着頻率高、設備多、維度高的特色,數據量很是大,對系統的吞吐量有很高的要求。同時工業物聯網每每須要系統可以實時處理數據,對系統預警,監控,甚至反控。很多系統還須要提供圖形化終端供操做工人實時監控設備的運行,這給整個系統帶來了更大的壓力。對於採集到的海量歷史數據,一般還須要進行離線的建模和分析。所以,工業物聯網的數據平臺有着很是苛刻的要求,既要有很是高的吞吐量,又要有較低的延時;既要可以實時處理流數據,又要可以處理海量的歷史數據;既要知足簡單的點查詢的要求,又要知足批量數據複雜分析的要求。node
傳統的事務型數據庫,好比SQL Server、Oracle和MySQL,沒法知足高吞吐量的數據寫入和海量數據的分析。即便數據量較小,能知足數據寫入的要求,也不能同時響應實時計算的請求。git
Hadoop生態提供了消息引擎、實時數據寫入、流數據計算、離線數據倉庫、離線數據計算等多個部件。這些大數據系統組合起來,能夠解決工業物聯網的數據平臺問題。但這樣的方案過於龐大和臃腫,實施和運維的成本很高。github
2.時序數據庫的工業物聯網解決方案數據庫
以DolphinDB爲例,DolphinDB database 做爲一個高性能的分佈式時序數據庫,爲工業物聯網的數據存儲和計算提供了一個強大的基礎平臺。編程
3.案例綜述服務器
企業的生產車間內總共有1000個傳感設備,每一個設備每10ms採集一次數據,爲簡化demo腳本,假設採集的數據僅有三個維度,均爲溫度。須要完成的任務包括:app
4.案例實施運維
4.1 系統的功能模塊設計編程語言
針對上述的案例,咱們首先要啓用DolphinDB的分佈式數據庫,建立一個命名爲iotDemoDB的分佈式數據庫用於保存採集的實時數據。數據庫按日期和設備兩個維度進行數據分區。日期採用值分區,設備採用範圍分區。往後清理過時數據,只要簡單的刪除舊的日期分區就可完成。
啓用流數據發佈和訂閱功能。訂閱高頻數據流作實時計算。createStreamingAggregator函數能建立一個指標聚合引擎,用於實時計算。咱們在案例裏指定計算窗口大小是1分鐘,每2秒鐘運算一次過往1分鐘的溫度均值,而後將運算結果保存到低頻數據表中,供前端輪詢。
部署前端Grafana平臺展現運算結果的趨勢圖,設置每1秒鐘輪詢一次DolphinDB Server,並刷新展現界面。
4.2 服務器部署
在本次demo裏,爲了使用分佈式數據庫,咱們須要使用一個單機多節點集羣,能夠參考單機多節點集羣部署指南。這裏咱們配置了1個controller+1個agent+4個datanode的集羣,下面列出主要的配置文件內容供參考:
cluster.nodes:
localSite,mode localhost:8701:agent1,agent localhost:8081:node1,datanode localhost:8083:node2,datanode localhost:8082:node3,datanode localhost:8084:node4,datanode
因爲DolphinDB系統默認是不啓用Streaming模塊功能的,因此咱們須要經過在cluster.cfg裏作顯式配置來啓用它,由於本次demo裏使用的數據量不大,爲了不demo複雜化,因此這裏只啓用了node1來作數據訂閱。
cluster.cfg:
maxMemSize=2 workerNum=4 persistenceDir=dbcache maxSubConnections=4 node1.subPort=8085 maxPubConnections=4
實際生產環境下,建議使用多物理機集羣,能夠參考多物理機集羣部署指南。
4.3 實現步驟
首先咱們定義一個sensorTemp流數據表用於接收實時採集的溫度數據,咱們使用enableTablePersistence函數對sensorTemp表作持久化,內存中保留的最大數據量是100萬行。
share streamTable(1000000:0,`hardwareId`ts`temp1`temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) as sensorTemp enableTablePersistence(sensorTemp, true, false, 1000000)
經過訂閱流數據表sensorTmp,把採集的數據準實時的批量保存到分佈式數據庫中。分佈式表使用日期和設備編號兩個分區維度。在物聯網大數據場景下,常常要清除過期的數據,這樣分區的模式能夠簡單的經過刪除指定日期分區就能夠快速的清理過時數據。subscribeTable函數最後兩個參數控制數據保存的頻率,只有訂閱數據達到100萬或時間間隔達到10秒才批量將數據寫入分佈式數據庫。
db1 = database("",VALUE,2018.08.14..2018.12.20) db2 = database("",RANGE,0..10*100) db = database("dfs://iotDemoDB",COMPO,[db1,db2]) dfsTable = db.createPartitionedTable(sensorTemp,"sensorTemp",`ts`hardwareId) subscribeTable(, "sensorTemp", "save_to_db", -1, append!{dfsTable}, true, 1000000, 10)
在對流數據作分佈式保存數據庫的同時,系統使用createStreamAggregator函數建立一個指標聚合引擎, 用於實時計算。函數第一個參數指定了窗口大小爲60秒,第二個參數指定每2秒鐘作一次求均值運算,第三個參數是運算的元代碼,能夠由用戶本身指定計算函數,任何系統支持的或用戶自定義的聚合函數這裏都能支持,經過指定分組字段hardwareId,函數會將流數據按設備分紅1000個隊列進行均值運算,每一個設備都會按各自的窗口計算獲得對應的平均溫度。最後經過subscribeTable訂閱流數據,在有新數據進來時觸發實時計算,並將運算結果保存到一個新的數據流表sensorTempAvg中。
createStreamAggregator 參數說明:窗口時間,運算間隔時間,聚合運算元代碼,原始數據輸入表,運算結果輸出表,時序字段,分組字段,觸發GC記錄數閾值。
share streamTable(1000000:0, `time`hardwareId`tempavg1`tempavg2`tempavg3, [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE]) as sensorTempAvg metrics = createStreamAggregator(60000,2000,<[avg(temp1),avg(temp2),avg(temp3)]>,sensorTemp,sensorTempAvg,`ts,`hardwareId,2000) subscribeTable(, "sensorTemp", "metric_engine", -1, append!{metrics},true)
在DolphinDB Server端在對高頻數據流作保存、分析的時候,Grafana前端程序每秒鐘會輪詢實時運算的結果,並刷新平均溫度的趨勢圖。DolphinDB提供了Grafana_DolphinDB的datasource插件,關於Grafana的安裝以及DolphinDB的插件配置請參考Grafana配置教程。
在完成grafana的基本配置以後,新增一個Graph Panel, 在Metrics tab裏輸入:
select gmtime(time) as time, tempavg1, tempavg2, tempavg3 from sensorTempAvg where hardwareId = 1
這段腳本是選出1號設備實時運算獲得的平均溫度表。
最後,啓動數據模擬生成程序,生成模擬溫度數據並寫入流數據表。
數據規模: 1000 個設備,以每一個點3個維度、10ms的頻率生成數據,以每一個維度8個Byte ( Double類型 ) 計算,數據流速是 24Mbps,持續100秒。
def writeData(){ hardwareNumber = 1000 for (i in 0:10000) { data = table(take(1..hardwareNumber,hardwareNumber) as hardwareId ,take(now(),hardwareNumber) as ts,rand(20..41,hardwareNumber) as temp1,rand(30..71,hardwareNumber) as temp2,rand(70..151,hardwareNumber) as temp3) sensorTemp.append!(data) sleep(10) } } submitJob("simulateData", "simulate sensor data", writeData)
點擊這裏下載完整的demo腳本。