1.1 The Logistic Regression Algorithm
1.1.1 Basic Theory
Logistic regression is essentially linear regression with one extra function mapping inserted between the features and the output: the features are first combined linearly, and the result is then passed through a function g(z) that serves as the hypothesis for prediction. g(z) maps a continuous value into the range between 0 and 1.
It differs from linear regression in that the very wide range of values a linear model can produce (from negative infinity to positive infinity) is compressed into the interval between 0 and 1, so the output can plausibly be interpreted as a "probability". Compressing large values into this range has a further benefit: it dampens the influence of extreme outlier features.
The logistic function (also called the sigmoid function) has the form g(z) = 1 / (1 + e^(-z)).
The sigmoid function has a characteristic "S" shape.
Given n features x = (x1, x2, …, xn), let the conditional probability P(y=1|x) be the probability that the observed outcome y occurs given the factors x. Expressed with the sigmoid function:
P(y=1|x) = h_θ(x) = 1 / (1 + e^(-θᵀx))
The probability that y does not occur given x is then:
P(y=0|x) = 1 - h_θ(x)
Suppose there are m mutually independent observations y = (y1, y2, …, ym). The probability of a single observation yi (taking the value 0 or 1) is:
P(yi|xi) = h_θ(xi)^yi · (1 - h_θ(xi))^(1-yi)
When yi = 1 the second factor disappears, leaving only the probability that x belongs to class 1; when yi = 0 the first factor disappears, leaving only the probability that x belongs to class 0 (one minus the probability of class 1). So whichever value yi takes, this expression is the probability of observing (xi, yi). Since the samples are independent, the likelihood of the whole sample set is the product of the probabilities of the m individual samples:
L(θ) = ∏_{i=1..m} h_θ(xi)^yi · (1 - h_θ(xi))^(1-yi)
Our goal is the parameter estimate that maximizes this likelihood; maximum likelihood estimation finds the θ at which L(θ) attains its maximum. Taking the logarithm gives the log-likelihood:
l(θ) = log L(θ) = Σ_{i=1..m} [ yi·log h_θ(xi) + (1 - yi)·log(1 - h_θ(xi)) ]
Differentiating l(θ) with respect to θj gives:
∂l(θ)/∂θj = Σ_{i=1..m} ( yi - h_θ(xi) )·xij
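To make the per-sample quantities concrete, here is a minimal plain-Scala sketch (illustrative names only, not Spark code) that computes h_θ(x), the gradient contribution (y - h_θ(x))·x, and the log-likelihood term for a single observation:

def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Hypothetical helper: gradient and log-likelihood of one (x, y) sample at parameters theta.
def sampleGradient(x: Array[Double], y: Double, theta: Array[Double]): (Array[Double], Double) = {
  val h = sigmoid(x.zip(theta).map { case (xi, ti) => xi * ti }.sum)  // h_theta(x)
  val grad = x.map(_ * (y - h))                                       // (y - h_theta(x)) * x
  val logLik = y * math.log(h) + (1.0 - y) * math.log(1.0 - h)        // per-sample log-likelihood
  (grad, logLik)
}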
1.1.2 Gradient Descent
Gradient descent updates θ as follows:
θj := θj - (α/m)·Σ_{i=1..m} ( h_θ(xi) - yi )·xij
Applying this to every component j simultaneously, the update process can be written as:
θ := θ - (α/m)·Σ_{i=1..m} ( h_θ(xi) - yi )·xi
Vectorization
Vectorization replaces explicit for loops with matrix operations, which simplifies the computation and improves efficiency.
In the update above, Σ(...) is a summation that would still require a for loop over all m samples, so the update is not yet fully vectorized.
The vectorization proceeds as follows:
Write the training data in matrix form: each row of the matrix x is one training sample and each column is one feature (with a constant column x0 = 1 prepended for the intercept); y is the column vector of labels and θ the column vector of parameters.
The argument A of g(A) is a column vector, so the implementation of g must accept a column vector and return a column vector. With the matrix form above, all of the predictions g(x·θ) can therefore be obtained in a single computation.
The θ update can then be rewritten as:
θ := θ - (α/m)·xᵀ·( g(x·θ) - y )
In summary, the vectorized θ update takes three steps (a Breeze sketch follows the list):
(1) compute A = x·θ;
(2) compute E = g(A) - y;
(3) compute θ := θ - (α/m)·xᵀ·E.
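These three steps can be written almost verbatim with Breeze, the linear-algebra library that Spark MLlib uses internally. The following is a minimal sketch under those assumptions (the names x, y, theta, alpha are illustrative), not MLlib code:

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.numerics.sigmoid

// One vectorized gradient-descent step: x is m x (n+1), y and theta are column vectors.
def step(x: DenseMatrix[Double], y: DenseVector[Double],
         theta: DenseVector[Double], alpha: Double): DenseVector[Double] = {
  val a = x * theta                        // (1) A = x * theta
  val e = sigmoid(a) - y                   // (2) E = g(A) - y
  theta - (x.t * e) * (alpha / x.rows)     // (3) theta := theta - (alpha/m) * x' * E
}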
1.1.3 Regularization
In practice, to strengthen the model's ability to generalize and keep it from overfitting, especially with a large number of sparse features where model complexity is high and dimensionality reduction may be needed, we add a regularization term so that model complexity is penalized while the training error is minimized. In logistic regression, L1 or L2 regularization is used.
With L2 regularization the loss function becomes:
J(θ) = -(1/m)·Σ_{i=1..m} [ yi·log h_θ(xi) + (1 - yi)·log(1 - h_θ(xi)) ] + (λ/2m)·Σ_j θj²
The regularization term is the L1 or L2 norm of the weights multiplied by a coefficient λ, which controls the balance between the loss and the regularization term. Intuitively, the point of preventing overfitting is to keep the trained model from depending too heavily on any single feature: when minimizing the unregularized loss, a very large weight on one dimension can make the fitted values match the true values very closely, but regularization makes the overall cost large in that case, so over-reliance on any single dimension is avoided. Of course, regularization presupposes that the feature values have been normalized.
For linear regression the regularized gradient-descent update has the same shape, and regularized logistic regression is in fact very similar to regularized linear regression.
It likewise uses gradient descent; with the L2 term the update for each component becomes:
θj := (1 - α·λ)·θj - (α/m)·Σ_{i=1..m} ( h_θ(xi) - yi )·xij
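As a minimal plain-Scala sketch (illustrative names, not library code), one L2-regularized update step shrinks the weights and then applies the averaged gradient, matching the w' = (1 - α·λ)·w - α·gradient form used by SquaredL2Updater later in this article:

// One L2-regularized gradient-descent step; grad is the gradient already averaged over the batch.
def l2Step(theta: Array[Double], grad: Array[Double],
           alpha: Double, lambda: Double): Array[Double] =
  theta.zip(grad).map { case (w, g) => (1.0 - alpha * lambda) * w - alpha * g }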
1.2 Spark MLlib Logistic Regression Source Code Analysis
1.2.1 LogisticRegressionWithSGD
The train method for the logistic regression algorithm is defined on the companion object of LogisticRegressionWithSGD; inside train, a new LogisticRegressionWithSGD object is created.
package org.apache.spark.mllib.classification
// 1 Class: LogisticRegressionWithSGD
class LogisticRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {

  private val gradient = new LogisticGradient()
  private val updater = new SquaredL2Updater()
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a LogisticRegression object with default parameters: {stepSize: 1.0,
   * numIterations: 100, regParm: 0.01, miniBatchFraction: 1.0}.
   */
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
    new LogisticRegressionModel(weights, intercept)
  }
}
Parameters of the LogisticRegressionWithSGD class:
stepSize: step size for each iteration, default 1.0
numIterations: number of iterations, default 100
regParam: regularization parameter, default 0.01 (the train helper below passes 0.0)
miniBatchFraction: fraction of the samples used in each iteration, default 1.0
gradient: LogisticGradient(), the logistic gradient;
updater: SquaredL2Updater(), L2-norm regularization;
optimizer: GradientDescent(gradient, updater), gradient-descent optimization. These optimizer settings can also be adjusted directly on an instance, as sketched below.
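The following sketch shows that pattern. It is an assumption-laden illustration, not part of the source: training is an RDD[LabeledPoint] assumed to exist, and switching to L1Updater is shown only as an example of replacing the updater.

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.optimization.L1Updater

val lr = new LogisticRegressionWithSGD()   // defaults: stepSize 1.0, 100 iterations, regParam 0.01
lr.optimizer
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)               // swap SquaredL2Updater for L1 regularization
val model = lr.run(training)               // training: RDD[LabeledPoint], assumed defined elsewhere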
// 2 The train method
object LogisticRegressionWithSGD {
/**
* Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
* number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
* gradient descent are initialized using the initial weights provided.
* NOTE: Labels used in Logistic Regression should be {0, 1}
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Vector): LogisticRegressionModel = {
new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
.run(input, initialWeights)
}
}
train parameters:
input: the sample data; the class label can only be 1.0 or 0.0, and the features are of type double
numIterations: number of iterations, default 100
stepSize: step size for each iteration, default 1.0
miniBatchFraction: fraction of the samples used in each iteration, default 1.0
initialWeights: initial weights, default is the zero vector. A sketch of calling this overload directly follows.
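For reference, a call to this overload might look like the sketch below; numFeatures and the training RDD are assumptions, not values defined in this article:

import org.apache.spark.mllib.linalg.Vectors

// training: RDD[LabeledPoint] and numFeatures: Int are assumed to exist in the surrounding code.
val model = LogisticRegressionWithSGD.train(
  training, 100, 1.0, 1.0, Vectors.zeros(numFeatures))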
The run method is inherited from the parent class GeneralizedLinearAlgorithm and is implemented as follows.
1.2.2 GeneralizedLinearAlgorithm
Implementation of the run method used by LogisticRegressionWithSGD.
package org.apache.spark.mllib.regression
/**
* Run the algorithm with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.
*/
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
// Determine the feature dimension.
if (numFeatures < 0) {
numFeatures = input.map(_.features.size).first()
}
// Check whether the input data is cached.
if (input.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
+ " parent RDDs are also uncached.")
}
// Validate the input data.
// Check the data properties before running the optimizer
if (validateData && !validators.forall(func => func(input))) {
throw new SparkException("Input validation failed.")
}
val scaler = if (useFeatureScaling) {
new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
} else {
null
}
// Transform the input samples into (label, features) pairs.
// addIntercept: whether to add the constant term θ0; if so, an x0 = 1 entry is appended.
// Prepend an extra variable consisting of all 1.0's for the intercept.
// TODO: Apply feature scaling to the weight vector instead of input data.
val data =
if (addIntercept) {
if (useFeatureScaling) {
input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
} else {
input.map(lp => (lp.label, appendBias(lp.features))).cache()
}
} else {
if (useFeatureScaling) {
input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
} else {
input.map(lp => (lp.label, lp.features))
}
}
// Initialize the weights.
// addIntercept: if the constant term θ0 is added, the weight vector also gains a θ0 entry.
/**
* TODO: For better convergence, in logistic regression, the intercepts should be computed
* from the prior probability distribution of the outcomes; for linear regression,
* the intercept should be set as the average of response.
*/
val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
appendBias(initialWeights)
} else {
/** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
initialWeights
}
// Optimize the weights with gradient descent and return the best weights found.
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
val intercept = if (addIntercept && numOfLinearPredictor == 1) {
weightsWithIntercept(weightsWithIntercept.size - 1)
} else {
0.0
}
var weights = if (addIntercept && numOfLinearPredictor == 1) {
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
} else {
weightsWithIntercept
}
createModel(weights, intercept)
}
Here optimizer.optimize(data, initialWeightsWithIntercept) is the core of the logistic regression implementation.
The optimizer is of type GradientDescent; its optimize method mainly calls runMiniBatchSGD on the GradientDescent companion object and returns the optimal feature-weight vector produced by the iterations.
The optimize method of the GradientDescent class is implemented as follows.
1.2.3 GradientDescent
The optimize method is implemented as follows.
package org.apache.spark.mllib.optimization
/**
* :: DeveloperApi ::
* Runs gradient descent on the given training data.
* @param data training data
* @param initialWeights initial weights
* @return solution vector
*/
@DeveloperApi
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = GradientDescent.runMiniBatchSGD(
data,
gradient,
updater,
stepSize,
numIterations,
regParam,
miniBatchFraction,
initialWeights)
weights
}
}
In the optimize method, GradientDescent.runMiniBatchSGD is called; runMiniBatchSGD is implemented as follows:
/**
* Run stochastic gradient descent (SGD) in parallel using mini batches.
* In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
* in order to compute a gradient estimate.
* Sampling, and averaging the subgradients over this subset is performed using one standard
* spark map-reduce in each iteration.
*
* @param data - Input data for SGD. RDD of the set of data examples, each of
* the form (label, [feature values]).
* @param gradient - Gradient object (used to compute the gradient of the loss function of
* one single data example)
* @param updater - Updater function to actually perform a gradient step in a given direction.
* @param stepSize - initial step size for the first step
* @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
*
* @return A tuple containing two elements. The first element is a column matrix containing
* weights for every feature, and the second element is an array containing the
* stochastic loss computed for every iteration.
*/
def runMiniBatchSGD(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector): (Vector, Array[Double]) = {
// Array holding the loss recorded at each iteration.
val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
// Check the sample data; if it is empty, return the initial weights.
val numExamples = data.count()
// if no data, return initial weights to avoid NaNs
if (numExamples == 0) {
logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
return (initialWeights, stochasticLossHistory.toArray)
}
// Validate the miniBatchFraction value.
if (numExamples * miniBatchFraction < 1) {
logWarning("The miniBatchFraction is too small")
}
// Initialize the weights.
// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)
val n = weights.size
/**
* For the first iteration, the regVal will be initialized as sum of weight squares
* if it's L2 updater; for L1 updater, the same logic is followed.
*/
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
// Iteratively compute the weights.
for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
// Use the RDD treeAggregate method to compute the gradient vector and loss for each sampled record and accumulate them over all sampled records.
// sample draws a random subset of the data according to the ratio given by miniBatchFraction.
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
// Record this iteration's loss and update the weights vector.
if (miniBatchSize > 0) {
/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
// updater.compute updates the weights and regVal (the regularization term), based on this iteration's gradient and loss together with the regularization term.
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
weights = update._1
regVal = update._2
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
}
logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
(weights, stochasticLossHistory.toArray)
}
Input and output parameters of runMiniBatchSGD:
data: input samples for SGD, in the form (label, [feature values])
gradient: the gradient object, used to compute the gradient and loss for each sample
updater: the weight-update object, used to update the weights after each iteration
stepSize: the initial step size
numIterations: the number of iterations
regParam: the regularization parameter
miniBatchFraction: the fraction of the input samples used in each iteration
The return value is (Vector, Array[Double]): the first element holds the weights for every feature, the second the stochastic loss computed at each iteration. A simplified single-machine sketch of this loop follows.
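To make the control flow easier to follow, here is a simplified single-machine sketch of the same loop. It is illustrative only, not the Spark implementation: it replaces the RDD sampling and treeAggregate with plain Scala collections and hard-codes the binary logistic gradient and the L2 shrink-then-step update.

import scala.util.Random

def miniBatchSGD(data: Seq[(Double, Array[Double])], numIterations: Int, stepSize: Double,
                 regParam: Double, miniBatchFraction: Double, init: Array[Double]): Array[Double] = {
  val weights = init.clone()
  for (i <- 1 to numIterations) {
    // Sample a mini-batch of roughly miniBatchFraction * data.size records.
    val batch = data.filter(_ => Random.nextDouble() < miniBatchFraction)
    if (batch.nonEmpty) {
      val grad = new Array[Double](weights.length)
      batch.foreach { case (label, x) =>
        val margin = -x.zip(weights).map { case (a, b) => a * b }.sum
        val multiplier = 1.0 / (1.0 + math.exp(margin)) - label        // h_theta(x) - y
        for (j <- x.indices) grad(j) += multiplier * x(j)
      }
      val thisStep = stepSize / math.sqrt(i)                            // decaying step size
      for (j <- weights.indices)
        weights(j) = (1.0 - thisStep * regParam) * weights(j) - thisStep * grad(j) / batch.size
    }
  }
  weights
}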
runMiniBatchSGD repeatedly samples the input data set and, using LogisticGradient as the gradient and SquaredL2Updater as the updater, iterates over the sampled data to find the optimal feature-weight vector. In LogisticRegressionWithSGD these are defined as follows:
private val gradient = new LogisticGradient()
private val updater = new SquaredL2Updater()
override val optimizer = new GradientDescent(gradient, updater)
  .setStepSize(stepSize)
  .setNumIterations(numIterations)
  .setRegParam(regParam)
  .setMiniBatchFraction(miniBatchFraction)
runMiniBatchSGD calls two methods, gradient.compute and updater.compute, whose implementations are shown below.
1.2.4 gradient & updater
1) gradient
// Computes the difference between the predicted value and the actual class label of the current example
// Computes the gradient contribution of the current example
// Accumulates the weight-update value
// Returns the gradient with respect to the feature weights and the loss for the current training example
class LogisticGradient(numClasses: Int) extends Gradient {
def this() = this(2)
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val gradient = Vectors.zeros(weights.size)
val loss = compute(data, label, weights, gradient)
(gradient, loss)
}
override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val dataSize = data.size
// (weights.size / dataSize + 1) is number of classes
require(weights.size % dataSize == 0 && numClasses == weights.size / dataSize + 1)
numClasses match {
case 2 =>
/**
* For Binary Logistic Regression.
*
* Although the loss and gradient calculation for multinomial one is more generalized,
* and multinomial one can also be used in binary case, we still implement a specialized
* binary version for performance reason.
*/
val margin = -1.0 * dot(data, weights)
val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
axpy(multiplier, data, cumGradient)
if (label > 0) {
// The following is equivalent to log(1 + exp(margin)) but more numerically stable.
MLUtils.log1pExp(margin)
} else {
MLUtils.log1pExp(margin) - margin
}
case _ =>
/**
* For Multinomial Logistic Regression.
*/
val weightsArray = weights match {
case dv: DenseVector => dv.values
case _ =>
throw new IllegalArgumentException(
s"weights only supports dense vector but got type ${weights.getClass}.")
}
val cumGradientArray = cumGradient match {
case dv: DenseVector => dv.values
case _ =>
throw new IllegalArgumentException(
s"cumGradient only supports dense vector but got type ${cumGradient.getClass}.")
}
// marginY is margins(label - 1) in the formula.
var marginY = 0.0
var maxMargin = Double.NegativeInfinity
var maxMarginIndex = 0
val margins = Array.tabulate(numClasses - 1) { i =>
var margin = 0.0
data.foreachActive { (index, value) =>
if (value != 0.0) margin += value * weightsArray((i * dataSize) + index)
}
if (i == label.toInt - 1) marginY = margin
if (margin > maxMargin) {
maxMargin = margin
maxMarginIndex = i
}
margin
}
/**
* When maxMargin > 0, the original formula will cause overflow as we discuss
* in the previous comment.
* We address this by subtracting maxMargin from all the margins, so it's guaranteed
* that all of the new margins will be smaller than zero to prevent arithmetic overflow.
*/
val sum = {
var temp = 0.0
if (maxMargin > 0) {
for (i <- 0 until numClasses - 1) {
margins(i) -= maxMargin
if (i == maxMarginIndex) {
temp += math.exp(-maxMargin)
} else {
temp += math.exp(margins(i))
}
}
} else {
for (i <- 0 until numClasses - 1) {
temp += math.exp(margins(i))
}
}
temp
}
for (i <- 0 until numClasses - 1) {
val multiplier = math.exp(margins(i)) / (sum + 1.0) - {
if (label != 0.0 && label == i + 1) 1.0 else 0.0
}
data.foreachActive { (index, value) =>
if (value != 0.0) cumGradientArray(i * dataSize + index) += multiplier * value
}
}
val loss = if (label > 0.0) math.log1p(sum) - marginY else math.log1p(sum)
if (maxMargin > 0) {
loss + maxMargin
} else {
loss
}
}
}
}
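A quick way to sanity-check the binary branch is to call compute directly on a toy example; the values below are hypothetical and not part of the source:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.LogisticGradient

val g = new LogisticGradient()              // numClasses defaults to 2
val x = Vectors.dense(1.0, 2.0)
val w = Vectors.dense(0.1, -0.2)
// For label = 1: gradient = (1/(1 + exp(-w.x)) - 1) * x, loss = log(1 + exp(-w.x))
val (grad, loss) = g.compute(x, 1.0, w)
println(s"gradient = $grad, loss = $loss")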
2) updater
// weightsOld: the feature-weight vector from the previous iteration
// gradient: the gradient computed in this iteration
// stepSize: the iteration step size
// iter: the current iteration number
// regParam: the regularization parameter
// The descent factor for this iteration is the inverse square root of the iteration number
// Returns the feature-weight vector updated by this gradient-descent step
// Uses L2 regularization (R(w) = 1/2 ||w||^2); the weight update rule is:
/**
* :: DeveloperApi ::
* Updater for L2 regularized problems.
* R(w) = 1/2 ||w||^2
* Uses a step-size decreasing with the square root of the number of iterations.
*/
@DeveloperApi
class SquaredL2Updater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
// add up both updates from the gradient of the loss (= step) as well as
// the gradient of the regularizer (= regParam * weightsOld)
// w' = w - thisIterStepSize * (gradient + regParam * w)
// w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzWeights :*= (1.0 - thisIterStepSize * regParam)
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
val norm = brzNorm(brzWeights, 2.0)
(Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
}
}
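Likewise, compute can be called on its own to observe one shrink-then-step update; the numbers below are a hypothetical toy example:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater

val updater = new SquaredL2Updater()
val weightsOld = Vectors.dense(1.0, -0.5)
val gradient = Vectors.dense(0.2, 0.1)
// stepSize = 1.0, iter = 4, regParam = 0.1, so thisIterStepSize = 1.0 / sqrt(4) = 0.5
val (weightsNew, regVal) = updater.compute(weightsOld, gradient, 1.0, 4, 0.1)
println(s"weights = $weightsNew, regularization value = $regVal")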
1.3 MLlib Logistic Regression Example
1. Data
The data is in LIBSVM format, i.e. label index1:value1 index2:value2 index3:value3 …
0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224 211:252 212:253 213:252 214:202 215:84 216:252 217:253 218:122 236:163 237:252 238:252 239:252 240:253 241:252 242:252 243:96 244:189 245:253 246:167 263:51 264:238 265:253 266:253 267:190 268:114 269:253 270:228 271:47 272:79 273:255 274:168 290:48 291:238 292:252 293:252 294:179 295:12 296:75 297:121 298:21 301:253 302:243 303:50 317:38 318:165 319:253 320:233 321:208 322:84 329:253 330:252 331:165 344:7 345:178 346:252 347:240 348:71 349:19 350:28 357:253 358:252 359:195 372:57 373:252 374:252 375:63 385:253 386:252 387:195 400:198 401:253 402:190 413:255 414:253 415:196 427:76 428:246 429:252 430:112 441:253 442:252 443:148 455:85 456:252 457:230 458:25 467:7 468:135 469:253 470:186 471:12 483:85 484:252 485:223 494:7 495:131 496:252 497:225 498:71 511:85 512:252 513:145 521:48 522:165 523:252 524:173 539:86 540:253 541:225 548:114 549:238 550:253 551:162 567:85 568:252 569:249 570:146 571:48 572:29 573:85 574:178 575:225 576:253 577:223 578:167 579:56 595:85 596:252 597:252 598:252 599:229 600:215 601:252 602:252 603:252 604:196 605:130 623:28 624:199 625:252 626:252 627:253 628:252 629:252 630:233 631:145 652:25 653:128 654:252 655:253 656:252 657:141 658:37
1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:62 214:127 215:251 216:251 217:253 218:62 241:68 242:236 243:251 244:211 245:31 246:8 268:60 269:228 270:251 271:251 272:94 296:155 297:253 298:253 299:189 323:20 324:253 325:251 326:235 327:66 350:32 351:205 352:253 353:251 354:126 378:104 379:251 380:253 381:184 382:15 405:80 406:240 407:251 408:193 409:23 432:32 433:253 434:253 435:253 436:159 460:151 461:251 462:251 463:251 464:39 487:48 488:221 489:251 490:251 491:172 515:234 516:251 517:251 518:196 519:12 543:253 544:251 545:251 546:89 570:159 571:255 572:253 573:253 574:31 597:48 598:228 599:253 600:247 601:140 602:8 625:64 626:251 627:253 628:220 653:64 654:251 655:253 656:220 681:24 682:193 683:253 684:220
……
2. Code
//0 Imports (a SparkContext named sc is assumed to be available, e.g. in spark-shell)
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.util.MLUtils
//1 Load the sample data
val data_path = "/user/tmp/sample_libsvm_data.txt"
val examples = MLUtils.loadLibSVMFile(sc, data_path).cache()
//2 Split the samples into training and test sets
val splits = examples.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")
//3 Create the logistic regression model and set the training parameters
val numIterations = 1000
val stepSize = 1.0
val miniBatchFraction = 1.0
val model = LogisticRegressionWithSGD.train(training, numIterations, stepSize, miniBatchFraction)
//4 Predict on the test samples
val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))
//5 Compute the test precision
val metrics = new MulticlassMetrics(predictionAndLabel)
val precision = metrics.precision
println("Precision = " + precision)