乾貨丨時序數據庫DolphinDB異常檢測引擎教程

1. 概述

物聯網設備(如機牀、鍋爐、電梯、水錶、氣表等等)無時無刻不在產生海量的設備狀態數據和業務消息數據,這些數據的在採集、計算、分析過程當中又經常涉及異常數據的檢測。html

DolphinDB做爲一個高性能的分佈式時序數據庫 (time series database),內置了一個流數據框架,既能實時處理分析這些物聯網數據,也能對歷史數據進行計算分析,幫助用戶利用、發揮這些數據的價值。DolphinDB內置的流數據框架支持流數據的發佈、訂閱、預處理、實時內存計算、複雜指標的滾動窗口計算等,是一個運行高效,使用便捷的流數據處理框架。具體介紹詳見DolphinDB流數據教程git

針對異常數據檢測的需求,DolphinDB提供基於流數據框架的異常檢測引擎函數,用戶只需指定異常指標,異常檢測引擎就能夠實時地進行異常數據檢測。github

2. 異常檢測引擎框架

DolphinDB的異常檢測引擎創建在流數據的發佈-訂閱模型之上。下例中,經過createAnomalyDetectionEngine函數建立異常檢測引擎,並經過subscribeTable函數訂閱流數據,每次有新數據流入就會按指定規則觸發append!{engine},將流數據持續輸入異常檢測引擎中。異常檢測引擎實時檢測數據是否符合用戶自定義的警報指標temp>65,如發現異常數據,將它們輸出到表outputTable中。數據庫

share streamTable(1000:0, `time`device`temp, [TIMESTAMP, SYMBOL, DOUBLE]) as sensor
share streamTable(1000:0, `time`device`anomalyType`anomalyString, [TIMESTAMP, SYMBOL, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65]>, sensor, outputTable, `time, `device, 10, 1)
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)

這裏對異常處理引擎涉及到的一些概念作簡要介紹:編程

  • 流數據表:DolphinDB爲流式數據提供的一種特定的表對象,提供流式數據的發佈功能。經過subscribeTable函數,其餘的節點或應用能夠訂閱和消費流數據。
  • 異常處理引擎數據源:爲異常處理引擎提供"原料"的通道。createAnomalyDetectionEngine函數返回一個抽象表,向這個抽象表寫入數據,就意味着數據進入異常處理引擎進行計算。
  • 異常指標:以元代碼的格式提供一組處理流數據的布爾表達式。其中能夠包含聚合函數,以支持複雜的場景。
  • 數據窗口:每次計算時截取的流數據窗口長度。數據窗口僅在指標中包含聚合函數時有意義。
  • 輸出表:異常檢測引擎的輸出表第一列必須是時間類型,用於存放檢測到異常的時間戳,若是有指定分組列,那麼第二列爲分組列,以後的兩列分別爲int類型和string或symbol類型,用於記錄異常的類型(異常指標的表達式在metrics中的下標)和異常的內容。

3. 異常指標

異常檢測引擎中的指標均要求返回布爾值。通常是一個函數或一個表達式。當指標中包含聚合函數,必須指定窗口長度和計算的時間間隔,異常檢測引擎每隔一段時間,在固定長度的移動窗口中計算指標。異常指標通常有如下三種類型:app

  • 只包含列名或非聚合函數,例如qty > 10, lt(qty, prev(qty))。對於這類指標,異常檢測引擎會對每一條收到的數據進行計算,判斷是否符合指標並決定是否輸出。
  • 全部出現的列名都在聚合函數的參數中,例如avg(qty - price) > 10, percentile(qty, 90) < 100, sum(qty) > prev(sum(qty))。對於這類指標,異常檢測引擎只會在窗口發生移動時對數據進行聚合計算,和時間序列聚合引擎(Time Series Aggregator)相似。
  • 出現的列名中,既有做爲聚合函數的參數,也有不是聚合函數參數,例如avg(qty) > qty, le(med(qty), price)。對於這類指標,異常檢測引擎會在在窗口發生移動時對聚合列進行聚合計算,並在有數據到達時對每一條數據進行計算,其中聚合函數的返回值使用最近一個窗口的計算值。

4. 數據窗口

當異常指標中包含聚合函數時,用戶必須指定數據窗口。流數據聚合計算是每隔一段時間,在固定長度的移動窗口中進行。窗口長度由參數windowSize設定;計算的時間間隔由參數step設定。框架

在有多組數據的狀況下,若每組都根據各自第一條數據進入系統的時間來構造數據窗口的邊界,則通常沒法將各組的計算結果在相同數據窗口中進行對比。考慮到這一點,系統按照參數step值肯定一個整型的規整尺度alignmentSize,以對各組第一個數據窗口的邊界值進行規整處理。分佈式

(1)當數據時間類型爲MONTH時,會以第一條數據對應年份的1月做爲窗口的上邊界。ide

(2)當數據的時間類型爲DATE時,不對第一個數據窗口的邊界值進行規整。函數

(2)當數據時間精度爲秒或分鐘時,如MINUTE, DATETIME或SECOND類型,alignmentSize取值規則以下表:

step     alignmentSize
0~2      2
3~5      5
6~10     10
11~15    15
16~20    20
21~30    30
31~60    60

(2)當數據時間精度爲毫秒時,如TIMESTAMP或TIME類型,alignmentSize取值規則以下表:

step       alignmentSize
0~2        2
3~5        5
6~10       10
11~20      20
21~25      25
26~50      50
51~100     100
101~200    200
201~250    250
251~500    500
501~1000   1000

假設第一條數據時間的最小精度值爲x,那麼第一個數據窗口的左邊界最小精度通過規整後爲x/alignmentSize\*alignmentSize,其中/表明相除後取整。舉例來講,若第一條數據時間爲 2018.10.08T01:01:01.365,則x=365。若step=100,根據上表,alignmentSize=100,可得出規整後的第一個數據窗口左邊界最小精度爲365\100*100=300,所以規整後的第一個數據窗口範圍爲2018.10.08T01:01:01.300至 2018.10.08T01:01:01.400。

5. 應用示例

5.1 應用場景

現模擬傳感器設備採集溫度。假設窗口長度爲4ms,每隔2ms移動一次窗口,每隔1ms採集一次溫度,規定如下異常指標:

  • 單次採集的溫度超過65;
  • 單次採集的溫度超過上一個窗口中75%的值;
  • 窗口內平均溫度和上一個窗口的平均溫度相對偏差大於1%。

5.2 系統設計

採集的數據存放到流數據表中,異常檢測引擎經過訂閱流數據表來獲取實時數據,並進行異常檢測,符合異常指標的數據輸出到另一個表中。

5.3 實現步驟

(1) 定義流數據表sensor來存放採集的數據:

share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor

(2) 定義異常檢測引擎和輸出表outputTable,輸出表也是流數據表:

share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01]>, sensor, outputTable, `time, , 6, 3)

