利用KD樹進行異常檢測


什麼是KD樹

要說KD樹,咱們得先說一下什麼是KNN算法。
KNN是k-NearestNeighbor的簡稱,原理很簡單:當你有一堆已經標註好的數據時,你知道哪些是正類,哪些是負類。當新拿到一個沒有標註的數據時,你想知道它是哪一類的。只要找到它的鄰居(離它距離短)的點是什麼類別的,所謂近朱者赤近墨者黑,KNN就是採用了相似的方法。node

image

如上圖,當有新的點不知道是哪一類時,只要看看離它最近的幾個點是什麼類別,咱們就判斷它是什麼類別。python

舉個例子:咱們將k取3(就是每次看看新來的數據點的三個住的最近的鄰居),那麼咱們將全部數據點和新來的數據點計算一次距離,而後排序,取前三個數據點,讓它們舉手表決。兩票及以上的類別咱們就認爲是新的數據點的類別。git

很簡單也很好的想法,可是,咱們要注意到當測試集數據比較大時,因爲每次未標註的數據點都要和所有的已標註的數據點進行一次距離計算,而後排序。能夠說時間開銷很是大。咱們在此基礎上,想到了一種存儲點與點之間關係的算法來經過空間換時間。
有一篇博文寫KD樹還不錯
點擊此處查看
舉個例子:有一個二維的數據集: T={(2,3),(5,4),(9,6),(4,7),(8,1),(7,2)}github

經過你已經學習的KD樹的算法,按照依次選擇維度,取各維中位數,是否得出和下面同樣的KD樹?
image2算法


異常檢測

咱們的數據來自於KDD Cup 1999 Data 點我下載數據app

數據格式以下圖
image3
數據的含義以下:
image4
image5學習

咱們此次實驗針對正常和DDOS攻擊兩種狀況進行檢測。
取特徵範圍爲(1,9)U(22,31)的特徵中的數值型特徵,最終獲得16維的特徵向量。將數據隨機化處理後按照7:3的比例分紅訓練集和測試集。下面是咱們組作好的訓練集和測試集
點我下載測試

處理完的訓練集和測試集以下圖:
image6優化

下面是具體實現:.net

# coding=utf8
import math
import time
import matplotlib.pyplot as plt
import numpy as np

old_settings = np.seterr(all='ignore')

#定義節點類型
class KD_node:
    def __init__(self, point=None, split=None, left=None, right=None):
        '''

        :param point: 數據點的特徵向量
        :param split: 切分的維度
        :param left: 左兒子
        :param right: 右兒子
        '''
        self.point = point
        self.split = split
        self.left = left
        self.right = right

計算方差,以利用方差大小進行判斷在哪一維進行切分

def computeVariance(arrayList):
    '''

    :param arrayList: 全部數據某一維的向量
    :return: 返回
    '''
    for ele in arrayList:
        ele = float(ele)
    LEN = float(len(arrayList))
    array = np.array(arrayList)
    sum1 = float(array.sum())
    array2 = np.dot(array, array.T)
    sum2 = float(array2.sum())
    mean = sum1 / LEN
    variance = sum2 / LEN - mean ** 2
    return variance

建樹

def createKDTree(root, data_list):
    '''

    :param root: 輸入一個根節點,以此建樹
    :param data_list: 數據列表
    :return: 返回根節點
    '''
    LEN = len(data_list)
    if LEN == 0:
        return
        # 數據點的維度
    dimension = len(data_list[0]) - 1 #去掉了最後一維標籤維
    # 方差
    max_var = 0
    # 最後選擇的劃分域
    split = 0
    for i in range(dimension):
        ll = []
        for t in data_list:
            ll.append(t[i])
        var = computeVariance(ll) #計算出在這一維的方差大小
        if var > max_var:
            max_var = var
            split = i
            # 根據劃分域的數據對數據點進行排序
    data_list = list(data_list)
    data_list.sort(key=lambda x: x[split]) #按照在切分維度上的大小進行排序
    data_list = np.array(data_list)
    # 選擇下標爲len / 2的點做爲分割點
    point = data_list[LEN / 2]
    root = KD_node(point, split)
    root.left = createKDTree(root.left, data_list[0:(LEN / 2)])#遞歸的對切分到左兒子和右兒子的數據再建樹
    root.right = createKDTree(root.right, data_list[(LEN / 2 + 1):LEN])
    return root
