I've been studying machine learning lately, using Spark as the tool. This article analyzes the source code of logistic regression and linear regression in MLlib, based on the latest Spark release at the time of writing, Spark 1.6.0. For the theory behind them, see: http://www.cnblogs.com/ljy2013/p/5129610.html
Let's dissect the source code step by step by following my demo. First, the demo itself:
package org.apache.spark.mllib.classification

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{ LogisticRegressionWithLBFGS, LogisticRegressionModel }
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.SparkConf

object MyLogisticRegression {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Load training data in LIBSVM format: <label> <index1>:<value1> <index2>:<value2> ...
    // where the indices are 1-based.
    val data = MLUtils.loadLibSVMFile(sc, "D:\\MyFile\\wine.txt")

    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10) // set the number of classes
      .run(training)

    // Compute raw scores on the test set.
    val predictionAndLabels = test.map {
      case LabeledPoint(label, features) =>
        val prediction = model.predict(features)
        (prediction, label)
    }

    // Get evaluation metrics.
    val metrics = new MulticlassMetrics(predictionAndLabels)
    val precision = metrics.precision
    println("Precision = " + precision)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LogisticRegressionModel.load(sc, "myModelPath")

  }
}
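For reference, a LIBSVM-format file contains one sample per line: the label, then index:value pairs for the non-zero features. The lines below are made up purely to illustrate the format (the real wine.txt carries the 13 Wine features); note that loadLibSVMFile converts the 1-based indices to 0-based internally.

1 1:14.23 2:1.71 3:2.43
2 1:12.37 2:0.94 3:1.36
3 1:12.86 2:1.35 3:2.32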
From the demo above we can see that LogisticRegression uses the L-BFGS algorithm to solve for its parameters. L-BFGS is an unconstrained optimization algorithm, used here to find the logistic regression parameters (the weights). If you are not familiar with it, see: http://www.cnblogs.com/ljy2013/p/5129610.html
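For context, the problem L-BFGS solves here can be written down compactly. With labels y_i ∈ {−1, +1}, weights w, and an L2 regularization parameter λ, binary logistic regression minimizes the regularized negative log-likelihood (a standard formulation for reference, not code taken from MLlib):

\min_{w} \; \frac{1}{n} \sum_{i=1}^{n} \log\left(1 + e^{-y_i \, w^{\top} x_i}\right) + \frac{\lambda}{2} \lVert w \rVert^2

L-BFGS iterates toward the minimizer using gradients only, approximating the inverse Hessian from a short history of past updates.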
I also sketched a simple class-inheritance diagram of the classes involved.
The flow breaks down into two phases: training and prediction.
1. The training phase
The main program kicks off training by calling the method below:
// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10) // set the number of classes
  .run(training)
After setting the number of classes, we call run on LogisticRegressionWithLBFGS. LogisticRegressionWithLBFGS itself does not define run; it inherits it from GeneralizedLinearAlgorithm, and that inherited method is where training actually happens. Let's take a look at it:
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {

  if (numFeatures < 0) {
    numFeatures = input.map(_.features.size).first()
  }

  // Training iterates over the data many times, so it should be cached in memory.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data is not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Check the data properties before running the optimizer
  if (validateData && !validators.forall(func => func(input))) {
    throw new SparkException("Input validation failed.")
  }

  /**
   * Scaling columns to unit variance as a heuristic to reduce the condition number:
   *
   * During the optimization process, the convergence (rate) depends on the condition number of
   * the training dataset. Scaling the variables often reduces this condition number
   * heuristically, thus improving the convergence rate. Without reducing the condition number,
   * some training datasets mixing the columns with different scales may not be able to converge.
   *
   * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return
   * the weights in the original scale.
   * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
   *
   * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing
   * the variance of each column (without subtracting the mean), and train the model in the
   * scaled space. Then we transform the coefficients from the scaled space to the original scale
   * as GLMNET and LIBSVM do.
   *
   * Currently, it's only enabled in LogisticRegressionWithLBFGS
   */
  // Standardize the data
  val scaler = if (useFeatureScaling) {
    new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
  } else {
    null
  }

  // Prepend an extra variable consisting of all 1.0's for the intercept.
  // TODO: Apply feature scaling to the weight vector instead of input data.
  val data =
    if (addIntercept) {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
      } else {
        input.map(lp => (lp.label, appendBias(lp.features))).cache()
      }
    } else {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
      } else {
        input.map(lp => (lp.label, lp.features))
      }
    }

  /**
   * TODO: For better convergence, in logistic regression, the intercepts should be computed
   * from the prior probability distribution of the outcomes; for linear regression,
   * the intercept should be set as the average of response.
   */
  val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
    appendBias(initialWeights)
  } else {
    /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
    initialWeights
  }

  // Run the optimizer over the weights; it returns the optimized weights,
  // i.e. the final model parameters.
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

  val intercept = if (addIntercept && numOfLinearPredictor == 1) {
    weightsWithIntercept(weightsWithIntercept.size - 1)
  } else {
    0.0
  }

  var weights = if (addIntercept && numOfLinearPredictor == 1) {
    Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
  } else {
    weightsWithIntercept
  }

  /**
   * The weights and intercept are trained in the scaled space; we're converting them back to
   * the original scale.
   *
   * Math shows that if we only perform standardization without subtracting means, the intercept
   * will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
   * is the coefficient in the original space, and v_i is the variance of the column i.
   */
  if (useFeatureScaling) {
    if (numOfLinearPredictor == 1) {
      weights = scaler.transform(weights)
    } else {
      /**
       * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original
       * scale for each set of linear predictor. Note that the intercepts have to be explicitly
       * excluded when `addIntercept == true` since the intercepts are part of weights now.
       */
      var i = 0
      val n = weights.size / numOfLinearPredictor
      val weightsArray = weights.toArray
      while (i < numOfLinearPredictor) {
        val start = i * n
        val end = (i + 1) * n - { if (addIntercept) 1 else 0 }

        val partialWeightsArray = scaler.transform(
          Vectors.dense(weightsArray.slice(start, end))).toArray

        System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size)
        i += 1
      }
      weights = Vectors.dense(weightsArray)
    }
  }

  // Warn at the end of the run as well, for increased visibility.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data was not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Unpersist cached data
  if (data.getStorageLevel != StorageLevel.NONE) {
    data.unpersist(false)
  }

  createModel(weights, intercept)
}
In this method, the first step is to standardize the training data.
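Concretely, since the scaler is fitted with withMean = false, it only divides each feature column by its standard deviation. Writing σ_j for the standard deviation of column j, the transform and the corresponding back-transform of the weights are (this is my reading of the code comment above, which notes the intercept is unchanged because the mean is not subtracted):

x'_{ij} = x_{ij} / \sigma_j, \qquad w_j = w'_j / \sigma_j

where w'_j is the weight learned in the scaled space and w_j is the weight reported in the original space.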
The second step is to solve for the optimal weights via the optimizer. One thing to note here: the actual call is val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept), and it relies on polymorphism. optimizer is an abstract member of the GeneralizedLinearAlgorithm class, as sketched below:
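A sketch of that declaration, paraphrased from the Spark 1.6 source (the exact doc comment may differ):

  /** The optimizer to solve the problem. */
  def optimizer: Optimizer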
The subclass LogisticRegressionWithLBFGS implements it, as sketched below. (This design lets every algorithm supply its own optimization routine for computing the optimal weights, while GeneralizedLinearAlgorithm itself stays generic across all of the generalized linear algorithms.)
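The override plugs L-BFGS together with a logistic gradient and a squared-L2 updater; as best I recall from the 1.6 source, it reads approximately:

  override val optimizer = new LBFGS(new LogisticGradient, new SquaredL2Updater)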
Now for the third step: building the model. In GeneralizedLinearAlgorithm's run method, model creation is a single line: createModel(weights, intercept). But there is a deliberate design behind it. Like optimizer above, createModel is dispatched polymorphically: GeneralizedLinearAlgorithm declares it as an abstract method, as shown below:
protected def createModel(weights: Vector, intercept: Double): M
The subclass LogisticRegressionWithLBFGS implements it, as shown below:
override protected def createModel(weights: Vector, intercept: Double) = {
  if (numOfLinearPredictor == 1) {
    // binary model
    new LogisticRegressionModel(weights, intercept)
  } else {
    // multiclass model
    new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
  }
}
So the createModel that actually runs is the one defined on LogisticRegressionWithLBFGS (for a k-class model, numOfLinearPredictor is k − 1, which is why the multiclass branch passes numOfLinearPredictor + 1 as the class count). At this point the model has been built; what remains is putting it to use, for example for prediction.
2. The prediction phase
Prediction uses the model built above to decide the class of unseen samples. The parent class GeneralizedLinearModel provides a predict method with two overloads: one takes an RDD of vectors, for parallel prediction; the other takes a single Vector, for local single-point prediction. The code is shown below:
/**
 * Predict values for the given data set using the model trained.
 *
 * @param testData RDD representing data points to be predicted
 * @return RDD[Double] where each entry contains the corresponding prediction
 */
@Since("1.0.0") // this overload performs parallel prediction
def predict(testData: RDD[Vector]): RDD[Double] = {
  // A small optimization to avoid serializing the entire model. Only the weightsMatrix
  // and intercept is needed.
  val localWeights = weights
  val bcWeights = testData.context.broadcast(localWeights)
  val localIntercept = intercept
  testData.mapPartitions { iter =>
    val w = bcWeights.value
    iter.map(v => predictPoint(v, w, localIntercept))
  }
}

/**
 * Predict values for a single data point using the model trained.
 *
 * @param testData array representing a single data point
 * @return Double prediction from the trained model
 */
@Since("1.0.0") // this overload performs local, single-point prediction
def predict(testData: Vector): Double = {
  predictPoint(testData, weights, intercept)
}
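A quick usage sketch of the two overloads, reusing test and model from the demo at the top (illustrative only):

// Parallel: broadcast the weights once, then score a whole RDD of feature vectors.
val scores: RDD[Double] = model.predict(test.map(_.features))

// Local: score a single feature vector on the driver
// (it must have the same dimensionality as the training features).
val oneScore: Double = model.predict(test.first().features)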
Looking closely at the code above, both predict overloads delegate to predictPoint, and predictPoint is abstract in GeneralizedLinearModel. In other words, each machine learning algorithm supplies its own prediction rule; binary logistic regression, for instance, checks whether the predicted score exceeds the threshold, 0.5 by default (as shown below).
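In formula form, the binary rule computes a sigmoid score and compares it against the threshold t (which defaults to 0.5 and can be changed or removed via setThreshold / clearThreshold on the model):

\mathrm{score}(x) = \frac{1}{1 + e^{-(w^{\top} x + b)}}, \qquad \hat{y} = \begin{cases} 1 & \text{if } \mathrm{score}(x) > t \\ 0 & \text{otherwise} \end{cases}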
So every concrete machine learning model re-implements the predictPoint method of the parent class (GeneralizedLinearModel). Let's look at how LogisticRegressionModel implements predictPoint; the code is shown below:
override protected def predictPoint(
    dataMatrix: Vector,
    weightMatrix: Vector,
    intercept: Double) = {
  require(dataMatrix.size == numFeatures)

  // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
  if (numClasses == 2) {
    val margin = dot(weightMatrix, dataMatrix) + intercept
    val score = 1.0 / (1.0 + math.exp(-margin))
    threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None => score
    }
  } else {
    /**
     * Compute and find the one with maximum margins. If the maxMargin is negative, then the
     * prediction result will be the first class.
     *
     * PS, if you want to compute the probabilities for each outcome instead of the outcome
     * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
     * is positive to prevent overflow.
     */
    var bestClass = 0
    var maxMargin = 0.0
    val withBias = dataMatrix.size + 1 == dataWithBiasSize
    (0 until numClasses - 1).foreach { i =>
      var margin = 0.0
      dataMatrix.foreachActive { (index, value) =>
        if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
      }
      // Intercept is required to be added into margin.
      if (withBias) {
        margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
      }
      if (margin > maxMargin) {
        maxMargin = margin
        bestClass = i + 1
      }
    }
    bestClass.toDouble
  }
}
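To make the multiclass branch concrete, here is a minimal standalone sketch of the same decision rule with invented margins (in the real code each margin is the dot product of the sample against one class's block of weights, plus its intercept):

object MulticlassRuleSketch {
  def main(args: Array[String]): Unit = {
    // With numClasses = 3 there are numClasses - 1 = 2 linear predictors;
    // class 0 is the implicit pivot with margin 0.0.
    val margins = Array(-0.3, 1.2) // invented margins for classes 1 and 2
    var bestClass = 0
    var maxMargin = 0.0
    for (i <- margins.indices) {
      if (margins(i) > maxMargin) {
        maxMargin = margins(i)
        bestClass = i + 1
      }
    }
    println(bestClass) // prints 2; if every margin were negative, class 0 would win
  }
}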
That completes this source-code walkthrough. This article used LogisticRegression to illustrate the code path; the other machine learning algorithms in MLlib can be analyzed along the same lines, and I will post source analyses of them later.