機器學習(machine learning, ML)是一門涉及機率論、統計學、逼近論、凸分析、算法複雜度理論等多領域的交叉學科。ML專一於研究計算機模擬或實現人類的學習行爲,以獲取新知識、新技能,並重組已學習的知識結構使之不斷改善自身。算法
MLlib是Spark提供的可擴展的機器學習庫。MLlib已經集成了大量機器學習的算法,因爲MLlib涉及的算法衆多,筆者只對部分算法進行了分析,其他算法只是簡單列出公式,讀者若是想要對公式進行推理,須要本身尋找有關機率論、數理統計、數理分析等方面的專門著做。本章更側重於機器學習API的使用,基本可以知足大多數讀者的須要。apache
機器學習也屬於人工智能的範疇,該領域主要研究的對象是人工智能,尤爲是如何在經驗學習中改善具體算法。機器學習是人工智能研究較爲年輕的分支,它的發展過程大體可分爲以下4個階段:數組
(1) 機器學習的組成緩存
機器學習的基本結構由環境、知識庫和執行部分三部分組成。環境向學習部分(屬於知識庫的一部分)提供某些信息,學習部分利用這些信息修改知識庫,以增進執行部分完成任務的效能,執行部分根據知識庫完成任務,同時把得到的信息反饋給學習部分。dom
(2) 學習策略機器學習
學習策略是指機器學習過程當中所採用的推理策略。學習系統通常由學習和環境兩部分組成。環境(如書本或教師)提供信息,學習部分則實現信息轉換、存儲,並從中獲取有用的信息。學習過程當中,學生(學習部分)使用的推理越少,他對教師(環境)的依賴就越大,教師的負擔也就越重。根據學生實現信息轉換所需推理的多少和難易程度,以從簡單到複雜,從少到多的次序能夠將學習策略分爲如下6種基本類型:分佈式
學習策略還能夠從所獲取知識的表示形式、應用領域等維度分類。函數
(3) 應用領域工具
目前,機器學習普遍應用於數據挖掘、計算機視覺、天然語言處理、生物特徵識別、搜索引擎、醫學診斷、檢測信用卡欺詐、證券市場分析、DNA序列測序、語音和手寫識別、戰略遊戲和機器人等領域。性能
MLlib(machine learning library)是Spark提供的可擴展的機器學習庫。MLlib中已經包含了一些通用的學習算法和工具,如:分類、迴歸、聚類、協同過濾、降維以及底層的優化原語等算法和工具。
MLlib提供的API主要分爲如下兩類:
MLlib支持存儲在一臺機器上的局部向量和矩陣以及由一個或多個RDD支持的分佈式矩陣。局部向量和局部矩陣是提供公共接口的簡單數據模型。Breeze和jblas提供了底層的線性代數運算。Breeze提供了一組線性代數和數字計算的庫,具體信息訪問http://www.scalanlp.org/。jblas提供了使用Java開發的線性代數庫,具體信息訪問http://jblas.org/。
MLlib支持兩種局部向量類型:密集向量(dense)和稀疏向量(sparse)。密集向量由double類型的數組支持,而稀疏向量則由兩個平行數組支持。例如,向量(1.0,0.0,3.0)由密集向量表示的格式爲[1.0,0.0,3.0],由稀疏向量表示的格式爲(3,[0,2],[1.0,3.0])。
注意:這裏對稀疏向量作些解釋。3是向量(1.0,0.0,3.0)的長度,除去0值外,其餘兩個值的索引和值分別構成了數組[0,2]和數組[1.0,3.0]。
有關向量的類如圖所示。
Vector是全部局部向量的基類,Dense-Vector和SparseVector都是Vector的具體實現。
Spark官方推薦使用Vectors中實現的工廠方法建立局部向量,就像下面這樣:
import org.apache.spark.mllib.linalg.{Vector, Vectors} //建立密集向量(1.0, 0.0, 3.0) val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) //給向量(1.0, 0.0, 3.0)建立疏向量 val svl: Vector= Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) //經過指定非0的項目,建立稀疏向量(1.0, 0.0, 3.0) val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
注意: Scala默認會導入scala.collection.immutable.Vector,因此必須顯式導入org.apache.spark.mllib.linalg.Vector才能使用MLlib才能使用MLlib提供的Vector。
上面例子中以數組爲參數,調用Vectors的sparse接口,見以下代碼。使用Seq建立稀疏向量,其本質依然是使用數組,見以下代碼。
標記點是將密集向量或者稀疏向量與應答標籤相關聯。在MLlib中,標記點用於監督學習算法。MLlib使用double類型存儲標籤,因此咱們能在迴歸和分類中使用標記點。若是隻有兩種分類,可使用二分法,一個標籤要麼是1.0,要麼是0.0。若是有不少分類,標籤應該從零開始:0、一、2....
標記點由樣例類LabeledPoint來表示,其使用方式以下。
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.regression.LabeledPoint //使用標籤1.0和一個密集向量建立一個標記點 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) //使用標籤0.0和一個疏向量建立一個標記點 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
用稀疏的訓練數據作練習是很常見的,好在MLlib支持讀取存儲在LIBSVM格式中的訓練例子。LIBSVM格式是一種每一行表示一個標籤稀疏特徵向量的文本格式,其格式以下:
label index1:value1 index2:value2 ...
LIBSVM是林智仁教授等開發設計的一個簡單、易用和快速有效的SVM模式識別與迴歸的軟件包。MLlib已經提供了MLUtils.loadLibSVMFile方法讀取存儲在LIBSVM格式文本文件中的訓練數據,見以下代碼:
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLlib支持數據存儲在單個double類型數組的密矩陣。先來看這樣一個矩陣:
這個矩陣是如何存儲的?它只是存儲到一維數組[1.0, 3.0, 5.0, 2.0, 4.0, 6.0],這個矩陣的尺寸是3*2,即3行2列。
有關局部矩陣的類以下圖所示。局部矩陣的基類是Matrix,目前有一個實現類DenseMatrix。Spark官方推薦使用Matrices中實現的工廠方法建立局部矩陣,例如:
import org.apache.spark.mllib.linalg.{Matrix, Matrices} //建立密矩陣((1.0,2.0),(3.0, 4.0),(5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
分佈式矩陣分佈式地存儲在一個或者多個RDD中。如何存儲數據量很大的分佈式矩陣?最重要的在於選擇一個正確的格式。若是將分佈式矩陣轉換爲不一樣格式,可能須要全局的shuffle,成本很是昂貴。
有關分佈式矩陣的類如圖所示:
迄今爲止,MLlib已經實現了4種類型的分佈式矩陣:
下面展現了可使用RDD[Vector]實例來構建RowMatrix的例子。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ... val mat: RowMatrix = new RowMatrix(rows) val m = mat.numRows() val n = mat.numCols()
@Experimental case class IndexedRow(index: Long, vector: Vector)
經過刪除IndexedRowMatrix的行索引,能夠將IndexedRowMatrix轉換爲RowMatrix。下面的例子演示瞭如何使用IndexedRowMatrix。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowEntry} val rows: RDD[IndexedRow] = ... val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) val m = mat.numRows() val n = mat.numCols() val rowMat: RowMatrix = mat.toRowMatrix()
可使用RDD[MatrixEntry]實例建立CoordinateMatrix。MatrixEntry的實現以下。
@Experimental case class MatrixEntry(i: Long, j: Long, value: Double)
經過調用CoordinateMatrix的toIndexedRowMatrix方法,能夠將CoordinateMatrix轉換爲IndexedRowMatrix。下面的例子演示了CoordinateMatrix的使用。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatriEntry] = ... val mat: CoordinateMatrix = new CoordinateMatrix(entries) val m = mat.numRows() val n = mat.numCols() val indexedRowMatrix = mat.toIndexedRowMatrix()
經過調用IndexedRowRowMatrix或者CoordinateMatrix的toBlockMatrix方法,能夠方便轉換爲BlockMatrix。toBlockMatrix方法建立的Block的默認大小是1024 x 1024。可使用toBlockMatrix(rowsPerBlock,colsPerBlock)方法改變Block的大小。下面的例子演示了BlockMatrix的使用。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) val matA: BlockMatrix = coordMat.toBlockMatrix().cache() matA.validate() val ata = matA.transpose.multiply(matA)
注意:因爲MLlib會緩存矩陣的大小,因此支持分佈式矩陣的RDD必需要有明確的類型,不然會致使出錯。
MLlib提供了不少統計方法,包括摘要統計、相關統計、分層抽樣、假設校驗、隨機數生成等。這些都涉及統計學、機率論的專業知識。
調用Statistics類的colStats方法,能夠得到RDD[Vector]的列的摘要統計。colStats方法返回了MultivariateStatisticalSummary對象,MultivariateStatisticalSummary對象包含了列的最大值、最小值、平均值、方差、非零元素的數量以及總數。下面的例子演示瞭如何使用colStats。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.stat.{MultivariateStatisticalSummary, Statistics} val observations: RDD[Vector] = ... val summary: MultivariateStatisticalSummary = Statistics.colStats(observations) println(summary.mean) //每一個列值組成的密集向量 println(summary.variance) //列向量方差 println(summary.numNonzeros) //每一個列的非零值個數
colStats實際使用了RowMatrix的computeColumnSummaryStatistics方法,見代碼以下:
計算兩個序列之間的相關性是統計中通用的操做。MLlib提供了計算多個序列之間相關統計的靈活性。目前支持的關聯方法運用了皮爾森相關係數(Pearson correlation coefficient)和斯皮爾森相關係統(Spearman's rank correlation coefficient)。
皮爾森相關係數也稱爲皮爾森積矩相關係數(Pearson product-moment correlation coefficient),是一種線性相關係數。皮爾森相關係數是用來反映兩個變量線性相關程度的統計量。
相關係數用r表示,其中n爲樣本量,xi,yi,sx,sy 分別爲兩個變量的觀測值和均值。r描述的是兩個變量間線性相關強弱的程度。r的取值在-1與+1之間,若r>0,代表兩個變量是正相關,即一個變量的值越大,另外一個變量的值也會越大;若r<0,代表兩個變量是負相關,即一個變量的值越大另外一個變量的值反而會越小。r的絕對值越大代表相關性越強,要注意的是這裏並不存在因果關係。若r=0,代表兩個變量間不是線性相關,但有多是其餘方式的相關(好比曲線方式)。
斯皮爾森秩相關係數也稱爲Spearman的p,是由Charles Spearman命名的,通常用希臘字母ps(rho)或rs表示。Spearman秩相關係數是一種無參數(與分佈無關)的校驗方法,用於度量變量之間聯繫的強弱。在沒有重複數據的狀況下,若是一個變量是另一個變量的嚴格單調函數,則Spearman秩相關係數就是+1或-1,稱變量徹底Spearman秩相關。注意和Pearson徹底相關的區別,只有當兩變量存在線性關係時,Pearson相關係數才爲+1或-1。
Spearman秩相關係數爲:
Statistics提供了計算序列之間相關性的方法,默認狀況下使用皮爾森相關係數,使用方法以下:
import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.Statistics val sc: SparkContext = ... val seriesX: RDD[Double] = ... //a series val seriesY: RDD[Double] = ... //和seriesX必須有相同的分區和基數 val correlation:Double = Statistics.corr(seriesX, seriesY, "pearson") val data: RDD[Vector] = ... //每一個向量必須是行,不能是列 val correlMatrix: Matrix = Statistics.corr(data, "pearson")
Statistics中相關性的實現,見代碼以下:
其實質是代理了Correlations,Correlations中相關性的實現見代碼以下:
分層抽樣(Stratified sampling)是先將整體按某種特徵分爲若干次級(層),而後再從每一層內進行獨立取樣,組成一個樣本的統計學計算方法。爲了對分層抽樣有更直觀的感覺,請看下面的例子:
某市現有機動車共1萬輛,其中大巴車500輛,小轎車6000輛,中拔車1000輛,越野車2000輛,工程車500輛。如今要了解這些車輛的使用年限,決定採用分層抽樣方式抽取100個樣本。按照車輛佔比,各種車輛的抽樣數量分別爲5,60,10,20,5.
摘要統計和相關統計都集成Statistics中,而分層抽樣只須要調用RDD[(K,V)]的sampleByKey和sampleByKeyExact便可。爲了分層抽樣,其中的鍵能夠被認爲是標籤,值是具體的屬性。sampleByKey方法採用擲硬幣的方式來決定是否將一個觀測值做爲採樣,所以須要一個預期大小的樣本數據。sampleByKeyExact則須要更多更有效的資源,可是樣本數據的大小是肯定的。sampleByKeyExact方法容許用戶採用符合[fk * nk] V k ∈ K,其中fk是鍵k的函數,nk是RDD[(K,V)]中鍵爲k的(K,V)對,K是鍵的集合。下例演示瞭如何使用分層抽樣。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext import org.apache.spark.rdd.PairRDDFunctions val sc: SparkContext = ... val data = ... //an RDD[(K,V)] of any key value pairs val fractions: Map[K. Double] = ... //specify the exact fraction desired from each key val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)
假設校驗(hypothesis testing) 是數理統計學中根據必定假設條件由樣本推斷整體的一種方法。
若是對整體的某種假設是真實的,那麼不利於或不能支持這一假設的事件A(小几率事件)在一次試驗中幾乎不可能發生;要是在一次試驗中A居然發生了,就有理由懷疑該假設的真實性,拒絕這一假設。小几率原理能夠用圖表示。
H0表示原假設,H1表示備選假設。常見的假設校驗有以下幾種:
假設校驗是一個強大的工具,不管結果是否偶然的,均可以決定結果是否具備統計特徵。MLlib目前支持皮爾森卡方測試。輸入數據的類型決定了是作卡方適合度檢測仍是獨立性檢測。卡方適合度檢測的輸入數據類型應當是向量,而卡方獨立性檢測須要的數據類型是矩陣。RDD[LabeledPoint]能夠做爲卡方檢測的輸入類型。下列演示瞭如何使用假設校驗。
import org.apache.spark.SparkContext import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.Statistics._ val sc: SparkContext = ... val vec: Vector = ... //事件的頻率組成的vector val goodnessOfFitTestResult = Statistics.chiSqTest(vec) println(goodnessOfFitTestResult) val mat: Matrix = ... //偶然性matrix val independenceTestResult = Statistics.chiSqTest(mat) println(independenceTestResult) val obs:RDD[LabeledPoint] = ... //(feature, label) pairs val featureTestResults: Arra[ChiSqTestResult] = Statistics.chiSqTest(obs) var i = 1 featureTestResults.foreach{ result => println(s"Column $i:\n result") i += 1 } //summary of the test
隨機數能夠看作隨機變量,什麼是隨機變量?將一枚質地均勻的硬幣拋擲3次,記錄它的結果,有:
其中u表明正面朝上,d表明反面朝上,整個集合Ω是拋擲3次硬幣的樣本空間。正面朝上的次數多是0,1,2,3。因爲樣本空間Ω中的結果都是隨機發生的,因此出現正面的次數X是隨機的,X即爲隨機變量。若是拋擲硬幣,直到出現正面的拋擲次數爲Y,那麼Y的取值多是0,1,2,3...。若是隨機變量的取值是有限的(好比X)或者是可列的(好比Y),那麼就稱爲離散隨機變量。
剛纔說的拋擲3次硬幣的狀況下,使用P(X = 取值)的方式表達每種取值的機率,咱們不可貴出:
若是樣本空間上隨機變量的取值用x1,x2,x3,....表示,那麼存在知足p(x1) = P(X = xi)和Σip(xi) = 1 的函數p。這個函數p稱爲隨機變量X的機率質量函數或者頻率函數。
若是X取值累積某個範圍的值,那麼其累積分佈函數定義以下:
累積分佈函數知足:
同同樣本空間上兩個離散隨機變量X和Y的可能取值分別爲x1,x2,x3,...和y1,y2,y3,...,若是對全部i和j,知足:
則X和Y是獨立的。將此定義推廣到兩個以上離散隨機變量的情形,若是對全部i, j 和 k,知足:
則X、Y和Z是相互獨立的。
剛纔所說的X、Y的取值都是離散的,還有一種狀況下取值是連續的。以人的壽命爲例,能夠是任意的正實數值。與頻率函數相對的是密度函數ƒ(x)。ƒ(x)有這些性質:ƒ(x) ≥ 0,ƒ分段連續且∫∞-∞ƒ(x)dx = 1。若是X是具備密度函數ƒ的隨機變量,那麼對於任意的a < b,X落在區間(a, b)上的機率是密度函數從a到b的下方面積:
隨機數生成對於隨機算法、隨機協議和隨機性能測試都頗有用。MLlib支持均勻分佈、標準正態分佈、泊松分佈等生成隨機RDD。
MLlib有關隨機數的類如圖所示:
以泊松分佈爲例,先看看它的數學定義。參數爲λ(λ>0)的泊松頻率函數是
當λ = 0.一、一、五、10時的泊松分佈如圖所示:
RandomRDDs提供了工廠方法建立RandomRDD和RandomVectorRDD。下面的例子中生成了一個包含100萬個double類型隨機數的RDD[double],其值符合標準正太分佈N(0,1),分佈於10個分區,而後將其映射到N(1, 4)。
import org.apache.spark.SparkContext import org.apache.spark.mllib.random.RandomRDDs._ val sc: SparkContext = ... val u = normalRDD(sc, 1000000L, 10) val v = u.map(x => 1.0 + 2.0 * x)
MLlib支持多種多樣的分析方法,例如,二元分類、多元分類和迴歸。表11-1列出了各種問題的支持算法。
許多標準的機器學習方法均可以配製成凸優化問題,即找到一個極小的凸函數ƒ依賴於一個d項的可變向量w。形式上,咱們能夠寫爲優化問題minwεR dƒ(w),其中所述目標函數的形式爲:
這裏的向量xi ε Rd 是訓練數據,1 ≤ i ≤ n 而且yi ε Rd 是想要預測數據的相應的標籤。若是L(w; xi, yi)能表示爲 wτX和y的函數,咱們就說這個方法是線性的。幾個MLlib的分類和迴歸算法都屬於這一類,並在這裏討論。
目標函數ƒ有兩個部分:控制該模型的複雜的正則化部分和用於在訓練數據上測量模型的偏差的損失部分。損失函數L(w)是典型的基於w的凸函數。固定的正則化參數λ ≥ 0定義了最小損失(即訓練偏差)和最小化模型的複雜性(即避免過分擬合)這兩個目標之間的權衡。
(1) 損失函數
在統計學,統計決策理論和經濟學中,損失函數是指一種將一個事件(在一個樣本空間中的一個元素)映射到一個表達與其事件相關的經濟成本和機會成本的實數上的一種函數。一般而言,損失函數由損失項和正則項組成。表11-2列出了經常使用的損失函數。
這裏對錶11-2中的一些內容作些說明:
(2) 正規化
正規化的目的是鼓勵簡單的模型,並避免過分擬合。MLlib支持如下正規化,如表11-3所示:
這裏的sign(w)是由向量w中全部項的符號(±1)組成的向量。平滑度L2正規化問題通常比L1正規化容易解決。然而L1正規化能幫助促進稀疏權重,致使更小、更可解釋的模型,其中後者於特徵選擇是有用的。沒有任何正規化,特別是當訓練實例的數目是小的,不建議訓練模型。
(3) 優化
線性方法使用凸優化來優化目標函數。MLlib使用兩種方法:新元和L-BFGS來描述優化部分。目前,大多數算法的API支持隨機梯度降低(SGD),並有一些支持L-BFGS。
線性迴歸是一類簡單的指導學習方法。線性迴歸是預測定量響應變量的有用工具。不少統計學習方法都是從線性迴歸推廣和擴展獲得的,因此咱們有必要重點理解它。
簡單線性迴歸很是簡單,只根據單一的預測變量X預測定量響應變量Y。它假定X與Y之間存在線性關係。其數學關係以下:
≈表示近似。這種線性關係能夠描述爲Y對X的迴歸。β0和β1是兩個未知的常量,被稱爲線性模型的係數,它們分別表示線性模型中的截距和斜率。
β0和β1怎麼獲得呢?經過大量樣本數據估算出估計值。假如樣本數據以下:
(x1, y1),(x2, y2),....,(x3, y3)
此時問題轉換爲在座標中尋找一條與全部點的距離最大程度接近的直線問題,如圖11-7所示
使用最小二乘方法最終求得的估計值(β’0,β’1)。
實際狀況,全部的樣本或者真實數據不可能真的都在一條直線上,每一個座標都會有偏差,因此能夠表示爲以下關係:
上式也稱爲整體迴歸直線,是對X和Y之間真實關係的最佳線性近似。
相比簡單線性迴歸,實踐中經常不止一個預測變量,這就要求對簡單線性迴歸進行擴展。雖然能夠給每一個預測變量單獨創建一個簡單線性迴歸模型,但沒法作出單一的預測。更好的方法是擴展簡單線性迴歸模型,使它能夠直接包含多個預測變量。通常狀況下,假設有p個不一樣的預測變量,多元線性迴歸模型爲:
其中Xj表明第j個預測變量,βj表明第j個預測變量和響應變量之間的關聯。
5.2節的線性迴歸模型中假設響應變量Y是定量的,但不少時候,Y倒是定性的。好比杯子的材質是定性變量,能夠是玻璃、塑料或不鏽鋼等。定性變量也叫分類變量。預測定性響應值是指對觀測分類。
分類的目標是劃分項目分類。最多見的分類類型是二元分類,二元分類有兩種分類,一般命名爲正和負。若是有兩個以上的分類,它被稱爲多元分類。MLlib支持兩種線性方法分類:線性支持向量機和邏輯迴歸。線性支持向量機僅支持二元分類,而邏輯迴歸對二元分類和多元分類都支持。對於這兩種方法,MLlib支持L1和L2正規化變體。MLlib中使用RDD[LabeledPoint]表明訓練數據集,其中標籤引從0開始,如0,1,2,...。對於二元標籤γ在MLlib中使用0表示負,使用+1表示正。
線性支持向量機(SVM)是用於大規模分類任務的標準方法。正是在介紹損失函數時提到的:
默認狀況下,線性支持向量機使用L2正規化訓練。MLlib也支持選擇L1正規化,在這種狀況下,問題就變成了線性問題。線性支持向量機算法輸出SVM模型。給定一個新的數據點,記爲X,該模型基於wτx的值作預測。默認狀況下,若是wτx ≥ 0則結果是正的,不然爲負。
下例展現瞭如何加載樣本數據集,執行訓練算法。
import org.apache.spark.mllib.classification.{SVMModel,SVMWithSGD} import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics import org.apache.spark.mllib.util.MLUtils //加載LIBSVM格式的訓練數據 val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsbvm_data.txt") //將數據切分爲訓練數據(60%)和測試數據(40%) val splits = data.randomSplit(Array(0.6,0.4),seed = 11L) val training = splits(0).cache() val test = splits(1) //運行訓練算法構建模型 val numIterations = 100 val model = SVMWithSGD.train(training, numIterations) //在測試數據上計算原始分數 val scoreAndLabels = test.map{ point => val score = model.predict(point.features) (score, point.label) } //獲取評估指標 val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() println("Area under ROC = " + auROC) //保存和加載模型 model.save(sc, "myModelPath") val sameModel = SVMModel.load(sc, "myModelPath")
SVMWithSGD.train默認執行L2正規化,能夠設置正則化參數爲1.0爲執行L1正規化。配置及優化SVMWithSGD的代碼以下:
import org.apache.spark.mllib.optimization.L1Updater val svmAlg = new SVMWithSGD() svmAlg.optimizer. setNumIterations(200). setRegParam(0.1). setUpdater(new L1Updater) val modelL1 = svmAlg.run(training)
邏輯迴歸被普遍用於預測二元響應。它正是在介紹損失函數時提到的:
對於二元分類問題,該算法輸出二元邏輯迴歸模型。給定一個新的數據點,記爲X,該模型基於應用邏輯函數
作預測,其中z = wτx。默認狀況下,若是ƒ(wτx) > 0.5,輸出爲正,不然爲負。雖然不像線性支持向量機,邏輯迴歸模型中,ƒ(z)的原始輸出具備一律率解釋(即X是正機率)。
二元邏輯迴歸能夠推廣到多元邏輯迴歸來訓練和預測多元分類問題。對於多元分類問題,該算法將輸出一個多元邏輯迴歸模型,其中包含K-1個二元邏輯迴歸模型。MLlib實現了兩種算法來解決邏輯迴歸分析:小批量梯度降低和L-BFGS。Spark官方推薦L-BFGS,由於它比小批梯度降低的收斂更快。
下例演示瞭如何使用邏輯迴歸。
//運行訓練算法構建模型 val model = new LogisticRegressionWithLBFGS().setNumClasses(10).run(training) //在測試數據上計算原始分數 val predictionAndLabels = test.map{ case LabeledPoint(label, features) => val prediction = model.predict(features) (prediction, label) } //獲取評估指標 val metrics = new MulticlassMetrics(predictionAndLabels) val precision =metrics.precision println("Precision = " + precision) //保存和加載模型 model.save(sc, "myModelPath") val samleModel = LogisticRegressionModel.load(sc, "myModelPath")
線性最小二乘公式是迴歸問題最多見的公式。在介紹損失函數時也提到過它的公式:
多種多樣的迴歸方法經過使用不一樣的正規化類型,都派生自線性最小二乘。例如,普通最小二乘或線性最小二乘使用非正規化:嶺迴歸使用L2正規化;套索使用L1正規化。對於全部這些模型的損失和訓練偏差:
就是均方偏差。
下面的例子演示瞭如何使用線性迴歸。
//加載解析數據 val data = sc.textFile("data/mllib/ridge-data/lpsa.data") val parsedData = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))).cache() //構建模型 val numIterations = 100 val model = LinearRegressionWithSGD.train(parsedData, numIterations) //使用訓練樣本計算模型而且計算訓練偏差 val valueAndPreds = parsedData.map { point => val prediction = model.predict(point.features) (point.label, prediction) } val MSE = valueAndPreds.map{case(v, p) => math.pow((v- p), 2)}.mean() println("training Mean Squared Error = " + MSE) //保存與加載模型 model.save(sc, "myModelPath") val sameModel = LinearRegressionModel.load(sc, "myModelPath") }
流式數據能夠適用於線上的迴歸模型,每當有新數據到達時,更新模型的參數。MLlib目前使用普通最小二乘法支持流線性迴歸。除了每批數據到達時,模型更新最新的數據外,實際與線下的執行是相似的。
下面的例子,假設已經初始化好了StreamingContext ssc來演示流線性迴歸。
val numFeatures = 3 val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.zeros(numFeatures)) model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination()
決策樹是分類和迴歸的機器學習任務中經常使用的方法。決策樹普遍使用,由於它們很容易解釋,處理分類的功能,延伸到多元分類設置,不須要縮放功能,並能捕捉到非線性和功能的交互。
MLlib使用連續和分類功能支持決策樹的二元和多元的分類和迴歸。經過行實現分區數據,容許分佈式訓練數以百萬計的實例。
決策樹是一個貪心算法,即在特性空間上執行遞歸的二元分割。決策樹爲每一個最底部(葉)分區預測相同的標籤。爲了在每一個樹節點上得到最大的信息,每一個分區是從一組可能的劃分中選擇的最佳分裂。
節點不純度是節點上標籤的均勻性的量度。當前實現提供了兩種分類不純度測量的方法(基尼不純度和嫡)和一種迴歸不純度測量的方法(方差),如表11-4所示:
信息增益是父節點不純度與兩個子節點不純度的加權總和之間的差。假設將有s個分區,大小爲N的數據集D劃分爲兩個數據集Dleft和Dright,那麼信息增益爲:
(1) 連續特徵
對於單機上實現的小數據集,給每一個連續特徵劃分的候選人在此特徵上有惟一值。有些實現了對特徵值排序,爲了加速計算,使用這些有序的惟一值劃分候選人。對於大的分佈式數據集,排序是很昂貴的。經過在樣本數據分數上執行位計算,實現了計算近似的劃分候選人集合。有序劃分建立了「箱」,可以使用maxBins參數指定這樣的容器的最大數量。
(2) 分類特徵
對於有M種可能值的分類特徵,將會有2M-1-1個劃分候選人。對於二元(0/1)分類和迴歸,咱們能夠經過平均標籤排序的分類特徵值,減小劃分候選人至M-1多得數據量。例如,對於一個有A、B和C三個分類的分類特徵的二元分類問題,其相應的標籤1的比例是0.2,0.6和0.4時,分類特徵是有序的A、C、B。兩個劃分候選人分別是A|C,B和A,C|B,其中|標記劃分。
在多元分類中共有2M-1-1種可能的劃分,不管什麼時候均可能被使用。當2M-1-1比參數maxBins大時,咱們使用與二元分類和迴歸相相似的方法。M種分類特徵用不純度排序,最終獲得須要考慮的M-1個劃分候選人。
遞歸樹的構建當知足下面三個條件之一時會停在一個節點。
下面的例子演示了使用基尼不純度做爲不純度算法且樹深爲5的決策樹執行分類。
import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.tree.model.DecisionTreeModel import org.apache.spark.mllib.util.MLUtils val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") val splits = data.randomSplit(Array(0.7, 0.3)) //訓練決策樹模型 //空categoricalFeaturesInfo說明全部的特徵是連續的 val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "gini" val maxDepth = 5 val maxBins = 32 // trainingData是訓練數據 val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins) //在測試實例上計算 val labelAndPreds = testData.map{ point => val prediction = model.predict(point.features) (point.label, prediction) } val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() println("Test Error = " + testErr) println("Learned classification tree model: \n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = DecisionTreeModel.load(sc, "myModelPath")
下面的例子演示了使用方差做爲不純度算法且樹深爲5的決策樹執行分類
val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "variance" val maxDepth =5 val maxBins = 32 val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity, maxDepth, maxBins) val labelsAndPredictions = testData.map{ point => val prediction = model.predict(point.features) (point.label, prediction) } val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean() println("Test Mean Squared Error =" + testMSE) println("Learned regression tree model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = DecisionTreeModel.load(sc, "myModelPath")
合奏是一個建立由其餘模型的集合組合而成的模型的學習算法。MLlib支持兩個主要的合奏算法:梯度提高決策樹和隨機森林,它們都使用決策樹做爲其基礎模型。梯度提高決策樹和隨機森林雖然都是決策樹合奏的學習算法,可是訓練過程是不一樣的。關於合奏有如下幾個權衡點:
總之,兩種算法都是有效的,具體選擇應取決於特定的數據集。
隨機森林是分類與迴歸中最成功的機器學習模型之一。爲了減小過分擬合的風險,隨機森林將不少決策樹結合起來。和決策樹類似,隨機森林處理分類的功能,延伸到多元分類設置,不須要縮放功能,並能捕捉到非線性和功能的交互。
MLlib使用連續和分類功能支持隨機森林的二元和多元的分類和迴歸。
隨機森林訓練一個決策樹的集合,因此訓練能夠並行。該算法隨機性注入訓練過程,使每一個決策樹會有一點不一樣。結合每棵樹的預測下降了預測的方差,改進了測試數據的性能。
算法隨機性注入訓練的過程包括:
1) 每次迭代對原始數據集進行二次採樣得到不一樣的訓練集,即引導。
2) 考慮在樹的每一個節點上將特徵的不一樣隨機子集分割。
除了這些隨機性,每一個決策樹個體都以一樣的方法訓練。
對隨機森林作預測,就必須聚合它的決策樹集合的預測。分類和迴歸的聚合是不一樣的:
下面例子演示了使用隨機森林執行分類。
//訓練隨機森林模型 //空categoricalFeaturesInfo說明全部特徵是連續的 val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 //use more practice val featureSubsetStrategy = "auto" //Let the algorithm choose. val impurity = "gini" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) val labelAndPreds = testData.map{ point => val prediction = model.predict(point.features) (point.label, prediction) } val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() println("Test Error = " + testErr) println("Learned classification forest model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = RandomForestModel.load(sc, "myModelPath")
下面例子演示了使用隨機森林執行迴歸。
val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val numTrees = 3 //Use more in practice. val featureSubsetStrategy = "auto" //Let the algorithm choose val impurity = "variance" val maxDepth = 4 val maxBins = 32 val model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) val labelAndPredictions = testData.map{ point => val prediction = model.predict(point.features) (point.label, prediction) } val testMSE = labelAndPredictions.map{ case(v, p) => math.pow((v -p), 2)}.mean() println("Test Mean Squared Error = " + testMSE) println("Learned regression forest model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = RandomForestModel.load(sc, "myModelPath")
GBT迭代訓練決策樹,以便最小化損失函數。和決策樹類似,隨機森林處理分類的功能,延伸到多元分類設置,不須要縮放功能,並能捕捉到非線性和功能的交互。
MLlib使用連續和分類功能支持梯度提高決策樹的二元和多元的分類和迴歸。
GBT迭代訓練一個決策樹的序列。在每次迭代中,算法使用當前合奏來預測每一個訓練實例的標籤,而後將預測與真實的標籤進行比較。數據集被從新貼上標籤,將重點放在預測不佳的訓練實例上。所以,在下一迭代中,決策樹將幫助糾正先前的錯誤。重貼標籤的具體機制是由損失函數定義的。隨着每次迭代,GBT進一步減小訓練數據上的損失函數。表11-5列出了MLlib中GBT支持的損失函數。請注意,每一個損失只適用於分類或迴歸之一。其中N表示實例數量,y1表示實例i的標籤,xi表示實例i的特徵,F(Xi)表示實例i的模型預測標籤。
下例演示了用LogLoss做爲損失函數,使用GBT執行分類的例子。
//訓練GradientBoostedTree模型 //默認使用LogLoss val boostingStrategy = BoostingStrategy.defaultParams("Classification") boostingStrategy.numIterations = 3 //Note:Use more iterations in practice. boostingStrategy.treeStrategy.numClasses = 2 boostingStrategy.treeStrategy.maxDepth = 5 //空categoricalFeaturesInfo說明全部特徵是連續的 boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]() val model = GradientBoostedTrees.train(trainingData, boostingStrategy) val labelAndPreds = testData.map{ point => val prediction = model.predict(point.features) (point.label, prediction) } val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count() println("Test Error = " + testErr) println("Learned classification GBT model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")
下例演示了用Squared Error 做爲損失函數,使用GBT執行迴歸的例子。
//訓練GradientBoostedTrees模型 // defaultParams指定了Regression, 默認使用SquaredError val boostingStrategy = BoostingStrategy.defaultParams("Regression") boostingStrategy.numIterations = 3 //Note: Use more iterations in practice. boostingStrategy.treeStrategy.maxDepth = 5 //空categoricalFeaturesInfo說明全部特徵是連續的 boostingStrategy。treeStrategy.categoricalFeaturesInfo = Map [Int, Int]() val model = GradientBoostedTrees.train(trainingData, boostingStrategy) val labelsAndPredictions = testData.map{ point => val prediction = model.predict(point.features) (point.label, prediction) } val testMSE = labelsAndPredictions.map{case(v, p) => math.pow((v - p), 2)}.mean() println("Test Mean Squared Error = " + testMSE) println("Learned regression GBT model:\n" + model.toDebugString) model.save(sc, "myModelPath") val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")
j咱們首先來介紹一些數學中的理論,而後來看樸素貝葉斯。
條件機率:A和B表示兩個事件,且P(B) ≠ 0(B事件發生的機率不等於0),則給定事件B發生的條件下事件A發生的條件機率定義爲:
使用條件機率推導出乘法定律:A和B表示兩個事件,且P(B) ≠ 0(B事件發生的機率不等於0)。那麼:
將乘法定律擴展爲全機率定律:事件B1,B2,...,Bn知足Uni=1 Bi=Ω,Bi∩Bj=Ø,i ≠ j,且對全部的i,P(Bi) > 0。那麼,對於任意的A,知足:
貝葉斯公式:事件事件A, B1,B2,...,Bn, 其中Bi不相交,Uni=1 Bi=Ω,且對全部的i,P(Bi) > 0。那麼:
樸素貝葉斯分類算法是一種基於每對特徵之間獨立性的假設的簡單的多元分類算法。樸素貝葉斯的思想:對於給出的待分類項,求解在此項出現的條件下各個類別出現的機率,哪一個最大,就認爲此待分類項屬於哪一個類別。樸素貝葉斯分類的定義以下:
1) 設x = {a1,a2,...,am}爲待分類項,每一個ai爲x的一個特徵屬性;
2)類別集合C = {y1,y2,...,yn};
3) 計算P(y1|x),P(y2|x),...,P(yn|x) ;
4) 若是P(yk|x) = max{(y1|x),P(y2|x),...,P(yn|x) },則x ε yk 。
關鍵在第三步:
1) 找到一個已知分類的待分類項集合,這個集合叫作訓練樣本集。
2) 統計獲得在各種別下各個特徵屬性的條件機率估計。即P(a1|y1),P(a2|y1),...,P(am|y1); P(a1|y2),P(a2|y2),... ,P(am|y2);...;P(a1|yn),P(a2|yn),...,P(am|yn) 。
3) 若是各個特徵屬性都是獨立的,則根據貝葉斯公式能夠獲得如下推導:
樸素貝葉斯可以被很是有效的訓練。它被單獨傳給訓練數據,計算給定標籤特徵的條件機率分佈並給出觀察結果用於預測。
MLlib支持多項樸素貝葉斯和伯努利樸素貝葉斯。這些模型典型的應用是文檔分類。在這方面,每一個觀察是一個文檔,每一個特徵表明一個條件,其值是條件的頻率(在多項樸素貝葉斯中)或一個由零個或一個指示該條件是否在文檔中找到(在伯努利樸素貝葉斯中)。特徵值必須是非負的。模型類型選擇使用可選的參數"多項"或「伯努利」。「多項」做爲默認模型。經過設置參數λ(默認爲1.0)添加劑平滑。爲文檔分類,輸入特徵向量一般是稀疏的,由於稀疏向量能利用稀疏性的優點。由於訓練數據只使用一次,因此沒有必要緩存它。
下面的例子演示瞭如何使用多項樸素貝葉斯。
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint val data = sc.textFile("data/mllib/sample_naive_bayes_data.txt") val parsedData = data.map{ line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) } val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0) val test = splits(1) val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial") val predictionAndLabel = test.map(p => (model.predict(p.features), p.label)) val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count() model.save(sc, "myModelPath") val sameModel = GradientBoostedTreesModel.load(sc, "myModelPath")
保序迴歸屬於迴歸算法,其定義爲:給定一個有限的實數集合Y = {y1,y2,...,yn} 表示觀測響應, X = {x1,x2,...,xn} 表示未知的響應值,進行擬合找到一個最小化函數:
並使用x1≤x2≤...≤xn 對目標排序,其中ωi 是大於0的權重。最終的函數被稱爲保序迴歸,而且它是惟一的。它能夠看作是排序限制下的最小二乘問題。基本上保序迴歸是擬合原始數據點最佳的單調函數。
MLlib支持PAVA,此算法使用一種辦法來平行化保序迴歸。保序迴歸有一個可選參數isotonic,默認值是true。此參數指定保序迴歸是保序的(單調增長)仍是不保序的(單調減小)。
保序迴歸的結果被視爲分段線性函數。所以,預測的規則是:
1) 若是預測輸入能準備匹配訓練特徵,那麼返回相關預測。若是有多個預測匹配訓練特徵,那麼會返回其中之一。
2) 若是預測輸入比全部的訓練特徵低或者高,那麼最低和最高的訓練特徵各自返回。若是有多個預測比全部的訓練特徵低或者高,那麼都會返回。
3) 若是預測輸入介於兩個訓練特徵,那麼預測會被視爲分段線性函數和從最接近的訓練特徵中計算獲得的插值。
下面的例子演示瞭如何使用保序迴歸。
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel} //省略數據加載及樣本劃分的代碼 val model = new IsotonicRegression().setIsotonic(true).run(training) val predictionAndLabel = test.map { point => val predictedLabel = model.predict(point._2) (predictedLabel, point._1) } val meanSquareError = predictionAndLabel.map{case(p, 1) => math.pow((p-1), 2)}.mean() println("Mean Squared Error = " + meanSquaredError) model.save(sc, "myModelPath") val sameModel = IsotonicRegressionModel.load(sc, "myModelPath")
協同過濾一般用於推薦系統。這些技術旨在填補用戶關聯矩陣的缺失項。MLlib支持基於模型的協同過濾,用戶和產品能夠預測缺失項的潛在因素的小集合來描述。MLlib採用交替最小二乘算法來學習這些潛在的因素。
矩陣分解的標準方法基於協同過濾處理用戶項矩陣的條目是明確的。現實世界的用例只能訪問隱式反饋是更常見的(例如瀏覽、點擊、購買、喜歡、股份等)。
下面的例子演示瞭如何使用協同過濾。
import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.MatrixFactorizationModel import org.apache.spark.mllib.recommendation.Rating val data = sc.textFile("data/mllib/als/test.data") val ratings = data.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble) }) //使用ALS構建推薦模型 val rank = 10 val numIterations = 20 val model = ALS.train(ratings, rank, numIterations, 0.01) //模型計算 val usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) } val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions) val MSE = ratesAndPres.map { case ((user, product), (r1, r2)) => val err = (r1 - r2) err * err }.mean() println("Mean Squared Error = " + MSE) model.save(sc, "myModelPath") val sameModel = MatrixFactorizationModel.load(sc, "myModelPath")
聚類分析又稱羣分析,它是研究(樣品或指標)分類問題的一種統計分析方法。聚類分析以類似性爲基礎,在一個聚類中的模式之間比不在同一聚類中的模式之間具備更多的類似性。MLlib支持的聚類算法以下:
K-means算法是硬聚類算法,是典型的基於原型的目標函數聚類方法的表明,它是數據點到原型的某種距離做爲優化的目標函數,利用函數求極值的方法獲得迭代運算的調整規則。
聚類屬於無監督學習,以往的迴歸、樸素貝葉斯、SVM等都是有類別標籤y的,也就是說,樣本中已經給出了樣本的分類。而聚類的樣本中卻沒有給定y,只有特徵x,好比假設宇宙中的星星能夠表示成三維空間中點集(x, y, z)。聚類的目的是找到每一個樣本x潛在的類別y,並將同類別y的樣本x放在一塊兒。
在聚類問題中,訓練樣本X = {x1,x2,...,xm},每一個xi ε Rn,K-means算法是將樣本聚類成k個簇,具體算法描述以下:
1) 隨機選取k個聚類質心點爲µ1,µ2,...,µk ε Rn;
2) 重複下面過程直到收斂。
對每個樣本i計算它應該屬於的類
對於每個類從新計算該類的質心
k是咱們事先給定的聚類數,ci表明樣本i與k個類中距離最近的那個類,ci的值是1到k中的一個。質心uj表明咱們對屬於同一個類的樣本中心點的猜想,拿星團模型來解釋就是要將全部的星星聚成k個星團,首先隨機選取k個宇宙中的點(或者k個星星)做爲k個星團的質心,而後第一步對於每個星星計算其到k個質心中每個的距離,接着選取距離最近的那個星團做爲ci,這樣通過第一步每個星星都有了所屬的星團;第二步對於每個星團,從新計算它的質心uj(對裏面全部的星星座標求平均)。重複迭代第一步和第二步直到質心不變或者變化很小。圖11-8演示了以上過程。
下面的例子演示了K-means算法的使用。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectors val data = sc.textFile("data/mllib/kmeans_data.txt") val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() val numClusters =2 val numIterations = 20 val clusters = KMeans.train(parsedData, numClusters, numIterations) val WSSSE = clusters.computeCost(parsedData) println("Within Set Sum of Squared Errors = " + WSSSE) clusters.save(sc, "myModelPath") val sameModel = KMeansModel.load(sc, "myModelPath")
K-means的結果是每一個數據點被分配到其中某一個cluster了,而高斯混合則給出這些數據點被分配到每一個cluster的機率。高斯混合的算法與K-means算法相似。MLlib中高斯混合的使用例子
import org.apache.spark.mllib.clustering.GaussianMixture import org.apache.spark.mllib.clustering.GaussianMixtureModel import org.apache.spark.mllib.linalg.Vectors val data = sc.textFile("data/mllib/gmm_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache() val gmm = new GaussianMixture().setK(2).run(parsedData) gmm.save(sc, "myGMMModel") val sameModel = GaussianMixtureModel.load(sc, "myGMMModel") for(i <- 0 until gmm.k){ println("weight=%f\nmu=%s\nsigma=\n%s\n" format (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma)) }
快速迭代聚類是一種簡單可擴展的圖聚類方法。其使用例子以下:
import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel} import org.apache.spark.mllib.linalg.Vectors val similarities: RDD[(Long, Long, Double)] = ... val pic = new PowerIterationClustering().setK(3).setMaxIterations(20) val model = pic.run(similarities) model.assignments.foreach{ a => println(s"${a.id} -> ${a.cluster}") } model.save(sc, "myModelPath") val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")
latent Dirichlet allocation(LDA)是一個三層貝葉斯機率模型,包含詞、主題和文檔三層結構。文檔到主題服從Dirichlet分佈,主題到詞服從多項式分佈。
LDA是一種非監督機器學習技術,能夠用來識別大規模文檔集或語料庫中潛藏的主題信息。它採用了詞袋的方法,這種方法將每一篇文檔視爲一個詞頻向量,從而將文本信息轉化爲了易於建模的數字信息。可是詞袋方法沒有考慮詞與詞之間的順序,這簡化了問題的複雜性,同時也爲模型的改進提供了契機。每一篇文檔表明了一些主題所構成的一個機率分佈,而每個主題又表明了不少單詞所構成的一個機率分佈。因爲Dirichlet分佈隨機向量各份量間的弱相關性(之因此還有點「相關」,是由於各份量之和必須爲1),使得咱們假想的潛在主題之間也幾乎是不相關的,這與不少實際問題並不相符,從而形成了LDA的又一個遺留問題。
對於語料庫中的每篇文檔,LDA定義了以下生成過程:
1) 對每一篇文檔,從主題分佈中抽取一個主題;
2) 從上述被抽到的主題所對應的單詞分佈中抽取一個單詞;
3) 重複上述過程直至遍歷文檔中的每個單詞。
下例演示了LDA的使用。
當數據流到達,咱們可能想要動態地估算cluster,並更新它們。該算法採用了小批量的K-means更新規則。對每一批數據,將全部的點分配到最近的cluster,並計算最新的cluster中心,而後更新每一個cluster的公式爲:
ci是前一次計算獲得的cluster中心,ni是已經分配到cluster的點數,xi是從當前批次獲得的cluster的新中心,mi是當前批次加入cluster的點數。衰減因子a可被用於忽略過去的數據;a=1時全部數據都從一開始就被使用;a=0時只有最近的數據將被使用。這相似於一個指數加權移動平均值。
下面的例子演示了流失K-means的使用。
維數減縮是減小所考慮變量的數量的過程。維數減縮有兩種方式:
奇異值分解將一個矩陣因子分解爲三個矩陣U、Σ 和 V:
其中U是正交矩陣,其列被稱爲左奇異向量;Σ是對角矩陣,其對角線是非負的且以降序排列,所以被稱爲奇異值;V也是正交矩陣,其列被稱爲右奇異向量。
對於大的矩陣,除了頂部奇異值和它的關聯奇異值,咱們不須要徹底分解。這樣能夠節省存儲、去噪聲和恢復矩陣的低秩結構。若是咱們保持前7個頂部奇異值,那麼最終的低秩矩陣爲:
假設n小於m。奇異值和右奇異值向量來源於特徵值和Gramian矩陣AτA的特徵向量。矩陣存儲左奇異向量U,經過矩陣乘法U = A(VS-1)計算。使用的實際方法基於計算成本自動被定義:
1) 若是(n < 100) 或者(k > n/2),咱們首先計算Gramian矩陣,而後再計算其頂部特徵向量並將特徵向量本地化到Driver。這須要單次傳遞,在每一個Executor和Driver上使用O(n2)的存儲,並花費Driver上O(n2k)的時間。
2) 不然,將使用分佈式的方式計算AτA的值,而且發送給ARPACK計算頂部特徵向量。這須要O(k)次傳遞,每一個Executor上使用O(n)的存儲,在Driver上使用O(nk)的存儲。
下面的例子演示了SVD的使用。
主成分分析是一種統計方法,此方法找到一個旋轉,使得第一座標具備可能的最大方差,而且每一個隨後的座標都具備可能的最大方差。旋轉矩陣的列被稱爲主成分。下面的例子演示了使用RowMatrix計算主成分。
術語頻率反轉是一個反映文集的文檔中的術語的重要性,普遍應用於文本挖掘的特徵矢量化方法。術語表示爲t,文檔表示爲d,文集表示爲D。術語頻率TF(t,d)表示術語t在文檔d中出現的頻率,文檔頻率DF(t, D)表示包含術語t的文檔數量。若是咱們僅使用術語頻率來測量重要性,則很容易過分強調術語出現的很頻繁,而攜帶的關於文檔的信息不多,例如a、the 和of等。若是術語很是頻繁地跨文集出現,這意味着它並無攜帶文檔的特定信息。反轉文檔頻率是一個術語提供了多少信息的數值度量:
|D| 表示文集中的文檔總數。由於使用了對數,若是一個術語出現於全部的文檔中,它的IDF值變0。須要注意的是,應用一個平滑項,以免被零除。TF-IDF方法基於TF與IDF,它的公式以下:
這裏有一些術語頻率和文檔頻率定義的變種。在MLlib中,爲了使TF和IDF更靈活,將它們分開了,MLlib實現術語頻率時使用了哈希。應用歐冠哈希函數將原始特徵映射到了索引(術語),術語頻率所以依賴於map的索引計算。這種方法避免了須要計算一個全局術語到索引圖,這對於大型語料庫開銷會很大。但因爲不一樣的原始特徵在哈希後可能變爲一樣的術語,因此存在潛在的哈希衝突。爲了下降碰撞的機會,咱們能夠增長目標特徵的維度(即哈希表中的桶數)。默認的特徵維度是220 = 1048576.
下面的例子演示了HashingTF的使用。
Word2Vec計算由單詞表示的分佈式向量。分佈式表徵的主要優勢是,相似的單詞在矢量空間是接近的,這使得泛化小說模式更容易和模型估計更穩健。分佈式向量表示被證明在許多天然語言處理應用中有用,例如,命名實體識別、消歧、解析、標記和機器翻譯。
MLlib的Word2Vec實現採用了skip-gram模型。skip-gram的訓練目標是學習單詞的向量表示,其善於在同一個句子預測其上下文。給定了單詞序列w1,w2,...,wτ,skip-gram模型的目標是最大化平均對數似然,公式以下:
其中k是訓練窗口的大小。每一個單詞w與兩個向量uw和vw關聯,而且由單獨的向量來表示。經過給定的單詞wj正確預測wi的機率是由softmax模型決定的。softmax模型以下:
w表示詞彙量。
因爲計算logp(wi|wj)的成本與V成正比(V很容易就達到百萬以上),因此softmax這種skip-gram模型是很昂貴的。爲了加速Word2Vec計算,MLlib使用分級softmax,它能夠減小計算logp(wi|wj) 複雜度到O(log(V))。
下面的例子演示了Word2Vec的使用,
經過縮放到單位方差和/或經過在訓練集的樣本上使用列摘要統計溢出均值使特徵標準化,這是常見的預處理步驟。例如,當全部的特徵都有單位方差和/或零均值時,支持向量機的RBF核或者L1和L2正規化線性模型一般能更好地工做。標準化能夠提升在優化過程當中的收斂速度,而且還能夠防止在模型訓練期間,很是大的差別會對特徵發揮過大的影響。
StandardScaler的構造器有兩個參數:
如下例子演示了StandardScaler的使用。
正規化尺度把樣本劃分爲單位Lp範式,即維度。這是一種常見的對文本分類或集羣化的操做。例如,兩個L2正規化TF-IDF向量的點積是這些向量的餘弦近似值。
設二維空間內有兩個向量a和b,它們的夾角爲θ(0≤θ≤π),則點積定義爲如下實數:
MLlib提供Normalizer支持正規化,Normalizer有如下構造參數:
p: 正規化到Lp空間,默認爲2。
下面的例子演示了Normalizer的使用。
ChiSqSelector用於卡方特徵選擇。它運轉在具備分類特徵的標籤數據上。ChiSqSelector對基於分類進行獨立卡方測試的特徵排序,而且過濾(選擇)最接近標籤的頂部特徵。
ChiSqSelector有如下構造器參數:
numTopFeatures:選擇器將要過濾(選擇)的頂部特徵數量。
下邊的例子演示了ChiSqSelector的使用。
ElementwiseProduct採用逐個相乘的方式,使用給定的權重與每一個輸入向量相乘。換言之,它採用一個標量乘法器擴展數據集的每一列。這表示Hadamard積對輸入向量v,使用轉換向量w,最終生成一個結果向量。Hadamard積可由如下公式表示:
ElementwiseProduct的構造器參數爲:
w:轉換向量。
下面代碼演示了ElementwiseProduct的使用。
分析大規模數據集的第一個步驟一般是挖掘頻繁項目、項目集、亞序列或其餘子結構,這在數據挖掘中做爲一個活躍的研究主題已多年了。其數學原理讀者能夠取維基百科瞭解。MLlib提供了頻繁模式挖掘的並行實現——FP-growth算法。
給定一個交易數據集,FP-growth的第一步驟是計算項目的頻率,並肯定頻繁項目。FP-growth雖然與Apriori類算法有相同的設計目的,可是FP-growth的第二步使用後綴樹(FP樹)結構對交易數據編碼且不會顯式生成候選集(生成候選集一般開銷很大)。第二步以後,就能夠從FP樹中抽取頻繁項目集。MLlib中實現了FP-growth的平行版本,叫作PFP。PFP能夠將FP-growth的工做分發到其餘機器,比單機運行有更好的擴展性。
FPGrowth有如下參數:
下面的例子演示了FPGrowth的使用。
預言模型標記是一種基於XML的語言,它可以定義和共享應用程序之間的預測模型。
MLlib支持將模型導出爲預言模型標記語言。表11-6列出了MLlib模型導出爲PMML的相應模型。
下面的例子演示了將KMeamsModel導出爲PMML格式。
Spark1.2增長了一個新包spark.ml,目的是提供一套高層次的API,幫助用戶建立、調試機器學習的管道。spark.ml的標準化API用於將多種機器學習算法組合到一個管道或工做流中。下面列出了Spark ML API的主要概念:
機器學習中,運行一系列的算法取處理數據或者從數據學習的場景是很常見的。例如,一個簡單的文本文檔處理工做流可能包含如下階段:
1) 將文檔文本切分紅單詞;
2) 將文檔的單詞轉換爲數字化的特徵向量;
3) 使用特徵向量和標籤學習一個預測模型。
Spark ML以一系列按序運行的PipelineStage組成的管道來表示這樣的工做流。這一系列的Stage要麼是Transformer,要麼是Estimator。數據集經過管道中的每一個Stage都會被修改。好比Transformer的transform()方法將在數據集上被調用,Estimator的fit()方法被調用生成一個Transformer,而後此Transformer的transform()方法也將在數據集上被調用。圖11-9展現了簡單文本文檔工做流例子使用管道的處理流程。
圖11-9說明整個管道由3個Stage組成。Tokenizer和HashingTF都是Transformer,LogisticRegression是Estimator。每一個圓柱體都說明它自己是一個DataFrame。整個處理流程以下:
1) 在由原生文本文檔構成的原始數據集上應用Pipeline.fit()方法。
2) Tokenizer.transform()將原生文本文檔切分爲單詞,並向數據集增長單詞列。
3) HashingTF.transform()將單詞列轉換爲特徵向量,並向數據集增長向量列。
4) 由於LogisticRegression是Estimator,因此管道第一次調用LogisticRegression.fit()生成了LogisticRegressionModel。若是管道中還有更多的Stage,將會傳遞數據集到下一個Stage以前在數據集上調用LogisticRegressionModel的transform()。
管道自己是一個Estimator。所以調用Pipeline的fit()方法最後生成了PipelineModel,PipelineModel也是一個Transformer。這個PipelineModel會在測試時間使用,測試過程如圖11-10所示。
圖11-10說明PipelineModel的測試過程與圖11-9的管道有相同的Stage數量。可是圖11-9的管道中的全部Estimator在此時都已經變爲Transformer。當在測試數據集上調用PipelineModel的transform()方法時,數據在管道中按序經過。每一個Stage的transform()方法都會更新數據集,並將數據集傳遞給下一個Stage。
剛纔介紹的例子中,管道是線性的,即每一個stage都使用由上一個stage生產的數據。只要數據流圖構成了DAG,它就有可能不是線性的。若是管道構成了DAG,那麼這些Stage就必須指定拓撲順序。
Spark ML的Transformer和Estimator指定參數具備統一的API。有兩種方式指定參數:
下面的例子演示了Estimator、Transformer和Param的使用。
模型選擇是Spark ML中很重要的課題。經過對整個管道的調整,而不是對管道中的每一個元素的調整,促成對管道模型的選擇。當前spark.ml使用CrossValidator支持模型選擇。CrossValidator自己攜帶一個Estimator、一組ParamMap以及一個Evaluator。CrossValidator開始先將數據集劃分爲多組,每組都由訓練數據集和測試數據集組成。例如,須要劃分3組,那麼CrossValidator將生成三個數據集對(訓練,測試)。每一對都使用2/3的數據用於訓練,1/3的數據用於測試。CrossValidator會迭代ParamMap的集合。對於每一個ParamMap,它都會訓練給定的Estimator並使用給定的Evaluator計算。ParamMap將會產出最佳的計算模型(對多個數據集對求平均),CrossValidator最終使用這個最佳的ParamMap和整個數據集擬合Estimator。下邊的例子演示了CrossValidator的使用。使用ParamGridBuilder構造網格參數:hashingTF.numFeatures有3個值,r.regParam有2個值。這個網格將會有3*2=6個參數設置供CrossValidator選擇。使用了2組數據集對,那麼一共有(3*2)*2=12種不一樣的模型被訓練。
下面的代碼演示了交叉驗證的使用。