MLlib 是Spark 中提供機器學習函數的庫。它是專爲在集羣上並行運行的狀況而設計的。MLlib 中包含許多機器學習算法,能夠在Spark 支持的全部編程語言中使用,因爲Spark基於內存計算模型的優點,很是適合機器學習中出現的屢次迭代,避免了操做磁盤和網絡的性能損耗。Spark 官網展現的 MLlib 與Hadoop性能對比圖就很是顯著。因此Spark比Hadoop的MapReduce框架更易於支持機器學習。html
MLlib 包含一些特有的數據類型,它們位於org.apache.spark.mllib 包(Java/Scala)或
pyspark.mllib(Python)內。python
本地向量(Local Vector)的索引是從0開始的,而且是整型。而它的值爲 Double 類型,存儲於單個機器內。 MLlib 支持兩種本地向量:稠密向量和稀疏向量。算法
稠密向量是用一個 Double 類型的數組表明它的實體值,而稀疏向量是基於兩個並行數組,即索引和值。舉個例子,向量 (1.0, 0.0, 3.0)
寫成稠密形式就是 [1.0, 0.0, 3.0]
,而寫成稀疏形式則是 (3, [0, 2], [1.0, 3.0])
,後者的第一個 3 是指向量的大小。稀疏和稠密的界限沒有嚴格意義上的標準,一般須要依據具體的問題來決定。apache
本地向量的基類是 Vector 類,DenseVector 和 SparseVector 類是其兩種不一樣的實現。官方文檔推薦你們使用 Vector 類中已實現的工廠方法來建立本地向量。編程
Scala環境下:數組
//建立稠密向量 scala> val denseVec1 = Vectors.dense(1.0,2.0,3.0) denseVec1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] scala> val denseVec2 = Vectors.dense(Array(1.0,2.0,3.0)) denseVec2: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] //建立稀疏向量 scala> val sparseVec1 = Vectors.sparse(4,Array(0,2),Array(1.0,2.0)) sparseVec1: org.apache.spark.mllib.linalg.Vector = (4,[0,2],[1.0,2.0])
python環境下:
網絡
>>> from pyspark.mllib.linalg import Vectors >>> den = Vectors.dense([1.0,2.0,3.0]) >>> den DenseVector([1.0, 2.0, 3.0]) >>> spa = Vectors.sparse(4,[0,2],[1.0,2.0]) >>> spa SparseVector(4, {0: 1.0, 2: 2.0})
在諸如分類和迴歸這樣的監督式學習(supervised learning)算法中,LabeledPoint 用來表示帶標籤的數據點。它包含一個特徵向量與一個標籤(由一個浮點數表示),位置在 mllib.regression
包中。框架
Scala環境中:dom
// 首先須要引入標籤點相關的類 import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // 建立一個帶有正面標籤和稠密特徵向量的標籤點。 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // 建立一個帶有負面標籤和稀疏特徵向量的標籤點。 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
注意其第一個參數爲標籤,第二個參數爲向量。標籤是用Double
類型表示的。機器學習
Python環境中:
>>> from pyspark.mllib.regression import LabeledPoint >>> from pyspark.mllib.linalg import Vectors >>> pos = LabeledPoint(1.0,Vectors.dense([1.0,2.0,3.0])) >>> neg = LabeledPoint(0.0,Vectors.dense([1.0,2.0,3.0]))
稠密矩陣的實體值以列爲主要次序的形式,存放於單個 Double 型數組內。係數矩陣的非零實體以列爲主要次序的形式,存放於壓縮稀疏列(Compressed Sparse Column, CSC)中。例如,下面這個稠密矩陣就是存放在一維數組 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,矩陣的大小爲 (3, 2) 。
import org.apache.spark.mllib.linalg.{Matrix, Matrices} // 建立稠密矩陣 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))關於稀疏矩陣的存儲方式請參考: http://www.tuicool.com/articles/A3emmqi
/下列矩陣 1.0 0.0 4.0 0.0 3.0 5.0 2.0 0.0 6.0 / 若是採用稀疏矩陣存儲的話,其存儲信息包括: 實際存儲值: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]`, 矩陣元素對應的行索引:rowIndices=[0, 2, 1, 0, 1, 2]` 列起始位置索引: `colPointers=[0, 2, 3, 6]`.則生成稀疏矩陣的方式爲:
scala> val sparseMatrix= Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1.0, 2.0, 3.0,4.0,5.0,6.0)) sparseMatrix: org.apache.spark.mllib.linalg.Matrix = 3 x 3 CSCMatrix (0,0) 1.0 (2,0) 2.0 (1,1) 3.0 (0,2) 4.0 (1,2) 5.0 (2,2) 6.0
關於更多類型介紹參考:
http://spark.apache.org/docs/1.6.1/mllib-data-types.html#local-matrix
不管是在即時的探索中,仍是在機器學習的數據理解中,基本的統計都是數據分析的重要部分。MLlib 經過mllib.stat.Statistics 類中的方法提供了幾種普遍使用的統計函數,這些函數能夠直接在RDD 上使用。一些經常使用的函數以下所列。
計算由向量組成的RDD 的統計性綜述,保存着向量集合中每列的最小值、最大值、平均值和方差。這能夠用來在一次執行中獲取豐富的統計信息。
計算由向量組成的RDD 中的列間的相關矩陣,使用皮爾森相關(Pearson correlation)或斯皮爾曼相關(Spearman correlation)中的一種(method 必須是pearson 或spearman中的一個)。
計算兩個由浮點值組成的RDD 的相關矩陣,使用皮爾森相關或斯皮爾曼相關中的一種(method 必須是pearson 或spearman 中的一個)。
計算由LabeledPoint 對象組成的RDD 中每一個特徵與標籤的皮爾森獨立性測試
(Pearson’s independence test) 結果。返回一個ChiSqTestResult 對象, 其中有p 值
(p-value)、測試統計及每一個特徵的自由度。標籤和特徵值必須是分類的(即離散值)。構建測試數據,
下面舉個例子:使用三個學生的成績Vector來構建所需的RDD Vector,這個矩陣裏的每一個Vector都表明一個學生在四門課程裏的分數:
Python環境下:
from pyspark.mllib.stat import Statistics from pyspark.mllib.linalg import Vectors //構建RDD basicTestRDD = sc.parallelize([Vectors.dense([60, 70, 80, 0]), Vectors.dense([80, 50, 0, 90]), Vectors.dense([60, 70, 80, 0])])
//以查看下summary裏的成員,這個對象中包含了大量的統計內容 >>> print summary.mean() [ 66.66666667 63.33333333 53.33333333 30. ] >>> print summary.variance() [ 133.33333333 133.33333333 2133.33333333 2700. ] >>> print summary.numNonzeros() [ 3. 3. 2. 1.]Scala環境:
import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD val array1: Array[Double] = Array[Double](60, 70, 80, 0) val array2: Array[Double] = Array[Double](80, 50, 0, 90) val array3: Array[Double] = Array[Double](60, 70, 80, 0) val denseArray1 = Vectors.dense(array1) val denseArray2 = Vectors.dense(array2) val denseArray3 = Vectors.dense(array3) val seqDenseArray: Seq[Vector] = Seq(denseArray1, denseArray2, denseArray3) val basicTestRDD: RDD[Vector] = sc.parallelize[Vector](seqDenseArray) val summary: MultivariateStatisticalSummary = Statistics.colStats(basicTestRDD)
關於更多的統計能夠參考:http://spark.apache.org/docs/1.6.1/mllib-statistics.html
Python環境下:
# 讀取數據文件,建立RDD dataFile = "/usr/local/spark-1.6.3-bin-hadoop2.6/data/mllib/kmeans_data.txt" lines = sc.textFile(dataFile) # 建立Vector,將每行的數據用空格分隔後轉成浮點值返回numpy的array data = lines.map(lambda line: np.array([float(x) for x in line.split(' ')])) # 其中2是簇的個數 model = KMeans.train(data, 2) print("Final centers: " + str(model.clusterCenters)) print("Total Cost: " + str(model.computeCost(data)))
實驗的數據咱們直接使用官方提供的數據
/usr/local/spark-1.6.3-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt
。
# 加載模塊 from pyspark.mllib.util import MLUtils from pyspark.mllib.classification import SVMWithSGD # 讀取數據 dataFile = '/usr/local/spark-1.6.3-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt' data = MLUtils.loadLibSVMFile(sc, dataFile) splits = data.randomSplit([0.8, 0.2], seed = 9L) training = splits[0].cache() test = splits[1] # 打印分割後的數據量 print "TrainingCount:[%d]" % training.count(); print "TestingCount:[%d]" % test.count(); model = SVMWithSGD.train(training, 100) scoreAndLabels = test.map(lambda point : (model.predict(point.features), point.label)) #輸出結果,包含預測的數字結果和0/1結果: for score, label in scoreAndLabels.collect(): print score, label
參考資料: