Skyline內部提供了9個預約義的算法,這些算法要解決這樣一個問題:python
input:一個timeseries output:是否異常
3-sigma
一個很直接的異常斷定思路是,拿最新3個datapoint的平均值(tail_avg方法)和整個序列比較,看是否偏離整體平均水平太多。怎樣算「太多」呢,由於standard deviation表示集合中元素到mean的平均偏移距離,所以最簡單就是和它進行比較。這裏涉及到3-sigma理論:linux
In statistics, the 68–95–99.7 rule, also known as the three-sigma rule or empirical rule, states that nearly all values lie within 3 standard deviations of the mean in a normal distribution.算法
About 68.27% of the values lie within 1 standard deviation of the mean. Similarly, about 95.45% of the values lie within 2 standard deviations of the mean. Nearly all (99.73%) of the values lie within 3 standard deviations of the mean.app
簡單來講就是:在normal distribution(正態分佈)中,99.73%的數據都在偏離mean 3個σ (standard deviation 標準差) 的範圍內。若是某些datapoint到mean的距離超過這個範圍,則認爲是異常的。Skyline初始內置的7個算法幾乎都是基於該理論的:less
stddev_from_average
def stddev_from_average(timeseries): """ A timeseries is anomalous if the absolute value of the average of the latest three datapoint minus the moving average is greater than one standard deviation of the average. This does not exponentially weight the MA and so is better for detecting anomalies with respect to the entire series. """ series = pandas.Series([x[1] for x in timeseries]) mean = series.mean() stdDev = series.std() t = tail_avg(timeseries) return abs(t - mean) > 3 * stdDev
該算法以下:ide
- 求timeseries的mean
- 求timeseries的standard deviation
- 求tail_avg到mean的距離,大於3倍的標準差則異常。
註釋中寫着會計算moving average,可是代碼直接計算的mean,跟moving average沒有關係,這比較不和諧。wordpress
該算法的特色是能夠有效屏蔽 「在一個點上突變到很大的異常值但在下一個點回落到正常水平」 的狀況,即屏蔽單點毛刺:由於它使用的是末3個點的均值(有效緩和突變),和整個序列比較(均值可能被異常值拉大),致使判斷正常。對於須要忽略 「毛刺」 數據的場景而言,該算法比後續的EWMA/mean_subtraction_cumulation等算法更適用(固然也能夠改造這些算法,用tail_avg代替last datapoint)。post
first_hour_average
def first_hour_average(timeseries): """ Calcuate the simple average over one hour, FULL_DURATION seconds ago. A timeseries is anomalous if the average of the last three datapoints are outside of three standard deviations of this value. """ last_hour_threshold = time() - (FULL_DURATION - 3600) series = pandas.Series([x[1] for x in timeseries if x[0] < last_hour_threshold]) mean = (series).mean() stdDev = (series).std() t = tail_avg(timeseries) return abs(t - mean) > 3 * stdDev
和上述算法幾乎一致,可是不一樣的是,比對的對象是 最近FULL_DURATION時間段內開始的1小時內 的數據,求出這段datapoint的mean和standard deviation後再用tail_avg進行比較。當FULL_DURATION小於1小時(86400)時,該算法和上一個算法一致。對於那些在一段較長時間內勻速遞增/減的metrics,該算法可能會誤報。測試
stddev_from_moving_average
def stddev_from_moving_average(timeseries): """ A timeseries is anomalous if the absolute value of the average of the latest three datapoint minus the moving average is greater than one standard deviation of the moving average. This is better for finding anomalies with respect to the short term trends. """ series = pandas.Series([x[1] for x in timeseries]) expAverage = pandas.stats.moments.ewma(series, com=50) stdDev = pandas.stats.moments.ewmstd(series, com=50) return abs(series.iget(-1) - expAverage.iget(-1)) > 3 * stdDev.iget(-1)
該算法先求出最後一個datapoint處的EWMA(Exponentially-weighted moving average)mean/std deviation,而後用最近的datapoint和3-sigma理論與之進行比對。
Moving Average
給定一個timeseries和subset長度(好比N天),則datapoint i 的N天 moving average = i以前N天(包括i)的平均值。不停地移動這個長度爲N的「窗口」並計算平均值,就獲得了一條moving average曲線。
Moving average經常使用來消除數據短時間內的噪音,顯示長期趨勢;或者根據已有數據預測將來數據。
Simple Moving Average
這是最簡單的moving average,爲「窗口」內datapoints的算數平均值(每一個datapoint的weight同樣):
SMA(i) = [p(i) + p(i-1) + … + p(i-n+1) ]/ n
當計算i+1處的SMA時,一個新的值加入,「窗口」左端的值丟棄,所以可獲得遞推式:
SMA(i) = SMA(i-1) + p(i)/n – p(i-n+1)/n
實現起來也很容易,只要記錄上次SMA和將要丟棄的datapoint便可(最開始的幾個是沒有SMA的)。Pandas中可用pandas.stats.moments.rolling_mean計算SMA。
SMA因爲過去的數據和如今的數據權重是同樣的,所以它相對真實數據的走向存在延遲,不太適合預測,更適合觀察長期趨勢。
Exponential moving average
有時也稱Exponential-weighted moving average,它和SMA主要有兩處不一樣:
- 計算SMA僅「窗口」內的n個datapoint參與計算,而EWMA則是以前全部point;
- EWMA計算average時每一個datapoint的權重是不同的,最近的datapoint擁有越高的權重,隨時間呈指數遞減。
EWMA的遞推公式是:
EWMA(1) = p(1) // 有時也會取前若干值的平均值。α越小時EWMA(1)的取值越重要。
EWMA(i) = α * p(i) + (1-α) * EWMA(i – 1) // α是一個0-1間的小數,稱爲smoothing factor.
能夠看到比SMA更容易實現,只要維護上次EWMA便可。
擴展開來則會發現,從i往前的datapoint,權重依次爲α, α(1-α), α(1-α)^2….., α(1-α)^n,依次指數遞減,權重的和的極限等於1。
smoothing factor決定了EWMA的 時效性 和 穩定性。α越大時效性越好,越能反映出最近數據狀態;α越小越平滑,越能吸取瞬時波動,反映出長期趨勢。
EWMA因爲其時效性被普遍應用在「根據已有時間序列預測將來數據」的場景中,(在計算機領域)比較典型的應用是在TCP中估計RTT,即從已有的RTT數據計算將來RTT,以肯定超時時間。
雖然EWMA中參與計算的是所有datapoint,但它也有相似SMA 「N天EWMA」的概念,此時α由N決定:α = 2/(N+1),關於這個公式的由來參見這裏。
回到Skyline,這裏並非用EWMA來預測將來datapoint,而是相似以前的算法求出總體序列的mean和stdDev,只不過計算時加入了時間的權重(EWMA),越近的數據對結果影響越大,即判斷的參照物是最近某段時間而非全序列; 再用last datapoint與之比較。所以它的優點在於:
- 能夠檢測到一個異常較短期後發生的另外一個(不過高的突變型)異常,其餘算法頗有可能會忽略,由於第一個異常把總體的平均水平和標準差都拉高了
- 比其餘算法更快對異常做出反應,由於它更多的是參考突變以前的點(低水平),而非整體水平(有可能被某個異常或者出現較屢次的較高的統計數據拉高)
而劣勢則是
- 對漸進型而非突發型的異常檢測能力較弱
- 異常持續一段時間後可能被斷定爲正常
mean_subtraction_cumulation
def mean_subtraction_cumulation(timeseries): """ A timeseries is anomalous if the value of the next datapoint in the series is farther than a standard deviation out in cumulative terms after subtracting the mean from each data point. """ series = pandas.Series([x[1] if x[1] else 0 for x in timeseries]) series = series - series[0:len(series) - 1].mean() stdDev = series[0:len(series) - 1].std() expAverage = pandas.stats.moments.ewma(series, com=15) return abs(series.iget(-1)) > 3 * stdDev
算法以下:
- 排除全序列(暫稱爲all)最後一個值(last datapoint),求剩餘序列(暫稱爲rest,0..length-2)的mean;
- rest序列中每一個元素減去rest的mean,再求standard deviation;
- 求last datapoint到rest mean的距離,即 abs(last datapoint – rest mean);
- 判斷上述距離是否超過rest序列std. dev.的3倍。
簡單地說,就是用最後一個datapoint和剩餘序列比較,比較的過程依然遵循3-sigma。這個算法有2個地方很可疑:
- 求剩餘序列的std. dev.時先減去mean再求,這一步是多餘的,對結果沒影響;
- 雖然用tail_avg已經很不科學了,這個算法更進一步,只判斷最後一個datapoint是否異常,這要求在兩次analysis間隔中最多隻有一個datapoint被加入,不然就會丟失數據。關於這個問題的討論,見這篇文章最末。
和stddev_from_average相比,該算法對於 「毛刺」 判斷爲異常的機率遠大於後者。
least_squares
def least_squares(timeseries): """ A timeseries is anomalous if the average of the last three datapoints on a projected least squares model is greater than three sigma. """ x = np.array([t[0] for t in timeseries]) y = np.array([t[1] for t in timeseries]) A = np.vstack([x, np.ones(len(x))]).T results = np.linalg.lstsq(A, y) residual = results[1] m, c = np.linalg.lstsq(A, y)[0] errors = [] for i, value in enumerate(y): projected = m * x[i] + c error = value - projected errors.append(error) if len(errors) < 3: return False std_dev = scipy.std(errors) t = (errors[-1] + errors[-2] + errors[-3]) / 3 return abs(t) > std_dev * 3 and round(std_dev) != 0 and round(t) != 0
- 用最小二乘法獲得一條擬合現有datapoint value的直線;
- 用實際value和擬合value的差值組成一個新的序列error;
- 求該序列的stdDev,判斷序列error的tail_avg是否>3倍的stdDev
由於最小二乘法的關係,該算法對直線形的metrics比較適用。該算法也有一個問題,在最後斷定的時候,不是用tail_avg到error序列的mean的距離,而是直接使用tail_avg的值,這無形中縮小了異常斷定範圍,也不符合3-sigma。
最小二乘法
對於一個點對序列(xi,yi),用一條直線去擬合它,若是某直線 Y=aX+b,使得實際值yi和擬合值Yi的偏差的平方和最小,則該直線爲最優的。
小結
能夠看到上述算法都是相似的思路:用最近的若干datapoint作樣本,和一個整體序列進行比對,不一樣的只是比對的對象:
- stddev_from_average
tail_avg ——— 整個序列 - first_hour_average
tail_avg ———- FULL_DURATION開始的一個小時 - stddev_from_moving_average
last datapoint ———– 整個序列的EW mean和EW std dev - mean_subtraction_cumulation
last datapoint ——— 剩餘序列 - least_squares
last datapoint ——– 真實數據和擬合直線間的差值序列
grubbs
def grubbs(timeseries): """ A timeseries is anomalous if the Z score is greater than the Grubb's score. """ series = scipy.array([x[1] for x in timeseries]) stdDev = scipy.std(series) mean = np.mean(series) tail_average = tail_avg(timeseries) z_score = (tail_average - mean) / stdDev len_series = len(series) threshold = scipy.stats.t.isf(.05 / (2 * len_series) , len_series - 2) threshold_squared = threshold * threshold grubbs_score = ((len_series - 1) / np.sqrt(len_series)) * np.sqrt(threshold_squared / (len_series - 2 + threshold_squared)) return z_score > grubbs_score
Grubbs測試是一種從樣本中找出outlier的方法,所謂outlier,是指樣本中偏離平均值過遠的數據,他們有多是極端狀況下的正常數據,也有多是測量過程當中的錯誤數據。使用Grubbs測試須要整體是正態分佈的。
Grubbs測試步驟以下:
- 樣本從小到大排序;
- 求樣本的mean和std.dev.;
- 計算min/max與mean的差距,更大的那個爲可疑值;
- 求可疑值的z-score (standard score),若是大於Grubbs臨界值,那麼就是outlier;
Grubbs臨界值能夠查表獲得,它由兩個值決定:檢出水平α(越嚴格越小),樣本數量n - 排除outlier,對剩餘序列循環作 1-5 步驟。
因爲這裏須要的是異常斷定,只須要判斷tail_avg是否outlier便可。代碼中還有求Grubbs臨界值的過程,看不懂。
Z-score (standard score)
標準分,一個個體到集合mean的偏離,以標準差爲單位,表達個體距mean相對「平均偏離水平(std dev表達)」的偏離程度,經常使用來比對來自不一樣集合的數據。
histogram_bins
def histogram_bins(timeseries): """ A timeseries is anomalous if the average of the last three datapoints falls into a histogram bin with less than 20 other datapoints (you'll need to tweak that number depending on your data) Returns: the size of the bin which contains the tail_avg. Smaller bin size means more anomalous. """ series = scipy.array([x[1] for x in timeseries]) t = tail_avg(timeseries) h = np.histogram(series, bins=15) bins = h[1] for index, bin_size in enumerate(h[0]): if bin_size <= 20: # Is it in the first bin? if index == 0: if t <= bins[0]: return True # Is it in the current bin? elif t >= bins[index] and t < bins[index + 1]: return True return False
該算法和以上都不一樣,它首先將timeseries劃分紅15個寬度相等的直方,而後判斷tail_avg所在直方內的元素是否<=20,若是是,則異常。
直方的個數和元素個數斷定須要根據本身的metrics調整,否則在數據量小的時候很容易就異常了。
ks_test
def ks_test(timeseries): """ A timeseries is anomalous if 2 sample Kolmogorov-Smirnov test indicates that data distribution for last 10 minutes is different from last hour. It produces false positives on non-stationary series so Augmented Dickey-Fuller test applied to check for stationarity. """ hour_ago = time() - 3600 ten_minutes_ago = time() - 600 reference = scipy.array([x[1] for x in timeseries if x[0] >= hour_ago and x[0] < ten_minutes_ago]) probe = scipy.array([x[1] for x in timeseries if x[0] >= ten_minutes_ago]) if reference.size < 20 or probe.size < 20: return False ks_d,ks_p_value = scipy.stats.ks_2samp(reference, probe) if ks_p_value < 0.05 and ks_d > 0.5: adf = sm.tsa.stattools.adfuller(reference, 10) if adf[1] < 0.05: return True return False
這個算法比較高深,它將timeseries分紅兩段:最近10min(probe),1 hour前 -> 10 min前這50分鐘內(reference),兩個樣本經過Kolmogorov-Smirnov測試後判斷差別是否較大。若是相差較大,則對refercence這段樣本進行 Augmented Dickey-Fuller 檢驗(ADF檢驗),查看其平穩性,若是是平穩的,說明存在從平穩狀態(50分鐘)到另外一個差別較大狀態(10分鐘)的突變,序列認爲是異常的。
關於這兩個檢驗過於學術了,以上只是我粗淺的理解。
Kolmogorov-Smirnov test
KS-test有兩個典型應用:
- 判斷某個樣本是否知足某個已知的理論分佈,如正態/指數/均勻/泊松分佈;
- 判斷兩個樣本背後的整體是否可能有相同的分佈,or 兩個樣本間是否可能來自同一整體, or 兩個樣本是否有顯著差別。
檢驗返回兩個值:D,p-value,不太明白他們的具體含義,Skyline裏當 p-value < 0.05 && D > 0.5 時,認爲差別顯著。
Augmented Dickey-Fuller test (ADF test)
用於檢測時間序列的平穩性,當返回的p-value小於給定的顯著性水平時,序列認爲是平穩的,Skyline取的臨界值是0.05。
median_absolute_deviation
def median_absolute_deviation(timeseries): """ A timeseries is anomalous if the deviation of its latest datapoint with respect to the median is X times larger than the median of deviations. """ series = pandas.Series([x[1] for x in timeseries]) median = series.median() demedianed = np.abs(series - median) median_deviation = demedianed.median() # The test statistic is infinite when the median is zero, # so it becomes super sensitive. We play it safe and skip when this happens. if median_deviation == 0: return False test_statistic = demedianed.iget(-1) / median_deviation # Completely arbitary...triggers if the median deviation is # 6 times bigger than the median if test_statistic > 6: return True
該算法不是基於mean/ standard deviation的,而是基於median / median of deviations (MAD)。
Median
大部分狀況下咱們用mean來表達一個集合的平均水平(average),可是在某些狀況下存在少數極大或極小的outlier,拉高或拉低了(skew)總體的mean,形成估計的不許確。此時能夠用median(中位數)代替mean描述平均水平。Median的求法很簡單,集合排序中間位置便是,若是集合總數爲偶數,則取中間兩者的平均值。
Median of deviation(MAD)
同mean同樣,對於median咱們也須要相似standard deviation這樣的指標來表達數據的緊湊/分散程度,即偏離average的平均距離,這就是MAD。MAD顧名思義,是deviation的median,而此時的deviation = abs( 個體 – median ),避免了少許outlier對結果的影響,更robust。
如今算法很好理解了:求出序列的MAD,看最後datapoint與MAD的距離是否 > 6個MAD。一樣的,這裏用最後一個datapoint斷定,依然存在以前提到的問題;其次,6 是個「magic number」,須要根據本身metrics數據特色調整。
該算法的優點在於對異常更加敏感:假設metric忽然變很高並保持一段時間,基於標準差的算法可能在異常出現較短期後即判斷爲正常,由於少許outlier對標準差的計算是有影響的;而計算MAD時,若異常datapoint較少會直接忽略,所以感知異常的時間會更長。
但正如Median的侷限性,該算法對於由多個cluster組成的數據集,即數據分佈在幾個差距較大的區間內,效果不好,很容易誤判。好比下圖:
該曲線在兩個區間內來回震盪,此時median爲58,如紅線所示。MAD計算則爲9,很明顯均不能準確描述數據集,最後節點的deviation=55,此時誤判了。
參考資料
各類Wiki
各類API文檔
前輩的總結:【Etsy 的 Kale 系統】skyline 的過濾算法
Moving Averages: What Are They?
Kolmogorov-Smirnov檢驗
Grubbs檢驗法
Median absolute deviation 《Head first statistics》