As we know, there are known knowns: there are things we know we know. We also know there are known unknowns; that is to say, we know there are some things we do not know. But there are also unknown unknowns: the things we don't know we don't know.
The classification and regression of the previous chapter are both supervised learning. When the target values are unknown, we need unsupervised learning. Unsupervised learning does not learn to predict a target value; instead, it can learn the structure of the data and find groups of similar inputs, or learn which kinds of input are likely to occur and which are not.
Anomaly detection is commonly used to detect fraud, network attacks, and failures in servers and sensor devices. In these applications we need to be able to find new kinds of anomalies never seen before: new forms of fraud, new intrusion methods, or new server failure modes.
Clustering is the best-known unsupervised learning algorithm, and K-means is the most widely used clustering algorithm. It tries to find k clusters in a dataset. In K-means, the distance between data points is usually the Euclidean distance.
In K-means, a cluster is in fact represented by a single point: the center of all the points that make up that cluster. A data point is simply the feature vector formed by all of its numeric features, or just "vector" for short.
The center of a cluster is called the centroid; it is the arithmetic mean of all the points in the cluster, which is where the name K-means comes from. The algorithm starts by picking some data points as the initial centroids. Each data point is then assigned to its nearest centroid. Next, for each cluster the mean of all its assigned points is computed and becomes the cluster's new centroid. This process is repeated until the centroids stop moving.
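To make the iteration concrete, here is a minimal sketch of one K-means (Lloyd) step in plain Scala. This is not the MLlib implementation; the names sqDist and kmeansStep are invented for illustration only.

def sqDist(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

def kmeansStep(points: Seq[Array[Double]],
               centroids: Seq[Array[Double]]): Seq[Array[Double]] = {
  // 1. assign every point to the index of its nearest centroid
  val byCluster = points.groupBy(p => centroids.indices.minBy(i => sqDist(p, centroids(i))))
  // 2. each new centroid is the component-wise mean of its assigned points
  centroids.indices.map { i =>
    val members = byCluster.getOrElse(i, Seq(centroids(i))) // keep empty clusters unchanged
    Array.tabulate(centroids(i).length)(d => members.map(_(d)).sum / members.length)
  }
}
// Repeating kmeansStep until the centroids stop moving gives the final clustering.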
Counting how many times each port is accessed remotely within a short time window gives a feature that is a good predictor of a port-scanning attack. Detecting a network intrusion means finding connections that are unlike any connection seen before. K-means can cluster connections by each connection's statistical attributes; the resulting clusters describe the types of historical connections and thereby delimit the region of normal connections. Any point outside that region is anomalous.
The KDD Cup is a data mining competition run by an ACM special interest group. The 1999 competition was about network intrusion.
Data download: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
Baidu Cloud mirror: http://pan.baidu.com/s/1cFqnRS
The dataset is on the order of 10^8 bytes in size. Each connection record includes the number of bytes sent, the number of logins, the number of TCP errors, and so on. The data is in CSV format, with one connection per line and 38 numeric features.
The question we care about is finding "unknown" attacks.
Load the data and look at which class labels are present and how many samples each label has:
Scala:
val rawData = sc.textFile("D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter5/kddcup.data/kddcup.data.corrected")
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
Java:
//Initialize SparkConf
SparkConf sc = new SparkConf().setMaster("local").setAppName("AnomalyDetectionInNetworkTraffic");
System.setProperty("hadoop.home.dir", "D:/Tools/hadoop-2.6.4");
JavaSparkContext jsc = new JavaSparkContext(sc);

//Read the data
JavaRDD<String> rawData = jsc.textFile("src/main/java/advanced/chapter5/kddcup.data/kddcup.data.corrected");

//See which class labels are present and how many samples each label has
ArrayList<Entry<String, Long>> lineList = new ArrayList<>(rawData.map(line -> line.split(",")[line.split(",").length-1]).countByValue().entrySet());
Collections.sort(lineList, (m1, m2) -> m2.getValue().intValue()-m1.getValue().intValue());
lineList.forEach(line -> System.out.println(line.getKey() + "," + line.getValue()));
Results:
smurf.,2807886
neptune.,1072017
normal.,972781
satan.,15892
ipsweep.,12481
portsweep.,10413
nmap.,2316
back.,2203
warezclient.,1020
teardrop.,979
pod.,264
guess_passwd.,53
buffer_overflow.,30
land.,21
warezmaster.,20
imap.,12
rootkit.,10
loadmodule.,9
ftp_write.,8
multihop.,7
phf.,4
perl.,3
spy.,2
Clearly, what Scala can express in one line is rather more cumbersome in Java.
Next, split each CSV line into columns, and drop the three categorical columns starting at index 1 as well as the label column at the end.
Scala:
import org.apache.spark.mllib.linalg._

val labelsAndData = rawData.map { line =>
  val buffer = line.split(',').toBuffer
  buffer.remove(1, 3)
  val label = buffer.remove(buffer.length-1)
  val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
  (label,vector)
}

val data = labelsAndData.values.cache()
Java:
//Drop the three categorical columns starting at index 1 and the trailing label column
JavaRDD<Tuple2<String, Vector>> labelsAndData = rawData.map(line -> {
    String[] lineArray = line.split(",");
    double[] vectorDouble = new double[lineArray.length-4];
    for (int i = 0, j = 0; i < lineArray.length; i++) {
        if (i == 1 || i == 2 || i == 3 || i == lineArray.length-1) {
            continue;
        }
        vectorDouble[j] = Double.parseDouble(lineArray[i]);
        j++;
    }
    String label = lineArray[lineArray.length-1];
    Vector vector = Vectors.dense(vectorDouble);
    return new Tuple2<String, Vector>(label, vector);
});

RDD<Vector> data = JavaRDD.toRDD(labelsAndData.map(f -> f._2));
Cluster the data:
Scala:
import org.apache.spark.mllib.clustering._

val kmeans = new KMeans()
val model = kmeans.run(data)

model.clusterCenters.foreach(println)
Java:
//Run K-means clustering
KMeans kmeans = new KMeans();
KMeansModel model = kmeans.run(data);

//Print the cluster centers
Arrays.asList(model.clusterCenters()).forEach(v -> System.out.println(v.toJson()));
Results:
{"type":1,"values":[48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,3.205108575604837E-5,0.14352904910348827,0.00808830584493399,6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,295.26714620807076,0.17797031701994304,0.17803698940272675,0.05766489875327384,0.05772990937912762,0.7898841322627527,0.021179610609915762,0.02826081009629794,232.98107822302248,189.21428335201279,0.753713389800417,0.030710978823818437,0.6050519309247937,0.006464107887632785,0.1780911843182427,0.17788589813471198,0.05792761150001037,0.05765922142400437]}
{"type":1,"values":[10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]}
The program prints two vectors, meaning K-means clustered the data into k = 2 clusters (we never set k, so MLlib used its default value of 2). For this chapter's dataset we know there are 23 connection types, so the model certainly cannot have captured the different groups in the data accurately.
Look at which label types fall into each of the two clusters.
Scala:
val clusterLabelCount = labelsAndData.map { case (label,datum) =>
  val cluster = model.predict(datum)
  (cluster,label)
}.countByValue

clusterLabelCount.toSeq.sorted.foreach { case ((cluster,label),count) =>
  println(f"$cluster%1s$label%18s$count%8s")
}
Java:
ArrayList<Entry<Tuple2<Integer, String>, Long>> clusterLabelCount = new ArrayList<Entry<Tuple2<Integer, String>, Long>>(labelsAndData.map(v -> {
    int cluster = model.predict(v._2);
    return new Tuple2<Integer, String>(cluster, v._1);
}).countByValue().entrySet());

Collections.sort(clusterLabelCount, (m1, m2) -> m2.getKey()._1-m1.getKey()._1);
clusterLabelCount.forEach(t -> System.out.println(t.getKey()._1 +"\t"+ t.getKey()._2 +"\t\t"+ t.getValue()));
Results:
1 portsweep. 1
0 portsweep. 10412
0 rootkit. 10
0 buffer_overflow. 30
0 phf. 4
0 pod. 264
0 perl. 3
0 spy. 2
0 ftp_write. 8
0 nmap. 2316
0 ipsweep. 12481
0 imap. 12
0 warezmaster. 20
0 satan. 15892
0 teardrop. 979
0 smurf. 2807886
0 neptune. 1072017
0 loadmodule. 9
0 guess_passwd. 53
0 normal. 972781
0 land. 21
0 multihop. 7
0 warezclient. 1020
0 back. 2203
The result shows the clustering was essentially useless: cluster 1 contains only a single data point!
A function to compute the Euclidean distance between two points:
Scala:
def distance(a: Vector, b: Vector) =
  math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
Java:
public static double distance(Vector a, Vector b) {
    double[] aArray = a.toArray();
    double[] bArray = b.toArray();
    ArrayList<Tuple2<Double, Double>> ab = new ArrayList<Tuple2<Double, Double>>();
    for (int i = 0; i < aArray.length; i++) {
        ab.add(new Tuple2<Double, Double>(aArray[i], bArray[i]));
    }
    return Math.sqrt(ab.stream().map(x -> x._1 - x._2).map(d -> d * d).reduce((r, e) -> r + e).get());
}
A function to compute the distance from a data point to its cluster centroid:
Scala:
def distToCentroid(datum: Vector, model: KMeansModel) = {
  val cluster = model.predict(datum)
  val centroid = model.clusterCenters(cluster)
  distance(centroid, datum)
}
Java:
public static double distToCentroid(Vector datum, KMeansModel model) {
    int cluster = model.predict(datum);
    Vector[] centroid = model.clusterCenters();
    return distance(centroid[cluster], datum);
}
A function that returns the mean distance to centroid for a model built with a given k:
Scala:
import org.apache.spark.rdd._

def clusteringScore(data: RDD[Vector], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  val model = kmeans.run(data)
  data.map(datum => distToCentroid(datum, model)).mean()
}
Java:
public static double clusteringScore(JavaRDD<Vector> data, int k) {
    KMeans kmeans = new KMeans();
    kmeans.setK(k);
    KMeansModel model = kmeans.run(JavaRDD.toRDD(data));
    return data.mapToDouble(datum -> distToCentroid(datum, model)).stats().mean();
}
Evaluate k from 5 to 40 in steps of 5:
Scala:
(5 to 40 by 5).map(k => (k, clusteringScore(data, k))).foreach(println)
Java:
List<Double> list = Arrays.asList(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8}).stream()
    .map(k -> clusteringScore(labelsAndData.map(f -> f._2), k*5))
    .collect(Collectors.toList());

list.forEach(System.out::println);
This takes quite a while to run. Results:
1938.8583418059206
1686.4806829850777
1440.0646239087368
1305.763038353858
964.3070891182899
878.7358671386651
571.8923560384558
745.7857049862099
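Note that the score for k = 40 came out higher than for k = 35. K-means starts from randomly chosen centroids and can converge to a local optimum, so a single run per k is not reliable. A sketch of a more robust scoring function, assuming the same Spark 1.x MLlib API used above (setRuns repeats the clustering for each k and keeps the best run; the name clusteringScore2 is just for illustration):

def clusteringScore2(data: RDD[Vector], k: Int) = {
  val kmeans = new KMeans()
  kmeans.setK(k)
  kmeans.setRuns(10)        // run K-means 10 times for each k and keep the best model
  kmeans.setEpsilon(1.0e-6) // require centroids to move very little before declaring convergence
  val model = kmeans.run(data)
  data.map(datum => distToCentroid(datum, model)).mean()
}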
I took a shortcut here: the intermediate parts covering R-based visualization and feature normalization are not reproduced.
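Since the final Scala snippet below refers to a normalizeFunction, here is a minimal sketch of the skipped normalization step (standard-score normalization of each feature). This is my own reconstruction, so treat the details as illustrative:

// Per-feature means and standard deviations over the whole dataset
val dataAsArray = data.map(_.toArray)
val n = dataAsArray.count()
val sums = dataAsArray.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
val sumSquares = dataAsArray.map(a => a.map(d => d * d))
                            .reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
val means  = sums.map(_ / n)
val stdevs = sumSquares.zip(sums).map { case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n }

// Normalize one data point to (value - mean) / stdev per feature;
// features with zero variance are only mean-centered
def normalizeFunction(datum: Vector): Vector = {
  val normalized = datum.toArray.zip(means).zip(stdevs).map {
    case ((value, mean), stdev) => if (stdev <= 0) value - mean else (value - mean) / stdev
  }
  Vectors.dense(normalized)
}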
Taking k = 150, the clustering result looks like this:
149 normal. 4
148 warezclient. 590
148 guess_passwd. 52
148 nmap. 1472
148 portsweep. 378
148 imap. 9
148 ftp_write. 2
…..
97 warezclient. 275
96 normal. 3
95 normal. 1
94 normal. 126
93 normal. 47
92 normal. 52196
92 loadmodule. 1
92 satan. 1
92 buffer_overflow. 3
92 guess_passwd. 1
91 normal. 1
90 normal. 3
89 normal. 6
88 normal. 12388
…..
16 normal. 1
15 normal. 11
14 normal. 68
13 normal. 232
12 normal. 1
11 portsweep. 1
10 portsweep. 1
9 warezclient. 59
9 normal. 1
8 normal. 1
7 normal. 1
6 portsweep. 1
5 portsweep. 1
4 portsweep. 1
3 portsweep. 2
2 portsweep. 1
1 portsweep. 1
0 smurf. 527579
0 normal. 345
As an example, we perform anomaly detection on the original data:
Scala:
val model = ...
val originalAndData = ...

val anomalies = originalAndData.filter { case (original, datum) =>
  val normalized = normalizeFunction(datum)
  distToCentroid(normalized, model) > threshold
}.keys
Java:
KMeans kmeansF = new KMeans();
kmeansF.setK(150);
KMeansModel modelF = kmeansF.run(data);

System.out.println("json:---------");
Arrays.asList(modelF.clusterCenters()).forEach(v -> System.out.println(v.toJson()));

ArrayList<Entry<Tuple2<Integer, String>, Long>> clusterLabelCountF = new ArrayList<Entry<Tuple2<Integer, String>, Long>>(labelsAndData.map(v -> {
    int cluster = modelF.predict(v._2);
    return new Tuple2<Integer, String>(cluster, v._1);
}).countByValue().entrySet());

Collections.sort(clusterLabelCountF, (m1, m2) -> m2.getKey()._1-m1.getKey()._1);
clusterLabelCountF.forEach(t -> System.out.println(t.getKey()._1 +"\t"+ t.getKey()._2 +"\t\t"+ t.getValue()));

//Distance of the 100th-farthest point from its centroid, used as the anomaly threshold
JavaDoubleRDD distances = labelsAndData.map(f -> f._2).mapToDouble(datum -> distToCentroid(datum, modelF));
Double threshold = distances.top(100).get(99);

//Points farther from their centroid than the threshold are flagged as anomalies
JavaRDD<Tuple2<String, Vector>> result = labelsAndData.filter(t -> distToCentroid(t._2, modelF) > threshold);
System.out.println("result:---------");
result.foreach(f -> System.out.println(f._2));
The results:
[2.0,222.0,1703110.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,73.0,255.0,1.0,0.0,0.01,0.03,0.0,0.0,0.0,0.0]
[10.0,194.0,954639.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,255.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
[43.0,528.0,1564759.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,94.0,10.0,0.11,0.76,0.01,0.0,0.0,0.0,0.7,0.1]
[24.0,333.0,1462897.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,2.0,2.0,1.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0]
[60.0,885.0,1581712.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,30.0,8.0,0.27,0.1,0.03,0.0,0.0,0.0,0.0,0.0]
[65.0,693.0,2391949.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,75.0,16.0,0.21,0.05,0.01,0.0,0.0,0.0,0.0,0.0]
[60.0,854.0,1519233.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,113.0,34.0,0.3,0.04,0.01,0.0,0.0,0.0,0.0,0.0]
[107.0,585.0,2661605.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,171.0,47.0,0.27,0.02,0.01,0.0,0.0,0.0,0.0,0.0]
……
……
This could also be changed to use StreamingKMeans, which updates the clusters incrementally as new data arrives. The official documentation only has Scala code; if you need a Java version, you can refer to the code in another project of mine: https://github.com/jiangpz/LearnSpark/blob/master/src/main/java/mllib/StreamingKmeansExample.java
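For reference, a minimal Scala sketch of how StreamingKMeans is wired up (the input path, batch interval, and parameter values here are only illustrative):

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// Assume new connection records (already reduced to 38 numeric features) land in this directory
val trainingData = ssc.textFileStream("/path/to/streaming/connections")
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))

val streamingModel = new StreamingKMeans()
  .setK(150)                 // same k as the batch model above
  .setDecayFactor(1.0)       // 1.0 weights all history equally; smaller values forget old data faster
  .setRandomCenters(38, 0.0) // 38 numeric features, initial cluster weight 0

streamingModel.trainOn(trainingData) // centroids are updated incrementally with every batch
ssc.start()
ssc.awaitTermination()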