def computeDist(pt1, pt2):
    '''

    :param pt1: 特徵向量1
    :param pt2: 特徵向量2
    :return: 兩個向量的歐氏距離
    '''
    sum_dis = 0.0
    for i in range(len(pt1)):
        sum_dis += (pt1[i] - pt2[i]) ** 2
    #實現的歐氏距離計算,效率很低的版本,能夠改爲numpy的向量運算
    return math.sqrt(sum_dis)
def findNN(root, query):
    '''
    
    :param root: 創建好的KD樹的樹根
    :param query: 查詢數據
    :return: 與這個數據最近的前三個節點
    '''
    # 初始化爲root的節點
    NN = root.point
    min_dist = computeDist(query, NN)
    nodeList = []
    temp_root = root
    dist_list = [temp_root.point, None, None] #用來存儲前三個節點
    ##二分查找創建路徑
    while temp_root:
        nodeList.append(temp_root) #對向下走的路徑進行壓棧處理
        dd = computeDist(query, temp_root.point) #計算當前最近節點和查詢點的距離大小
        if min_dist > dd:
            NN = temp_root.point
            min_dist = dd
            # 當前節點的劃分域
        temp_split = temp_root.split
        if query[temp_split] <= temp_root.point[temp_split]:
            temp_root = temp_root.left
        else:
            temp_root = temp_root.right
            ##回溯查找
    while nodeList:
        back_point = nodeList.pop()
        back_split = back_point.split
        if dist_list[1] is None:
            dist_list[2] = dist_list[1]
            dist_list[1] = back_point.point
        elif dist_list[2] is None:
            dist_list[2] = back_point.point
        if abs(query[back_split] - back_point.point[back_split]) < min_dist: 
            #當查詢點和回溯點的距離小於當前最小距離時,另外一個區域有但願存在更近的節點
            #若是大於這個距離,能夠理解爲假設在二維空間上,直角三角形的直角邊已經不知足要求了,那麼斜邊也必定不知足要求
            if query[back_split] < back_point.point[back_split]:
                temp_root = back_point.right
            else:
                temp_root = back_point.left
            if temp_root:
                nodeList.append(temp_root)
                curDist = computeDist(query, temp_root.point)
                if min_dist > curDist:
                    min_dist = curDist
                    dist_list[2] = dist_list[1]
                    dist_list[1] = dist_list[0]
                    dist_list[0] = temp_root.point
                elif dist_list[1] is None or curDist < computeDist(dist_list[1], query):
                    dist_list[2] = dist_list[1]
                    dist_list[1] = temp_root.point
                elif dist_list[2] is None or curDist < computeDist(dist_list[1], query):
                    dist_list[2] = temp_root.point
    return dist_list

進行判斷

def judge_if_normal(dist_list):
    '''
    
    :param dist_list: 利用findNN查找出的最近三個節點進行投票表決
    :return: 
    '''
    normal_times = 0
    except_times = 0
    for i in dist_list:
        if abs(i[-1] - 0.0) < 1e-7: #浮點數的比較
            normal_times += 1
        else:
            except_times += 1
    if normal_times > except_times: #判斷是normal
        return True
    else:
        return False

數據預處理

