物聯網設備(如機牀、鍋爐、電梯、水錶、氣表等等)無時無刻不在產生海量的設備狀態數據和業務消息數據,這些數據的在採集、計算、分析過程當中又經常涉及異常數據的檢測。html
DolphinDB做爲一個高性能的分佈式時序數據庫 (time series database),內置了一個流數據框架,既能實時處理分析這些物聯網數據,也能對歷史數據進行計算分析,幫助用戶利用、發揮這些數據的價值。DolphinDB內置的流數據框架支持流數據的發佈、訂閱、預處理、實時內存計算、複雜指標的滾動窗口計算等,是一個運行高效,使用便捷的流數據處理框架。具體介紹詳見DolphinDB流數據教程。git
針對異常數據檢測的需求,DolphinDB提供基於流數據框架的異常檢測引擎函數,用戶只需指定異常指標,異常檢測引擎就能夠實時地進行異常數據檢測。github
DolphinDB的異常檢測引擎創建在流數據的發佈-訂閱模型之上。下例中,經過createAnomalyDetectionEngine函數建立異常檢測引擎,並經過subscribeTable函數訂閱流數據,每次有新數據流入就會按指定規則觸發append!{engine},將流數據持續輸入異常檢測引擎中。異常檢測引擎實時檢測數據是否符合用戶自定義的警報指標temp>65,如發現異常數據,將它們輸出到表outputTable中。數據庫
share streamTable(1000:0, `time`device`temp, [TIMESTAMP, SYMBOL, DOUBLE]) as sensor share streamTable(1000:0, `time`device`anomalyType`anomalyString, [TIMESTAMP, SYMBOL, INT, SYMBOL]) as outputTable engine = createAnomalyDetectionEngine("engine1", <[temp > 65]>, sensor, outputTable, `time, `device, 10, 1) subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
這裏對異常處理引擎涉及到的一些概念作簡要介紹:編程
異常檢測引擎中的指標均要求返回布爾值。通常是一個函數或一個表達式。當指標中包含聚合函數,必須指定窗口長度和計算的時間間隔,異常檢測引擎每隔一段時間,在固定長度的移動窗口中計算指標。異常指標通常有如下三種類型:app
當異常指標中包含聚合函數時,用戶必須指定數據窗口。流數據聚合計算是每隔一段時間,在固定長度的移動窗口中進行。窗口長度由參數windowSize設定;計算的時間間隔由參數step設定。框架
在有多組數據的狀況下,若每組都根據各自第一條數據進入系統的時間來構造數據窗口的邊界,則通常沒法將各組的計算結果在相同數據窗口中進行對比。考慮到這一點,系統按照參數step值肯定一個整型的規整尺度alignmentSize,以對各組第一個數據窗口的邊界值進行規整處理。分佈式
(1)當數據時間類型爲MONTH時,會以第一條數據對應年份的1月做爲窗口的上邊界。ide
(2)當數據的時間類型爲DATE時,不對第一個數據窗口的邊界值進行規整。函數
(2)當數據時間精度爲秒或分鐘時,如MINUTE, DATETIME或SECOND類型,alignmentSize取值規則以下表:
step alignmentSize 0~2 2 3~5 5 6~10 10 11~15 15 16~20 20 21~30 30 31~60 60
(2)當數據時間精度爲毫秒時,如TIMESTAMP或TIME類型,alignmentSize取值規則以下表:
step alignmentSize 0~2 2 3~5 5 6~10 10 11~20 20 21~25 25 26~50 50 51~100 100 101~200 200 201~250 250 251~500 500 501~1000 1000
假設第一條數據時間的最小精度值爲x,那麼第一個數據窗口的左邊界最小精度通過規整後爲x/alignmentSize\*alignmentSize,其中/表明相除後取整。舉例來講,若第一條數據時間爲 2018.10.08T01:01:01.365,則x=365。若step=100,根據上表,alignmentSize=100,可得出規整後的第一個數據窗口左邊界最小精度爲365\100*100=300,所以規整後的第一個數據窗口範圍爲2018.10.08T01:01:01.300至 2018.10.08T01:01:01.400。
5.1 應用場景
現模擬傳感器設備採集溫度。假設窗口長度爲4ms,每隔2ms移動一次窗口,每隔1ms採集一次溫度,規定如下異常指標:
5.2 系統設計
採集的數據存放到流數據表中,異常檢測引擎經過訂閱流數據表來獲取實時數據,並進行異常檢測,符合異常指標的數據輸出到另一個表中。
5.3 實現步驟
(1) 定義流數據表sensor來存放採集的數據:
share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor
(2) 定義異常檢測引擎和輸出表outputTable,輸出表也是流數據表:
share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable engine = createAnomalyDetectionEngine("engine1", <[temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01]>, sensor, outputTable, `time, , 6, 3)
(3) 異常檢測引擎engine訂閱流數據表sensor:
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
(4) 向流數據表sensor中寫入10次數據模擬採集溫度:
timev = 2018.10.08T01:01:01.001 + 1..10 tempv = 59 66 57 60 63 51 53 52 56 55 insert into sensor values(timev, tempv)
查看流數據表sensor的內容:
time temp 2018.10.08T01:01:01.002 59 2018.10.08T01:01:01.003 66 2018.10.08T01:01:01.004 57 2018.10.08T01:01:01.005 60 2018.10.08T01:01:01.006 63 2018.10.08T01:01:01.007 51 2018.10.08T01:01:01.008 53 2018.10.08T01:01:01.009 52 2018.10.08T01:01:01.010 56 2018.10.08T01:01:01.011 55
再查看結果表outputTable:
time anomalyType anomalyString 2018.10.08T01:01:01.003 0 temp > 65 2018.10.08T01:01:01.003 1 temp > percentile(temp, 75) 2018.10.08T01:01:01.005 1 temp > percentile(temp, 75) 2018.10.08T01:01:01.006 2 abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01 2018.10.08T01:01:01.006 1 temp > percentile(temp, 75) 2018.10.08T01:01:01.009 2 abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
下面詳細解釋異常檢測引擎的計算過程。爲方便閱讀,對時間的描述中省略相同的2018.10.08T01:01:01部分,只列出毫秒部分。
(1)指標temp > 65只包含不做爲函數參數的列temp,所以會在每條數據到達時計算。模擬數據中只有003時的溫度知足檢測異常的指標。
(2)指標temp > percentile(temp, 75)中,temp列既做爲聚合函數percentile的參數,又單獨出現,所以會在每條數據到達時,將其中的temp與上一個窗口計算獲得的percentile(temp, 75)比較。第一個窗口基於第一行數據的時間002進行對齊,對齊後窗口起始邊界爲000,第一個窗口是從000到002,只包含002一條記錄,計算percentile(temp, 75)的結果是59,數據003到005與這個值比較,知足條件的有003和005。第二個窗口是從002到005,計算percentile(temp, 75)的結果是60,數據006到008與這個值比較,知足條件的有006。第三個窗口是從003到008,計算percentile(temp, 75)的結果是63,數據009到011與這個值比較,其中沒有知足條件的行。最後一條數據011到達後,還沒有觸發新的窗口計算。
(3)指標abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01中,temp只做爲聚合函數avg的參數出現,所以只會在每次窗口計算時檢查。相似上一個指標的分析,前三個窗口計算獲得的avg(temp)分別爲59, 60.5, 58.33,知足abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01的時間爲第二個窗口和第三個窗口的計算時間006和009。
5.4 監控異常檢測引擎的狀態
getAggregatorStat().AnomalDetectionAggregator name user status lastErrMsg numGroups numRows numMetrics metrics ------- ----- ------ ---------- --------- ------- ---------- -------------------- engine1 guest OK 0 10 3 temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
5.5 刪除異常檢測引擎
removeAggregator("engine1")
語法
createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize])
返回對象
createAnomalyDetectionEngine函數的做用是返回一個表對象,向該表寫入數據意味着這些數據進入異常檢測引擎進行計算。
參數
DolphinDB提供的異常檢測引擎是一個輕量、使用方便的流數據引擎,它經過與流數據表合做來完成流數據的實時檢測任務,可以知足物聯網實時監控和預警的需求。