Spark MLlib: Logistic Regression Source Code Analysis

I have been studying machine learning recently, using Spark as my tool. This article analyzes the logistic regression (and linear regression) source code in MLlib from the latest Spark release at the time of writing, Spark 1.6.0. For the underlying theory, see: http://www.cnblogs.com/ljy2013/p/5129610.html

Let's dissect the source code step by step by following my demo. First, the demo itself:

```scala
package org.apache.spark.mllib.classification

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{ LogisticRegressionWithLBFGS, LogisticRegressionModel }
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.SparkConf

object MyLogisticRegression {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Load training data in LIBSVM format: <label> <index1>:<value1> <index2>:<value2> ...
    // Note that the indices are one-based.
    val data = MLUtils.loadLibSVMFile(sc, "D:\\MyFile\\wine.txt")

    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10) // set the number of classes
      .run(training)

    // Compute raw scores on the test set.
    val predictionAndLabels = test.map {
      case LabeledPoint(label, features) =>
        val prediction = model.predict(features)
        (prediction, label)
    }

    // Get evaluation metrics.
    val metrics = new MulticlassMetrics(predictionAndLabels)
    val precision = metrics.precision
    println("Precision = " + precision)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LogisticRegressionModel.load(sc, "myModelPath")
  }
}
```

From the demo above, we can see that LogisticRegression uses the L-BFGS algorithm to optimize its parameters. L-BFGS is an unconstrained optimization algorithm, used here to solve for the logistic regression parameters (the weights). If you are unfamiliar with it, see: http://www.cnblogs.com/ljy2013/p/5129610.html.
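The demo leaves the optimizer at its defaults. Since the optimizer is exposed as a public member of the algorithm object (as we will see below), it can be tuned before calling run. A minimal sketch follows; the setters exist on MLlib's LBFGS, but the specific values here are made up for illustration:

```scala
// A minimal tuning sketch (assumed values; not part of the original demo).
val lr = new LogisticRegressionWithLBFGS().setNumClasses(10)
lr.optimizer
  .setNumIterations(100)   // maximum number of L-BFGS iterations
  .setConvergenceTol(1e-6) // stop once improvement falls below this tolerance
  .setRegParam(0.01)       // L2 regularization strength
val tunedModel = lr.run(training)
```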

I sketched a simple class-inheritance diagram for the classes involved (diagram omitted here).

The overall flow splits into two phases: training and prediction.

1. The training process

The main program starts training by calling the following:

```scala
// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10) // set the number of classes
  .run(training)
```

After setting the number of classes, run is called on the LogisticRegressionWithLBFGS instance. LogisticRegressionWithLBFGS itself does not define run; it inherits the method from GeneralizedLinearAlgorithm, and the entire training process takes place there. Let's look at this method:

```scala
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {

  if (numFeatures < 0) {
    numFeatures = input.map(_.features.size).first()
  }
  // The optimizer makes multiple passes over the data, so the training
  // set should be cached in memory.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data is not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Check the data properties before running the optimizer
  if (validateData && !validators.forall(func => func(input))) {
    throw new SparkException("Input validation failed.")
  }

  /**
   * Scaling columns to unit variance as a heuristic to reduce the condition number:
   *
   * During the optimization process, the convergence (rate) depends on the condition number of
   * the training dataset. Scaling the variables often reduces this condition number
   * heuristically, thus improving the convergence rate. Without reducing the condition number,
   * some training datasets mixing the columns with different scales may not be able to converge.
   *
   * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return
   * the weights in the original scale.
   * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
   *
   * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing
   * the variance of each column (without subtracting the mean), and train the model in the
   * scaled space. Then we transform the coefficients from the scaled space to the original scale
   * as GLMNET and LIBSVM do.
   *
   * Currently, it's only enabled in LogisticRegressionWithLBFGS
   */
  // Standardize the features
  val scaler = if (useFeatureScaling) {
    new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
  } else {
    null
  }

  // Prepend an extra variable consisting of all 1.0's for the intercept.
  // TODO: Apply feature scaling to the weight vector instead of input data.
  val data =
    if (addIntercept) {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
      } else {
        input.map(lp => (lp.label, appendBias(lp.features))).cache()
      }
    } else {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
      } else {
        input.map(lp => (lp.label, lp.features))
      }
    }

  /**
   * TODO: For better convergence, in logistic regression, the intercepts should be computed
   * from the prior probability distribution of the outcomes; for linear regression,
   * the intercept should be set as the average of response.
   */
  val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
    appendBias(initialWeights)
  } else {
    /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
    initialWeights
  }

  // Run the optimizer on the prepared data; the result is the optimized
  // weight vector, i.e. the final model parameters.
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

  val intercept = if (addIntercept && numOfLinearPredictor == 1) {
    weightsWithIntercept(weightsWithIntercept.size - 1)
  } else {
    0.0
  }

  var weights = if (addIntercept && numOfLinearPredictor == 1) {
    Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
  } else {
    weightsWithIntercept
  }

  /**
   * The weights and intercept are trained in the scaled space; we're converting them back to
   * the original scale.
   *
   * Math shows that if we only perform standardization without subtracting means, the intercept
   * will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
   * is the coefficient in the original space, and v_i is the variance of the column i.
   */
  if (useFeatureScaling) {
    if (numOfLinearPredictor == 1) {
      weights = scaler.transform(weights)
    } else {
      /**
       * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original
       * scale for each set of linear predictor. Note that the intercepts have to be explicitly
       * excluded when `addIntercept == true` since the intercepts are part of weights now.
       */
      var i = 0
      val n = weights.size / numOfLinearPredictor
      val weightsArray = weights.toArray
      while (i < numOfLinearPredictor) {
        val start = i * n
        val end = (i + 1) * n - { if (addIntercept) 1 else 0 }

        val partialWeightsArray = scaler.transform(
          Vectors.dense(weightsArray.slice(start, end))).toArray

        System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size)
        i += 1
      }
      weights = Vectors.dense(weightsArray)
    }
  }

  // Warn at the end of the run as well, for increased visibility.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data was not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Unpersist cached data
  if (data.getStorageLevel != StorageLevel.NONE) {
    data.unpersist(false)
  }

  createModel(weights, intercept)
}
```

In this method, the first step is to standardize the training data.
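Viewed in isolation, that scaling step amounts to the following sketch, mirroring the StandardScaler call run makes internally when useFeatureScaling is enabled (withMean = false means each column is only divided by its standard deviation, not centered):

```scala
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

// Divide each feature column by its standard deviation without centering,
// as run() does when useFeatureScaling is enabled.
val scaler = new StandardScaler(withStd = true, withMean = false)
  .fit(training.map(_.features))
val scaledTraining = training.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))
```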

The second step computes the optimal weights via the optimizer: val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept). Note the polymorphism at work here: optimizer is an abstract member declared in GeneralizedLinearAlgorithm, as shown below.
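In the Spark 1.6 source, the declaration is essentially the following:

```scala
// In GeneralizedLinearAlgorithm: the optimizer is left abstract,
// so each concrete algorithm must supply its own Optimizer.
def optimizer: Optimizer
```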

The subclass LogisticRegressionWithLBFGS overrides this member. (This design lets each algorithm plug in its own optimization routine for computing the optimal weights, while GeneralizedLinearAlgorithm stays generic across all the linear learning algorithms.)
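In the Spark 1.6 source, the override is essentially:

```scala
// In LogisticRegressionWithLBFGS: L-BFGS paired with the
// logistic gradient and an L2 (squared-norm) updater.
override val optimizer = new LBFGS(new LogisticGradient, new SquaredL2Updater)
```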

Now for the third step: creating the model. In GeneralizedLinearAlgorithm's run method, model creation is a single line, createModel(weights, intercept), but it embodies the same design idea. Like optimizer above, createModel relies on polymorphism. GeneralizedLinearAlgorithm declares it as an abstract method:

```scala
protected def createModel(weights: Vector, intercept: Double): M
```

and the subclass LogisticRegressionWithLBFGS implements it:

```scala
override protected def createModel(weights: Vector, intercept: Double) = {
  if (numOfLinearPredictor == 1) {
    // binary model
    new LogisticRegressionModel(weights, intercept)
  } else {
    // multiclass model
    new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
  }
}
```

So the call actually dispatches to LogisticRegressionWithLBFGS's createModel. At this point the model has been built; what remains is putting it to use, for example for prediction.

2. The prediction process

Prediction applies the trained model to classify unseen samples. The parent class GeneralizedLinearModel exposes a predict method with two overloads: one takes an RDD of vectors, for parallel prediction; the other takes a single Vector, for local prediction. The code is shown below:

```scala
/**
 * Predict values for the given data set using the model trained.
 *
 * @param testData RDD representing data points to be predicted
 * @return RDD[Double] where each entry contains the corresponding prediction
 */
@Since("1.0.0") // used for parallel (distributed) prediction
def predict(testData: RDD[Vector]): RDD[Double] = {
  // A small optimization to avoid serializing the entire model. Only the weightsMatrix
  // and intercept is needed.
  val localWeights = weights
  val bcWeights = testData.context.broadcast(localWeights)
  val localIntercept = intercept
  testData.mapPartitions { iter =>
    val w = bcWeights.value
    iter.map(v => predictPoint(v, w, localIntercept))
  }
}

/**
 * Predict values for a single data point using the model trained.
 *
 * @param testData array representing a single data point
 * @return Double prediction from the trained model
 */
@Since("1.0.0") // used for local, single-point prediction
def predict(testData: Vector): Double = {
  predictPoint(testData, weights, intercept)
}
```
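As a quick usage sketch of the two overloads, reusing model and test from the demo above:

```scala
// Reusing `model` and `test` from the demo above.
val batchPredictions = model.predict(test.map(_.features)) // RDD[Double]; weights are broadcast once
val onePrediction = model.predict(test.first().features)   // Double; computed locally on the driver
```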

Looking closely at the code above, both predict overloads delegate to predictPoint, which is abstract in GeneralizedLinearModel. In other words, each machine learning algorithm supplies its own prediction rule; binary logistic regression, for example, predicts by checking whether the score exceeds 0.5 (as sketched below).
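In the Spark 1.6 source, the abstract declaration in GeneralizedLinearModel is essentially:

```scala
// In GeneralizedLinearModel: each concrete model supplies its own prediction rule.
protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
```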


Each concrete model therefore overrides the parent class's (GeneralizedLinearModel's) predictPoint. Let's look at how LogisticRegressionModel implements predictPoint:

```scala
override protected def predictPoint(
    dataMatrix: Vector,
    weightMatrix: Vector,
    intercept: Double) = {
  require(dataMatrix.size == numFeatures)

  // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
  if (numClasses == 2) {
    val margin = dot(weightMatrix, dataMatrix) + intercept
    val score = 1.0 / (1.0 + math.exp(-margin))
    threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None => score
    }
  } else {
    /**
     * Compute and find the one with maximum margins. If the maxMargin is negative, then the
     * prediction result will be the first class.
     *
     * PS, if you want to compute the probabilities for each outcome instead of the outcome
     * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
     * is positive to prevent overflow.
     */
    var bestClass = 0
    var maxMargin = 0.0
    val withBias = dataMatrix.size + 1 == dataWithBiasSize
    (0 until numClasses - 1).foreach { i =>
      var margin = 0.0
      dataMatrix.foreachActive { (index, value) =>
        if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
      }
      // Intercept is required to be added into margin.
      if (withBias) {
        margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
      }
      if (margin > maxMargin) {
        maxMargin = margin
        bestClass = i + 1
      }
    }
    bestClass.toDouble
  }
}
```
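To make the binary branch concrete, here is a minimal standalone sketch of the same decision rule with made-up numbers:

```scala
// Hypothetical numbers, purely to illustrate the binary branch above.
val w = Array(0.8, -1.2) // trained weights (made up)
val x = Array(1.0, 0.5)  // feature vector (made up)
val b = 0.1              // intercept (made up)
val margin = w.zip(x).map { case (wi, xi) => wi * xi }.sum + b // w . x + b = 0.3
val score = 1.0 / (1.0 + math.exp(-margin))                    // sigmoid(0.3) ≈ 0.574
val label = if (score > 0.5) 1.0 else 0.0                      // default threshold 0.5 => class 1.0
```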

This completes the source code analysis. This article used LogisticRegression to walk through the code path; the other machine learning algorithms in MLlib can be analyzed along similar lines, and I will cover them in follow-up posts.
