提起機器學習 (Machine Learning),相信不少計算機從業者都會對這個技術方向感到興奮。然而學習並使用機器學習算法來處理數據倒是一項複雜的工做,須要充足的知識儲備,如機率論,數理統計,數值逼近,最優化理論等。機器學習旨在使計算機具備人類同樣的學習能力和模仿能力,這也是實現人工智能的核心思想和方法。傳統的機器學習算法,因爲技術和單機存儲的限制,只能在少許數據上使用,隨着 HDFS(Hadoop Distributed File System) 等分佈式文件系統出現,存儲海量數據已經成爲可能。然而因爲 MapReduce 自身的限制,使得使用 MapReduce 來實現分佈式機器學習算法很是耗時和消耗磁盤容量。由於一般狀況下機器學習算法參數學習的過程都是迭代計算的,即本次計算的結果要做爲下一次迭代的輸入,這個過程當中,若是使用 MapReduce,咱們只能把中間結果存儲磁盤,而後在下一次計算的時候重新讀取,這對於迭代 頻發的算法顯然是致命的性能瓶頸。Spark 立足於內存計算,自然的適應於迭代式計算,相信對於這點,讀者經過前面幾篇文章已經有了較爲深刻的瞭解。然而即使這樣,對於普通開發者來講,實現一個分佈式機器學習算法仍然是一件極具挑戰的事情。MLlib 正是爲了讓基於海量數據的機器學習變得更加簡單,它提供了經常使用機器學習算法的分佈式實現,開發者只須要有 Spark 基礎而且瞭解機器學習算法的原理,以及方法相關參數的含義,就能夠輕鬆的經過調用相應的 API 來實現基於海量數據的機器學習過程。固然,原始數據 ETL,特徵指標提取,調節參數並優化學習過程,這依然須要有足夠的行業知識和數據敏感度,這每每也是經驗的體現。本文的重點在於向讀者介紹如何使用 MLlib 機器學習庫提供的 K-means 算法作聚類分析,這是一個有意義的過程,相信會對讀者特別是初學者有啓發意義。node
Spark 機器學習庫提供了經常使用機器學習算法的實現,包括聚類,分類,迴歸,協同過濾,維度縮減等。使用 Spark 機器學習庫來作機器學習工做,能夠說是很是的簡單,一般只須要在對原始數據進行處理後,而後直接調用相應的 API 就能夠實現。可是要想選擇合適的算法,高效準確地對數據進行分析,您可能還須要深刻了解下算法原理,以及相應 Spark MLlib API 實現的參數的意義。算法
須要說起的是,Spark 機器學習庫從 1.2 版本之後被分爲兩個包,分別是:apache
Spark MLlib 歷史比較長了,1.0 之前的版本中已經包含了,提供的算法實現都是基於原始的 RDD,從學習角度上來說,其實比較容易上手。若是您已經有機器學習方面的經驗,那麼您只須要熟悉下 MLlib 的 API 就能夠開始數據分析工做了。想要基於這個包提供的工具構建完整而且複雜的機器學習流水線是比較困難的。bash
Spark ML Pipeline 從 Spark1.2 版本開始,目前已經從 Alpha 階段畢業,成爲可用而且較爲穩定的新的機器學習庫。ML Pipeline 彌補了原始 MLlib 庫的不足,向用戶提供了一個基於 DataFrame 的機器學習工做流式 API 套件,使用 ML Pipeline API,咱們能夠很方便的把數據處理,特徵轉換,正則化,以及多個機器學習算法聯合起來,構建一個單一完整的機器學習流水線。顯然,這種新的方式給咱們提供了更靈活的方法,並且這也更符合機器學習過程的特色。機器學習
從官方文檔來看,Spark ML Pipeline 雖然是被推薦的機器學習方式,可是並不會在短時間內替代原始的 MLlib 庫,由於 MLlib 已經包含了豐富穩定的算法實現,而且部分 ML Pipeline 實現基於 MLlib。並且就筆者看來,並非全部的機器學習過程都須要被構建成一個流水線,有時候原始數據格式整齊且完整,並且使用單一的算法就能實現目標,咱們就沒有必要把事情複雜化,採用最簡單且容易理解的方式纔是正確的選擇。分佈式
本文基於 Spark 1.5,向讀者展現使用 MLlib API 進行聚類分析的過程。讀者將會發現,使用 MLlib API 開發機器學習應用方式是比較簡單的,相信本文可使讀者創建起信心並掌握基本方法,以便在後續的學習和工做中事半功倍。ide
聚類分析是一個無監督學習 (Unsupervised Learning) 過程, 通常是用來對數據對象按照其特徵屬性進行分組,常常被應用在客戶分羣,欺詐檢測,圖像分析等領域。K-means 應該是最有名而且最常用的聚類算法了,其原理比較容易理解,而且聚類效果良好,有着普遍的使用。函數
和諸多機器學習算法同樣,K-means 算法也是一個迭代式的算法,其主要步驟以下:工具
其中 C 表明中心點,X 表明任意一個非中心點。oop
在實際應用中,K-means 算法有兩個不得不面對而且克服的問題。
Spark MLlib K-means 算法的實如今初始聚類點的選擇上,借鑑了一個叫 K-means||的類 K-means++ 實現。K-means++ 算法在初始點選擇上遵循一個基本原則: 初始聚類中心點相互之間的距離應該儘量的遠。基本步驟以下:
Spark MLlib 中 K-means 算法的實現類 (KMeans.scala) 具備如下參數,具體以下。
經過下面默認構造函數,咱們能夠看到這些可調參數具備如下初始值。
參數的含義解釋以下:
一般應用時,咱們都會先調用 KMeans.train 方法對數據集進行聚類訓練,這個方法會返回 KMeansModel 類實例,而後咱們也可使用 KMeansModel.predict 方法對新的數據點進行所屬聚類的預測,這是很是實用的功能。
KMeans.train 方法有不少重載方法,這裏咱們選擇參數最全的一個展現。
KMeansModel.predict 方法接受不一樣的參數,能夠是向量,或者 RDD,返回是入參所屬的聚類的索引號。
在本文中,咱們所用到目標數據集是來自 UCI Machine Learning Repository 的 Wholesale customer Data Set。UCI 是一個關於機器學習測試數據的下載中心站點,裏面包含了適用於作聚類,分羣,迴歸等各類機器學習問題的數據集。
Wholesale customer Data Set 是引用某批發經銷商的客戶在各類類別產品上的年消費數。爲了方便處理,本文把原始的 CSV 格式轉化成了兩個文本文件,分別是訓練用數據和測試用數據。
讀者能夠從標題清楚的看到每一列表明的含義,固然讀者也能夠到 UCI 網站上去找到關於該數據集的更多信息。雖然 UCI 的數據能夠自由獲取並使用,可是咱們仍是在此聲明,該數據集的版權屬 UCI 以及其原始提供組織或公司全部。
本例中,咱們將根據目標客戶的消費數據,將每一列視爲一個特徵指標,對數據集進行聚類分析。代碼實現步驟以下
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectors object KMeansClustering { def main (args: Array[String]) { if (args.length < 5) { println("Usage:KMeansClustering trainingDataFilePath testDataFilePath numClusters numIterations runTimes") sys.exit(1) } val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering") val sc = new SparkContext(conf) /** *Channel Region Fresh Milk Grocery Frozen Detergents_Paper Delicassen * 2 3 12669 9656 7561 214 2674 1338 * 2 3 7057 9810 9568 1762 3293 1776 * 2 3 6353 8808 7684 2405 3516 7844 */ val rawTrainingData = sc.textFile(args(0)) val parsedTrainingData = rawTrainingData.filter(!isColumnNameLine(_)).map(line => { Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble)) }).cache() // Cluster the data into two classes using KMeans val numClusters = args(2).toInt val numIterations = args(3).toInt val runTimes = args(4).toInt var clusterIndex:Int = 0 val clusters:KMeansModel = KMeans.train(parsedTrainingData, numClusters, numIterations,runTimes) println("Cluster Number:" + clusters.clusterCenters.length) println("Cluster Centers Information Overview:") clusters.clusterCenters.foreach( x => { println("Center Point of Cluster " + clusterIndex + ":") println(x) clusterIndex += 1 }) //begin to check which cluster each test data belongs to based on the clustering result val rawTestData = sc.textFile(args(1)) val parsedTestData = rawTestData.map(line => { Vectors.dense(line.split("\t").map(_.trim).filter(!"".equals(_)).map(_.toDouble)) }) parsedTestData.collect().foreach(testDataLine => { val predictedClusterIndex: Int = clusters.predict(testDataLine) println("The data " + testDataLine.toString + " belongs to cluster " + predictedClusterIndex) }) println("Spark MLlib K-means clustering test finished.") } private def isColumnNameLine(line:String):Boolean = { if (line != null && line.contains("Channel")) true else false }
該示例程序接受五個入參,分別是
和本系列其餘文章同樣,咱們依然選擇使用 HDFS 存儲數據文件。運行程序以前,咱們須要將前文提到的訓練和測試數據集上傳到 HDFS。
./spark-submit --class com.ibm.spark.exercise.mllib.KMeansClustering \ --master spark://<spark_master_node_ip>:7077 \ --num-executors 6 \ --driver-memory 3g \ --executor-memory 512m \ --total-executor-cores 6 \ /home/fams/spark_exercise-1.0.jar \ hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_training.txt \ hdfs://<hdfs_namenode_ip>:9000/user/fams/mllib/wholesale_customers_data_test.txt \ 8 30 3
前面提到 K 的選擇是 K-means 算法的關鍵,Spark MLlib 在 KMeansModel 類裏提供了 computeCost 方法,該方法經過計算全部數據點到其最近的中心點的平方和來評估聚類的效果。通常來講,一樣的迭代次數和算法跑的次數,這個值越小表明聚類的效果越好。可是在實際狀況下,咱們還要考慮到聚類結果的可解釋性,不能一味的選擇使 computeCost 結果值最小的那個 K。
val ks:Array[Int] = Array(3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20) ks.foreach(cluster => { val model:KMeansModel = KMeans.train(parsedTrainingData, cluster,30,1) val ssd = model.computeCost(parsedTrainingData) println("sum of squared distances of points to their nearest center when k=" + cluster + " -> "+ ssd) })
從上圖的運行結果能夠看到,當 K=9 時,cost 值有波動,可是後面又逐漸減少了,因此咱們選擇 8 這個臨界點做爲 K 的個數。固然能夠多跑幾回,找一個穩定的 K 值。理論上 K 的值越大,聚類的 cost 越小,極限狀況下,每一個點都是一個聚類,這時候 cost 是 0,可是顯然這不是一個具備實際意義的聚類結果。