(3) 異常檢測引擎engine訂閱流數據表sensor:

subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)

(4) 向流數據表sensor中寫入10次數據模擬採集溫度:

timev = 2018.10.08T01:01:01.001 + 1..10
tempv = 59 66 57 60 63 51 53 52 56 55
insert into sensor values(timev, tempv)

查看流數據表sensor的內容:

time                       temp
2018.10.08T01:01:01.002    59
2018.10.08T01:01:01.003    66
2018.10.08T01:01:01.004    57
2018.10.08T01:01:01.005    60
2018.10.08T01:01:01.006    63
2018.10.08T01:01:01.007    51
2018.10.08T01:01:01.008    53
2018.10.08T01:01:01.009    52
2018.10.08T01:01:01.010    56
2018.10.08T01:01:01.011    55

再查看結果表outputTable:

time                      anomalyType    anomalyString
2018.10.08T01:01:01.003    0             temp > 65
2018.10.08T01:01:01.003    1             temp > percentile(temp, 75)
2018.10.08T01:01:01.005    1             temp > percentile(temp, 75)
2018.10.08T01:01:01.006    2             abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
2018.10.08T01:01:01.006    1             temp > percentile(temp, 75)
2018.10.08T01:01:01.009    2             abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01

下面詳細解釋異常檢測引擎的計算過程。爲方便閱讀,對時間的描述中省略相同的2018.10.08T01:01:01部分,只列出毫秒部分。

(1)指標temp > 65只包含不做爲函數參數的列temp,所以會在每條數據到達時計算。模擬數據中只有003時的溫度知足檢測異常的指標。

(2)指標temp > percentile(temp, 75)中,temp列既做爲聚合函數percentile的參數,又單獨出現,所以會在每條數據到達時,將其中的temp與上一個窗口計算獲得的percentile(temp, 75)比較。第一個窗口基於第一行數據的時間002進行對齊,對齊後窗口起始邊界爲000,第一個窗口是從000到002,只包含002一條記錄,計算percentile(temp, 75)的結果是59,數據003到005與這個值比較,知足條件的有003和005。第二個窗口是從002到005,計算percentile(temp, 75)的結果是60,數據006到008與這個值比較,知足條件的有006。第三個窗口是從003到008,計算percentile(temp, 75)的結果是63,數據009到011與這個值比較,其中沒有知足條件的行。最後一條數據011到達後,還沒有觸發新的窗口計算。

(3)指標abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01中,temp只做爲聚合函數avg的參數出現,所以只會在每次窗口計算時檢查。相似上一個指標的分析,前三個窗口計算獲得的avg(temp)分別爲59, 60.5, 58.33,知足abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01的時間爲第二個窗口和第三個窗口的計算時間006和009。

5.4 監控異常檢測引擎的狀態

getAggregatorStat().AnomalDetectionAggregator
name    user  status lastErrMsg numGroups numRows numMetrics metrics             
------- ----- ------ ---------- --------- ------- ---------- --------------------
engine1 guest OK                0         10      3          temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01

5.5 刪除異常檢測引擎

removeAggregator("engine1")

6. createAnomalyEngine函數介紹

語法

createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize])

返回對象

createAnomalyDetectionEngine函數的做用是返回一個表對象,向該表寫入數據意味着這些數據進入異常檢測引擎進行計算。

參數

  • name: 一個字符串,表示異常檢測引擎的名稱,是異常檢測引擎的惟一標識。它能夠包含字母,數字和下劃線,但必須以字母開頭。
  • metrics: 元代碼。它的返回值必須是bool類型。它能夠是函數或表達式,如<[qty > 5, eq(qty, price)]>。能夠在其中使用系統內置或用戶自定義的聚合函數(使用defg關鍵字定義),如<[sum(qty) > 5, lt(avg(price), price)]>。詳情可參考元編程
  • dummyTable: 表對象,它能夠不包含數據,但它的結構必須與訂閱的流數據表結構相同。
  • outputTable: 表對象,用於保存計算結果。它的第一列必須是時間類型,用於存放檢測到異常的時間戳,而且該列的數據類型要與dummyTable的時間列一致。若是keyColumn參數不爲空,那麼outputTable的第二列爲keyColumn。以後的兩列分別爲int類型和string/symbol類型,用於記錄異常的類型(在metrics中的下標)和異常的內容
  • timeColumn: 字符串標量,表示輸入流數據表的時間列名稱。
  • keyColumn: 字符串標量,表示分組列。異常檢測引擎會按照keyColumn對輸入數據分組,並在每組中進行聚合計算。它是可選參數。
  • windowSize: 正整數。當metrics中包含聚合函數時,windowSize必須指定,表示用於聚合計算的數據窗口的長度。若是metrics中沒有聚合函數,這個參數不起做用。
  • step: 正整數。當metrics中包含聚合函數時,step必須指定,表示計算的時間間隔。windowSize必須是step的整數倍,不然會拋出異常。若是metrics中沒有聚合函數,這個參數不起做用。
  • garbageSize: 正整數。它是可選參數,默認值是50,000。若是沒有指定keyColumn,當內存中歷史數據的數量超過garbageSize時,系統會清理本次計算不須要的歷史數據。若是指定了keyColumn,意味着須要分組計算時,內存清理是各分組獨立進行的。當一個組的歷史數據記錄數超出garbageSize時,會清理該組再也不須要的歷史數據。若一個組的歷史數據記錄數未超出garbageSize,則該組數據不會被清理。若是metrics中沒有聚合函數,這個參數不起做用。

7. 總結

DolphinDB提供的異常檢測引擎是一個輕量、使用方便的流數據引擎,它經過與流數據表合做來完成流數據的實時檢測任務,可以知足物聯網實時監控和預警的需求。

相關文章
相關標籤/搜索