def pre_data(path):
    f = open(path)
    lines = f.readlines()
    f.close()
    lstall = []
    for line in lines:
        lstn = []
        lst = line.split(",")
        u = 0
        y = 0
        for i in range(0, 9):
            if lst[i].isdigit():
                lstn.append(float(lst[i]))
                u += 1
            else:
                pass
        for j in range(21, 31):
            try:
                lstn.append(float(lst[j]))
                y += 1
            except:
                pass
        if lst[len(lst) - 1] == "smurf.\n" or lst[len(lst) - 1] == "teardrop.\n":
            lstn.append(int("1"))
        else:
            lstn.append(int("0"))
        lstall.append(lstn)
    nplst = np.array(lstall, dtype=np.float16)
    return nplst

下面就是我的的測試代碼了,大概運行了40分鐘才全跑完

def my_test(all_train_data, all_test_data, train_data_num):
    train_data = all_train_data[:train_data_num]
    train_time_start = time.time()
    root = KD_node()
    root = createKDTree(root, train_data)
    train_time_end = time.time()
    train_time = train_time_end - train_time_start
    right = 0
    error = 0
    test_time_start = time.time()
    for i in range(len(all_test_data)):
        if judge_if_normal(findNN(root, all_test_data[i])) is True and abs(all_test_data[i][-1] - 0.0) < 1e-7:
            right += 1
        elif judge_if_normal(findNN(root, all_test_data[i])) is False and abs(all_test_data[i][-1] - 1.0) < 1e-7:
            right += 1
        else:
            error += 1
    test_time_end = time.time()
    test_time = test_time_end - test_time_start
    right_ratio = float(right) / (right + error)
    return right_ratio, train_time, test_time


def draw(train_num_list=[10, 100, 1000, 10000], train_data=[], test_data=[]):
    train_time_list = []
    test_time_list = []
    right_ratio_list = []
    for i in train_num_list:
        print 'start run ' + i.__str__()
        temp = my_test(train_data, test_data, i)
        right_ratio_list.append(temp[0])
        train_time_list.append(temp[1])
        test_time_list.append(temp[2])
    plt.title('train data num from ' + train_num_list[0].__str__() + ' to ' + train_num_list[:-1].__str__())
    plt.subplot(311)
    plt.plot(train_num_list, right_ratio_list, c='b')
    plt.xlabel('train data num')
    plt.ylabel('right ratio')
    plt.grid(True)
    plt.subplot(312)
    plt.plot(train_num_list, train_time_list, c='r')
    plt.xlabel('train data num')
    plt.ylabel('time of train data (s)')
    plt.grid(True)
    plt.subplot(313)
    plt.plot(train_num_list, test_time_list, c='g')
    plt.xlabel('train data num')
    plt.ylabel('time of test data (s)')
    plt.grid(True)
    plt.show()


data = pre_data('KDD-test\ddos+normal_70.txt')
data2 = pre_data('KDD-test\ddos+normal_30.txt')
'''
建議開始將測試數據調小點,由於時間很長,下面這是所有訓練集和所有測試集,共花費了40分鐘才跑完。我是第六代i7 6700HQ+16G內存+1070+win10
'''
draw(train_num_list=[10, 100, 500, 1000, 2000, 3000, 5000, 10000, 15000, 20000, 50000, 100000, 265300],
     train_data=data[:], test_data=data2[:])

跑完的效果如圖所示:
image7
正確率最終達到95%以上。開始出現的波動咱們懷疑是數據在開始沒有達到良好的隨機效果。
訓練時間與訓練數據量明顯成線性關係
測試時間確實和理論一致,是Nlog(M)的時間複雜度。應該說這種時間複雜度的下降是咱們使用KD樹而不是原版的KNN最重要的地方。在原來的KNN算法下,假設訓練集大小爲M,測試集大小爲N,則查詢時間複雜度能夠達到O(MN),可是咱們下降到O(Nlog(M)),仍是挺合算的。

本次實驗能夠優化的地方不少,可是時間匆忙,沒有作更深的擴展。歡迎你們提出更多建議。

相關文章
相關標籤/搜索