要說KD樹,咱們得先說一下什麼是KNN算法。
KNN是k-NearestNeighbor的簡稱,原理很簡單:當你有一堆已經標註好的數據時,你知道哪些是正類,哪些是負類。當新拿到一個沒有標註的數據時,你想知道它是哪一類的。只要找到它的鄰居(離它距離短)的點是什麼類別的,所謂近朱者赤近墨者黑,KNN就是採用了相似的方法。node
如上圖,當有新的點不知道是哪一類時,只要看看離它最近的幾個點是什麼類別,咱們就判斷它是什麼類別。python
舉個例子:咱們將k取3(就是每次看看新來的數據點的三個住的最近的鄰居),那麼咱們將全部數據點和新來的數據點計算一次距離,而後排序,取前三個數據點,讓它們舉手表決。兩票及以上的類別咱們就認爲是新的數據點的類別。git
很簡單也很好的想法,可是,咱們要注意到當測試集數據比較大時,因爲每次未標註的數據點都要和所有的已標註的數據點進行一次距離計算,而後排序。能夠說時間開銷很是大。咱們在此基礎上,想到了一種存儲點與點之間關係的算法來經過空間換時間。
有一篇博文寫KD樹還不錯
點擊此處查看
舉個例子:有一個二維的數據集: T={(2,3),(5,4),(9,6),(4,7),(8,1),(7,2)}github
經過你已經學習的KD樹的算法,按照依次選擇維度,取各維中位數,是否得出和下面同樣的KD樹?
算法
咱們的數據來自於KDD Cup 1999 Data 點我下載數據app
數據格式以下圖
數據的含義以下:
學習
咱們此次實驗針對正常和DDOS攻擊兩種狀況進行檢測。
取特徵範圍爲(1,9)U(22,31)的特徵中的數值型特徵,最終獲得16維的特徵向量。將數據隨機化處理後按照7:3的比例分紅訓練集和測試集。下面是咱們組作好的訓練集和測試集
點我下載測試
處理完的訓練集和測試集以下圖:
優化
下面是具體實現:.net
# coding=utf8 import math import time import matplotlib.pyplot as plt import numpy as np old_settings = np.seterr(all='ignore') #定義節點類型 class KD_node: def __init__(self, point=None, split=None, left=None, right=None): ''' :param point: 數據點的特徵向量 :param split: 切分的維度 :param left: 左兒子 :param right: 右兒子 ''' self.point = point self.split = split self.left = left self.right = right
計算方差,以利用方差大小進行判斷在哪一維進行切分
def computeVariance(arrayList): ''' :param arrayList: 全部數據某一維的向量 :return: 返回 ''' for ele in arrayList: ele = float(ele) LEN = float(len(arrayList)) array = np.array(arrayList) sum1 = float(array.sum()) array2 = np.dot(array, array.T) sum2 = float(array2.sum()) mean = sum1 / LEN variance = sum2 / LEN - mean ** 2 return variance
建樹
def createKDTree(root, data_list): ''' :param root: 輸入一個根節點,以此建樹 :param data_list: 數據列表 :return: 返回根節點 ''' LEN = len(data_list) if LEN == 0: return # 數據點的維度 dimension = len(data_list[0]) - 1 #去掉了最後一維標籤維 # 方差 max_var = 0 # 最後選擇的劃分域 split = 0 for i in range(dimension): ll = [] for t in data_list: ll.append(t[i]) var = computeVariance(ll) #計算出在這一維的方差大小 if var > max_var: max_var = var split = i # 根據劃分域的數據對數據點進行排序 data_list = list(data_list) data_list.sort(key=lambda x: x[split]) #按照在切分維度上的大小進行排序 data_list = np.array(data_list) # 選擇下標爲len / 2的點做爲分割點 point = data_list[LEN / 2] root = KD_node(point, split) root.left = createKDTree(root.left, data_list[0:(LEN / 2)])#遞歸的對切分到左兒子和右兒子的數據再建樹 root.right = createKDTree(root.right, data_list[(LEN / 2 + 1):LEN]) return root
def computeDist(pt1, pt2): ''' :param pt1: 特徵向量1 :param pt2: 特徵向量2 :return: 兩個向量的歐氏距離 ''' sum_dis = 0.0 for i in range(len(pt1)): sum_dis += (pt1[i] - pt2[i]) ** 2 #實現的歐氏距離計算,效率很低的版本,能夠改爲numpy的向量運算 return math.sqrt(sum_dis)
def findNN(root, query): ''' :param root: 創建好的KD樹的樹根 :param query: 查詢數據 :return: 與這個數據最近的前三個節點 ''' # 初始化爲root的節點 NN = root.point min_dist = computeDist(query, NN) nodeList = [] temp_root = root dist_list = [temp_root.point, None, None] #用來存儲前三個節點 ##二分查找創建路徑 while temp_root: nodeList.append(temp_root) #對向下走的路徑進行壓棧處理 dd = computeDist(query, temp_root.point) #計算當前最近節點和查詢點的距離大小 if min_dist > dd: NN = temp_root.point min_dist = dd # 當前節點的劃分域 temp_split = temp_root.split if query[temp_split] <= temp_root.point[temp_split]: temp_root = temp_root.left else: temp_root = temp_root.right ##回溯查找 while nodeList: back_point = nodeList.pop() back_split = back_point.split if dist_list[1] is None: dist_list[2] = dist_list[1] dist_list[1] = back_point.point elif dist_list[2] is None: dist_list[2] = back_point.point if abs(query[back_split] - back_point.point[back_split]) < min_dist: #當查詢點和回溯點的距離小於當前最小距離時,另外一個區域有但願存在更近的節點 #若是大於這個距離,能夠理解爲假設在二維空間上,直角三角形的直角邊已經不知足要求了,那麼斜邊也必定不知足要求 if query[back_split] < back_point.point[back_split]: temp_root = back_point.right else: temp_root = back_point.left if temp_root: nodeList.append(temp_root) curDist = computeDist(query, temp_root.point) if min_dist > curDist: min_dist = curDist dist_list[2] = dist_list[1] dist_list[1] = dist_list[0] dist_list[0] = temp_root.point elif dist_list[1] is None or curDist < computeDist(dist_list[1], query): dist_list[2] = dist_list[1] dist_list[1] = temp_root.point elif dist_list[2] is None or curDist < computeDist(dist_list[1], query): dist_list[2] = temp_root.point return dist_list
進行判斷
def judge_if_normal(dist_list): ''' :param dist_list: 利用findNN查找出的最近三個節點進行投票表決 :return: ''' normal_times = 0 except_times = 0 for i in dist_list: if abs(i[-1] - 0.0) < 1e-7: #浮點數的比較 normal_times += 1 else: except_times += 1 if normal_times > except_times: #判斷是normal return True else: return False
數據預處理
def pre_data(path): f = open(path) lines = f.readlines() f.close() lstall = [] for line in lines: lstn = [] lst = line.split(",") u = 0 y = 0 for i in range(0, 9): if lst[i].isdigit(): lstn.append(float(lst[i])) u += 1 else: pass for j in range(21, 31): try: lstn.append(float(lst[j])) y += 1 except: pass if lst[len(lst) - 1] == "smurf.\n" or lst[len(lst) - 1] == "teardrop.\n": lstn.append(int("1")) else: lstn.append(int("0")) lstall.append(lstn) nplst = np.array(lstall, dtype=np.float16) return nplst
下面就是我的的測試代碼了,大概運行了40分鐘才全跑完
def my_test(all_train_data, all_test_data, train_data_num): train_data = all_train_data[:train_data_num] train_time_start = time.time() root = KD_node() root = createKDTree(root, train_data) train_time_end = time.time() train_time = train_time_end - train_time_start right = 0 error = 0 test_time_start = time.time() for i in range(len(all_test_data)): if judge_if_normal(findNN(root, all_test_data[i])) is True and abs(all_test_data[i][-1] - 0.0) < 1e-7: right += 1 elif judge_if_normal(findNN(root, all_test_data[i])) is False and abs(all_test_data[i][-1] - 1.0) < 1e-7: right += 1 else: error += 1 test_time_end = time.time() test_time = test_time_end - test_time_start right_ratio = float(right) / (right + error) return right_ratio, train_time, test_time def draw(train_num_list=[10, 100, 1000, 10000], train_data=[], test_data=[]): train_time_list = [] test_time_list = [] right_ratio_list = [] for i in train_num_list: print 'start run ' + i.__str__() temp = my_test(train_data, test_data, i) right_ratio_list.append(temp[0]) train_time_list.append(temp[1]) test_time_list.append(temp[2]) plt.title('train data num from ' + train_num_list[0].__str__() + ' to ' + train_num_list[:-1].__str__()) plt.subplot(311) plt.plot(train_num_list, right_ratio_list, c='b') plt.xlabel('train data num') plt.ylabel('right ratio') plt.grid(True) plt.subplot(312) plt.plot(train_num_list, train_time_list, c='r') plt.xlabel('train data num') plt.ylabel('time of train data (s)') plt.grid(True) plt.subplot(313) plt.plot(train_num_list, test_time_list, c='g') plt.xlabel('train data num') plt.ylabel('time of test data (s)') plt.grid(True) plt.show() data = pre_data('KDD-test\ddos+normal_70.txt') data2 = pre_data('KDD-test\ddos+normal_30.txt') ''' 建議開始將測試數據調小點,由於時間很長,下面這是所有訓練集和所有測試集,共花費了40分鐘才跑完。我是第六代i7 6700HQ+16G內存+1070+win10 ''' draw(train_num_list=[10, 100, 500, 1000, 2000, 3000, 5000, 10000, 15000, 20000, 50000, 100000, 265300], train_data=data[:], test_data=data2[:])
跑完的效果如圖所示:
正確率最終達到95%以上。開始出現的波動咱們懷疑是數據在開始沒有達到良好的隨機效果。
訓練時間與訓練數據量明顯成線性關係
測試時間確實和理論一致,是Nlog(M)的時間複雜度。應該說這種時間複雜度的下降是咱們使用KD樹而不是原版的KNN最重要的地方。在原來的KNN算法下,假設訓練集大小爲M,測試集大小爲N,則查詢時間複雜度能夠達到O(MN),可是咱們下降到O(Nlog(M)),仍是挺合算的。
本次實驗能夠優化的地方不少,可是時間匆忙,沒有作更深的擴展。歡迎你們提出更多建議。