spark mllib
機器學習
算法
邏輯迴歸與線性迴歸相似,但它不屬於迴歸分析家族(主要爲二分類),而屬於分類家族,差別主要在於變量不一樣,所以其解法與生成曲線也不盡相同。 邏輯迴歸是無監督學習的一個重要算法,對某些數據與事物的歸屬(分到哪一個類別)及可能性(分到某一類別的機率)進行評估。apache
公式推導部分轉自知乎機器學習專欄 zhuanlan.zhihu.com/p/28775274緩存
邏輯迴歸本質是線性迴歸,只是在特徵到結果的過程上加上了一層映射。即首先須要把特徵進行求和,而後將求和後的結果應用於一個g(z)函數,g(z)能夠將值映射到0或者是1上面,這個函數就是Sigmoid函數,公式以下:bash
默認分類的值是0.5,超過0.5則類別爲1,小於0.5類別爲0 dom
對於線性邊界的狀況,邊界形式能夠概括爲以下公式(1):eclipse
邏輯迴歸和多重線性迴歸有不少的類似之處。最大的區別是他們的因變量不一樣。這兩個迴歸也能夠統一歸爲廣義線性模型。在 spark mllib 實現的過程當中也是先定義好父類廣義線性模型,而後讓線性迴歸和邏輯迴歸去繼承這個類,從新覆蓋裏面的一些參數,好比 Updater,GradientDescent等。機器學習
邏輯迴歸主要包含如下類:函數
邏輯迴歸相關的spark mllib源碼解析能夠參考這篇文章:blog.csdn.net/stevekangpe…性能
下面的例子展現瞭如何使用邏輯迴歸訓練模型,預測結果,保存和調用模型。學習
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
// 加載訓練數據
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 切分數據,training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// 訓練模型
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training)
// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val precision = metrics.precision
println("Precision = " + precision)
// 保存和加載模型
model.save(sc, "myModelPath")
val sameModel = LogisticRegressionModel.load(sc, "myModelPath")
複製代碼
package Classification
/** * LogisticRegression Algorithm * Created by wy on 2019/03/25 */
//spark初始化
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
//分類數據格式處理
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
//邏輯迴歸-隨機梯度降低SGD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
//計算Accuracy、PR、ROC和AUC
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
//數據標準化
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.feature.StandardScaler
//參數調優
import org.apache.spark.mllib.optimization.{Updater,SimpleUpdater,L1Updater,SquaredL2Updater}
import org.apache.spark.mllib.classification.ClassificationModel
object LogisticRegression {
//屏蔽沒必要要的日誌顯示在終端上
//Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) //warn類信息不會顯示,只顯示error級別的
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
//初始化
val conf = new SparkConf().setMaster("local").setAppName("LogisticRegression")
val sc = new SparkContext(conf)
/** * 數據:lr_test.txt * 該數據集包含了46個feature,1個label * */
//input
val sourceRDD = sc.textFile("E:\\Spark\\scala-data\\LRdata\\lr_test.txt")
val data = sourceRDD.map{
line =>{
val arr = line.split("#")
val label = arr(1).toDouble
val features = arr(0).split(",").map(_.toDouble)
LabeledPoint(label,Vectors.dense(features)) //建立一個稠密向量
}
}
/** * 建立一個稀疏向量(第一種方式) * val sv1: Vector = Vector.sparse(3, Array(0,2), Array(1.0,3.0)); * 建立一個稀疏向量(第二種方式) * val sv2 : Vector = Vector.sparse(3, Seq((0,1.0),(2,3.0))) * * 對於稠密向量:很直觀,你要建立什麼,就加入什麼,其函數聲明爲Vector.dense(values : Array[Double]) * 對於稀疏向量,當採用第一種方式時,3表示此向量的長度,第一個Array(0,2)表示的索引,第二個Array(1.0, 3.0) * 與前面的Array(0,2)是相互對應的,表示第0個位置的值爲1.0,第2個位置的值爲3 * * 對於稀疏向量,當採用第二種方式時,3表示此向量的長度,後面的比較直觀,Seq裏面每一對都是(索引,值)的形式。 * */
data.cache() //緩存
val Array(trainData, testData) = data.randomSplit(Array(0.8,0.2),seed = 11L)
trainData.cache()
testData.cache()
val numData = data.count
val numTrainData = trainData.count
val numTestData = testData.count
println("原始數據量:",numData) //40530
println("訓練數據量:",numTrainData) //32503
println("測試數據量:",numTestData) //8027
val stepSize = 0.1 //迭代步長,默認爲1.0
val numIterations = 50 //迭代次數,默認爲100
val miniBatchFraction = 1.0 //每次迭代參與計算的樣本比例,默認爲1.0
/** 訓練邏輯迴歸模型 */
val lrModel = LogisticRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction)
//打印模型權重值
val res = lrModel.weights.toArray
println("權重值列表以下:")
res.foreach(println)
println("----------------------擬合預測結果--------------------")
//預測值與真實值比較
//val testPoint = testData.first
//val testPredict = lrModel.predict(testPoint.features)
//testPredict: Double = 0.0
//val testTrueLabel = testPoint.label
//testTrueLabel: Double = 0.0
/** 預測的正確率計算 */
val lrTestCorrect = data.map { x =>
if (lrModel.predict(x.features)== x.label) 1 else 0
}.sum
//預測正確率
val lrAccuracy = lrTestCorrect / numData
println(f"Accuracy:${lrAccuracy * 100}%2.3f%%")
// Accuracy: 97.839%
/** 計算 準確率-召回律(PR曲線) ROC曲線的面積(AUC) * 1.準確率一般用於評價結果的質量,定義爲真陽性的數目除以真陽性和假陽性的總數,其中真陽性值被預測的類別爲1的樣本, * 假陽性是錯誤預測爲1的樣本。 * 2.召回率用來評價結果的完整性,定義爲真陽性的數目除以真陽性和假陽性的和,其中假陽性是類別爲1卻被預測爲0的樣本。 * 一般高準確率對應着低召回率 * 3.ROC曲線與PR曲線相似,是對分類器的真陽性率-假陽性率的圖形化解釋。 * */
val metrics = Seq(lrModel).map{ model =>
val scoreAndLabels = data.map{ x =>
(model.predict(x.features), x.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
(model.getClass.getSimpleName, metrics.areaUnderPR(), metrics.areaUnderROC())
}
//val allMetrics = metrics ++ nbMetrics ++ dtMetrics
metrics.foreach{ case (model, pr, roc) =>
println(f"model:$model\n" +
f"Area under PR: ${pr * 100.0}%2.3f%%\n" +
f"Area under ROC: ${roc * 100.0}%2.3f%%")
}
//Accuracy:97.839%
//model:LogisticRegressionModel
//Area under PR: 51.081%
//Area under ROC: 50.000%
/** 改進模型性能以及參數調優 */
//特徵標準化
//將特徵變量用(RowMatrix類)表示成MLlib中的(分佈矩陣)
val vectors = data.map(x => x.features)
val matrix = new RowMatrix(vectors)
val matrixSummary = matrix.computeColumnSummaryStatistics() //計算矩陣每列的統計特性
println("----------------------特徵標準化----------------------")
println("mean: ",matrixSummary.mean) //輸出每列均值
println("max: ",matrixSummary.max) //每列最大值
println("variance: ",matrixSummary.variance) //矩陣每列方差
println("numNonzeros:",matrixSummary.numNonzeros) //每列非0項的數目
println("normL2: ",matrixSummary.normL2) //L2範數:向量各元素的平方和而後求平方根
/**爲使得數據更符合模型的假設,對每一個特徵進行標準化,使得每一個特徵是(0均值)和(單位標準差)*/
//作法:對(每一個特徵值)減去(列的均值),而後(除以)列的(標準差)以進行縮放
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors) //將向量傳到轉換函數
val scaledData = data.map(x => LabeledPoint(x.label, scaler.transform(x.features)))
//println(data.first.features)
println("標準化後的特徵第一行結果:")
println(scaledData.first.features)
//[0.0016544159298287912,0.0273303020874253,0.008141541536538578,0.07992614623509364,...
//爲驗證第一個特徵已經應用標準差公式被轉換了,用 第一個特徵(減去)其均值,而後(除以)標準差--方差的平方根
println("驗證第一個特徵是否正確")
println((data.first.features(0) - matrixSummary.mean(0)) / math.sqrt(matrixSummary.variance(0)))
//0.0016544159298287912 驗證正確
/** 如今使用標準化的數據從新訓練模型邏輯迴歸-(決策樹和樸素貝葉斯不受特徵標準化的影響)*/
//val Array(scaledTrainData, scaledTestData) = scaledData.randomSplit(Array(0.8,0.2),seed = 11L)
val scaledLrModel = LogisticRegressionWithSGD.train(scaledData,numIterations,stepSize,miniBatchFraction)
val scaledLrCorrect = scaledData.map{ x =>
if (scaledLrModel.predict(x.features) == x.label) 1 else 0
}.sum
val scaledLrTestAccuracy = scaledLrCorrect / numData
val lrPredictionsVsTrue = scaledData.map{ x =>
(scaledLrModel.predict(x.features), x.label)
}
val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue)
val lrPr = lrMetricsScaled.areaUnderPR() //lrPr: Double = 0.27532
val lrPoc = lrMetricsScaled.areaUnderROC() //lrPoc: Double = 0.58451
println("------------標準化後數據訓練、擬合和結果----------------")
println(f"Model:${scaledLrModel.getClass.getSimpleName}\n" +
f"Accuracy: ${scaledLrTestAccuracy * 100}%2.3f%%\n" +
f"Area under PR: ${lrPr * 100}%2.3f%%\n" +
f"Area under ROC:${lrPoc * 100}%2.3f%%")
//Model:LogisticRegressionModel
//Accuracy: 64.974%
//Area under PR: 35.237%
//Area under ROC:65.355%
/** 模型參數調優MLlib線性模型優化技術:SGD和L-BFGS(只在邏輯迴歸中使用LogisticRegressionWithLBFGS)*/
//線性模型
//定義訓練調參輔助函數,根據給定輸入訓練模型 (輸入, 則正則化參數, 迭代次數, 正則化形式, 步長)
def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int,
updater: Updater, stepSize: Double) = {
val lr =new LogisticRegressionWithSGD //邏輯迴歸也能夠用LogisticRegressionWithLBFGS
lr.optimizer
.setNumIterations(numIterations) //迭代次數
.setStepSize(stepSize) //步長
.setRegParam(regParam) //則正則化參數
.setUpdater(updater) //正則化形式
lr.run(input) //輸入訓練數據RDD
}
//定義第二個輔助函數,label爲須要調試的參數,data:輸入預測的數據,model訓練的模型
def createMetrics(label: Double, data: RDD[LabeledPoint], model: ClassificationModel) = {
val scoreAndLabels = data.map { point =>
(model.predict(point.features),point.label) //(predicts,label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
(label, metrics.areaUnderROC()) //計算AUC
}
//加快屢次模型訓練速度, 緩存標準化後的數據
scaledData.cache()
println("------------------標準化後數據調參---------------------")
//1迭代次數
val iterateResults = Seq(1, 5, 10, 50, 100).map { param =>
//訓練
val model = trainWithParams(scaledData, 0.0, param, new SimpleUpdater, 1.0)
//擬合,計算AUC
createMetrics(param, scaledData, model)
}
println("1迭代次數numIterations:Seq(1, 5, 10, 50, 100)")
iterateResults.foreach { case (param, auc) => println(f"$param iterations, AUC = ${auc * 100}%2.2f%%")}
//1 iterations, AUC = 64.50%
//5 iterations, AUC = 67.07%
//10 iterations, AUC = 67.10%
//50 iterations, AUC = 67.56%
//100 iterations, AUC = 67.56%
var maxIterateAuc = 0.0
var bestIterateParam = 0
for(x <- iterateResults){
//println(x)
if(x._2 > maxIterateAuc){
maxIterateAuc = x._2
bestIterateParam = x._1.toInt
}
}
println("max auc: " + maxIterateAuc + " best numIterations param: " + bestIterateParam)
//2步長 大步長收斂快,太大可能致使收斂到局部最優解
val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
val model = trainWithParams(scaledData, 0.0, bestIterateParam, new SimpleUpdater, param)
createMetrics(param, scaledData, model)
}
println("\n2步長stepSize:Seq(0.001, 0.01, 0.1, 1.0, 10.0)")
stepResults.foreach { case (param, auc) => println(f"$param stepSize, AUC = ${auc * 100}%2.2f%%")}
//0.001 step size, AUC = 64.50%
//0.01 step size, AUC = 64.50%
//0.1 step size, AUC = 65.36%
//1.0 step size, AUC = 67.56%
//10.0 step size, AUC = 50.20%
var maxStepAuc = 0.0
var bestStepParam = 0.0
for(x <- stepResults){
//println(x)
if(x._2 > maxStepAuc){
maxStepAuc = x._2
bestStepParam = x._1
}
}
println("max auc: " + maxStepAuc + " best stepSize param: " + bestStepParam)
//3.1正則化參數,默認值爲0.0,L1正則new L1Updater
val regL1Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map{ param =>
val model = trainWithParams(scaledData, param, bestIterateParam, new L1Updater, bestStepParam)
createMetrics(param, scaledData, model)
}
println("\n3.1 L1正則化參數regParam:Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0)")
regL1Results.foreach{ case (param,auc) => println(f"$param regParam L1, AUC = ${auc * 100}%2.2f%%")}
//regParam L1 = 0.0, AUC = 67.56%
//regParam L1 = 0.001, AUC = 66.43%
//regParam L1 = 0.01, AUC = 65.74%
//regParam L1 = 0.1, AUC = 50.00%
//regParam L1 = 1.0, AUC = 50.00%
//regParam L1 = 10.0, AUC = 50.00%
var maxRegL1Auc = 0.0
var bestRegL1Param = 0.0
for(x <- regL1Results){
//println(x)
if(x._2 > maxRegL1Auc){
maxRegL1Auc = x._2
bestRegL1Param = x._1
}
}
println("max auc: " + maxRegL1Auc + " best L1regParam: " + bestRegL1Param)
//3.2正則化參數:默認值爲0.0,L2正則new SquaredL2Updater
val regL2Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map{ param =>
val model = trainWithParams(scaledData, param, bestIterateParam, new SquaredL2Updater, bestStepParam)
createMetrics(param, scaledData, model)
}
println("\n3.2 L2正則化參數regParam:Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0)")
regL2Results.foreach{ case (param,auc) => println(f"$param regParam L2, AUC = ${auc * 100}%2.2f%%")}
//regParam L2 = 0.0 , AUC = 67.56%
//regParam L2 = 0.001 , AUC = 67.56%
//regParam L2 = 0.01 , AUC = 67.43%
//regParam L2 = 0.1 , AUC = 67.14%
//regParam L2 = 1.0 , AUC = 66.60%
//regParam L2 = 10.0 , AUC = 36.76%
var maxRegL2Auc = 0.0
var bestRegL2Param = 0.0
for(x <- regL2Results){
//println(x)
if(x._2 > maxRegL2Auc){
maxRegL2Auc = x._2
bestRegL2Param = x._1
}
}
println("max auc: " + maxRegL2Auc + " best L2regParam: " + bestRegL2Param)
//4正則化形式:默認爲new SimpleUpdater 正則化係數無效,前兩個參數調參後最優AUC爲maxStepAuc
//則,3.1和3.2的最優AUC與maxStepAuc比較,較大的則爲最優正則化形式
var bestRegParam = 0.0
var bestUpdaterID = 0
if(maxStepAuc >= maxRegL1Auc ){
if(maxStepAuc >= maxRegL2Auc){
bestUpdaterID = 0
bestRegParam = 0.0
}
else {
bestUpdaterID = 2
bestRegParam = bestRegL2Param
}
}
else {
if(maxRegL2Auc >= maxRegL1Auc){
bestUpdaterID = 2
bestRegParam = bestRegL2Param
}
else {
bestUpdaterID = 1
bestRegParam = bestRegL1Param
}
}
val Updaters = Seq(new SimpleUpdater, new L1Updater, new SquaredL2Updater)
val bestUpdater = Updaters(bestUpdaterID)
//最優參數:
println("------------------更新模型訓練參數---------------------")
println(f"best numIterations param: $bestIterateParam\n" +
f"best stepSize param: $bestStepParam\n" +
f"best regParam: $bestRegParam\n" +
f"best regUpdater: $bestUpdater\n"
)
// numIterations:50
// stepSize:1.0
// regParam:0.0
// updater:new SimpleUpdater
/** 數據標準化、參數調優後,再次訓練邏輯迴歸模型,能夠用28分訓練測試 */
val upDateLrModel = trainWithParams(scaledData, bestRegParam, bestIterateParam, bestUpdater, bestStepParam)
//保存和加載模型
upDateLrModel.save(sc, "E:\\Spark\\scala-data\\Model")
val newModel = LogisticRegressionModel.load(sc, "E:\\Spark\\scala-data\\Model")
//打印模型權重值
val newRes = newModel.weights.toArray
println("參數調優後特徵權重值列表以下:")
newRes.foreach(println)
}
}
複製代碼
------------標準化後數據訓練、擬合和結果----------------
Model:LogisticRegressionModel
Accuracy: 64.974%
Area under PR: 35.237%
Area under ROC:65.355%
------------------標準化後數據調參---------------------
1迭代次數numIterations:Seq(1, 5, 10, 50, 100)
1.0 iterations, AUC = 64.50%
5.0 iterations, AUC = 67.07%
10.0 iterations, AUC = 67.10%
50.0 iterations, AUC = 67.56%
100.0 iterations, AUC = 67.56%
max auc: 0.6756122825453876 best numIterations param: 50
2步長stepSize:Seq(0.001, 0.01, 0.1, 1.0, 10.0)
0.001 stepSize, AUC = 64.50%
0.01 stepSize, AUC = 64.50%
0.1 stepSize, AUC = 65.36%
1.0 stepSize, AUC = 67.56%
10.0 stepSize, AUC = 50.20%
max auc: 0.6756122825453876 best stepSize param: 1.0
3.1 L1正則化參數regParam:Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0)
0.0 regParam L1, AUC = 67.56%
0.001 regParam L1, AUC = 66.43%
0.01 regParam L1, AUC = 65.74%
0.1 regParam L1, AUC = 50.00%
1.0 regParam L1, AUC = 50.00%
10.0 regParam L1, AUC = 50.00%
max auc: 0.6756122825453876 best L1regParam: 0.0
3.2 L2正則化參數regParam:Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0)
0.0 regParam L2, AUC = 67.56%
0.001 regParam L2, AUC = 67.56%
0.01 regParam L2, AUC = 67.43%
0.1 regParam L2, AUC = 67.14%
1.0 regParam L2, AUC = 66.60%
10.0 regParam L2, AUC = 36.76%
max auc: 0.6756122825453876 best L2regParam: 0.0
------------------更新模型訓練參數---------------------
best numIterations param: 50
best stepSize param: 1.0
best regParam: 0.0
best regUpdater: org.apache.spark.mllib.optimization.SimpleUpdater@4b6fc615
---------------調用模型輸出特徵權重值------------------
-0.014912705104766975
-0.010300917189488083
0.0037483953352201067
-1.0105291799106376E-4
0.08089240228856116
···
複製代碼