spark mllib
machine learning
scala
package Classification
/**
 * LogisticRegression algorithm for Hive:
 * automated deployment of a logistic regression job against Hive in the test environment.
 * Created by wy on 2019/04/01
 */
import java.io.File
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
object LrForHive {
// Suppress unnecessary log output on the terminal
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
// Initialize the program entry point
val conf = new SparkConf().setAppName("LrForHive")
conf.set("set hive.cli.print.header","false") //去除hive表列名
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
// Read the data from Hive
val rawData = sqlContext.sql("select * from sospdm.sosp_ml_supervip_big_order_mem").rdd
val data = rawData.map(x => x(0).toString).map{
line => {
val arr = line.split("#")
val label = arr(1).toDouble
val features = arr(0).split(",").map(_.toDouble)
LabeledPoint(label, Vectors.dense(features)) // build a dense vector
}
}
data.cache() // cache the parsed RDD
// Feature standardization
val vectors = data.map(x => x.features)
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors) // fit the scaler on the feature vectors
val scaledData = data.map(x => LabeledPoint(x.label, scaler.transform(x.features)))
scaledData.cache()
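/* StandardScaler with withMean = true and withStd = true maps each feature column
   to (x - mean) / stddev, where the mean and (sample) standard deviation are
   computed column-wise over `vectors`. For example, a feature column with values
   (1.0, 2.0, 3.0) has mean 2.0 and stddev 1.0, so it becomes (-1.0, 0.0, 1.0). */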
/** Hyper-parameter tuning. MLlib offers two optimization techniques for linear models: SGD and L-BFGS (for logistic regression, L-BFGS is available via LogisticRegressionWithLBFGS). */
// Linear model
// Helper that trains a model for the given inputs: (input, regularization parameter, number of iterations, regularization form, step size)
def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int,
updater: Updater, stepSize: Double) = {
val lr = new LogisticRegressionWithSGD // LogisticRegressionWithLBFGS could be used instead for logistic regression
lr.optimizer
.setNumIterations(numIterations) // number of iterations
.setStepSize(stepSize) // step size
.setRegParam(regParam) // regularization parameter
.setUpdater(updater) // regularization form
lr.run(input) // train on the input RDD
}
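/* The scaladoc above names L-BFGS as the alternative optimizer. A minimal sketch
   of the equivalent call (not used in this job), assuming the same scaledData:

   import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
   val lbfgs = new LogisticRegressionWithLBFGS().setNumClasses(2)
   lbfgs.optimizer.setNumIterations(100).setRegParam(0.01).setUpdater(new SquaredL2Updater)
   val lbfgsModel = lbfgs.run(scaledData)

   L-BFGS typically converges in far fewer iterations than SGD and has no stepSize
   parameter to tune. */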
// Second helper: label is the parameter value being tuned, data is the dataset to score, model is the trained model
def createMetrics(label: Double, data: RDD[LabeledPoint], model: ClassificationModel) = {
val scoreAndLabels = data.map { point =>
(model.predict(point.features), point.label) // (prediction, label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
(label, metrics.areaUnderROC()) // compute the AUC
}
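/* Note: ClassificationModel.predict returns hard 0/1 labels here, so the ROC is
   built from only two distinct score values and the resulting AUC is coarse. For
   an ROC over raw class probabilities, the threshold of a LogisticRegressionModel
   can be cleared before scoring, e.g.
   model.asInstanceOf[LogisticRegressionModel].clearThreshold() */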
// 1. Number of iterations
val iterateResults = Seq(1, 5, 10, 50, 100).map { param =>
// train
val model = trainWithParams(scaledData, 0.0, param, new SimpleUpdater, 1.0)
// score and compute the AUC
createMetrics(param, scaledData, model)
}
iterateResults.foreach { case (param, auc) => println(f"$param iterations, AUC = ${auc * 100}%2.2f%%")}
var maxIterateAuc = 0.0
var bestIterateParam = 0
for(x <- iterateResults){
if(x._2 > maxIterateAuc){
maxIterateAuc = x._2
bestIterateParam = x._1.toInt
}
}
println("max auc: " + maxIterateAuc + " best numIterations param: " + bestIterateParam)
// 2. Step size: a larger step converges faster, but too large a step can overshoot the optimum and fail to converge
val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
val model = trainWithParams(scaledData, 0.0, bestIterateParam, new SimpleUpdater, param)
createMetrics(param, scaledData, model)
}
stepResults.foreach { case (param, auc) => println(f"$param stepSize, AUC = ${auc * 100}%2.2f%%")}
var maxStepAuc = 0.0
var bestStepParam = 0.0
for(x <- stepResults){
if(x._2 > maxStepAuc){
maxStepAuc = x._2
bestStepParam = x._1
}
}
println("max auc: " + maxStepAuc + " best stepSize param: " + bestStepParam)
// 3.1 Regularization parameter (default 0.0), L1 regularization via new L1Updater
val regL1Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map{ param =>
val model = trainWithParams(scaledData, param, bestIterateParam, new L1Updater, bestStepParam)
createMetrics(param, scaledData, model)
}
regL1Results.foreach{ case (param,auc) => println(f"$param regParam L1, AUC = ${auc * 100}%2.2f%%")}
var maxRegL1Auc = 0.0
var bestRegL1Param = 0.0
for(x <- regL1Results){
if(x._2 > maxRegL1Auc){
maxRegL1Auc = x._2
bestRegL1Param = x._1
}
}
println("max auc: " + maxRegL1Auc + " best L1regParam: " + bestRegL1Param)
// 3.2 Regularization parameter (default 0.0), L2 regularization via new SquaredL2Updater
val regL2Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map{ param =>
val model = trainWithParams(scaledData, param, bestIterateParam, new SquaredL2Updater, bestStepParam)
createMetrics(param, scaledData, model)
}
regL2Results.foreach{ case (param,auc) => println(f"$param regParam L2, AUC = ${auc * 100}%2.2f%%")}
var maxRegL2Auc = 0.0
var bestRegL2Param = 0.0
for(x <- regL2Results){
if(x._2 > maxRegL2Auc){
maxRegL2Auc = x._2
bestRegL2Param = x._1
}
}
println("max auc: " + maxRegL2Auc + " best L2regParam: " + bestRegL2Param)
// 4. Regularization form: with the default new SimpleUpdater the regularization parameter has no effect, so the best AUC after tuning the first two parameters is maxStepAuc.
// Compare the best AUCs from 3.1 and 3.2 with maxStepAuc; the largest of the three determines the best regularization form.
var bestRegParam = 0.0
var bestUpdaterID = 0
if(maxStepAuc >= maxRegL1Auc ){
if(maxStepAuc >= maxRegL2Auc){
bestUpdaterID = 0
bestRegParam = 0.0
}
else {
bestUpdaterID = 2
bestRegParam = bestRegL2Param
}
}
else {
if(maxRegL2Auc >= maxRegL1Auc){
bestUpdaterID = 2
bestRegParam = bestRegL2Param
}
else {
bestUpdaterID = 1
bestRegParam = bestRegL1Param
}
}
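/* The nested ifs above can be collapsed into a single expression (a sketch;
   updater IDs: 0 = SimpleUpdater, 1 = L1Updater, 2 = SquaredL2Updater; exact
   ties may resolve differently):

   val (bestAuc, bestId, bestReg) = Seq(
     (maxStepAuc, 0, 0.0),
     (maxRegL1Auc, 1, bestRegL1Param),
     (maxRegL2Auc, 2, bestRegL2Param)).maxBy(_._1)
*/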
val updaters = Seq(new SimpleUpdater, new L1Updater, new SquaredL2Updater)
val bestUpdater = updaters(bestUpdaterID)
// Best parameters:
println("------------------ Retraining the model with the best parameters ---------------------")
println(f"best numIterations param: $bestIterateParam\n" +
f"best stepSize param: $bestStepParam\n" +
f"best regParam: $bestRegParam\n" +
f"best regUpdater: $bestUpdater\n"
)
val upDateLrModel = trainWithParams(scaledData, bestRegParam, bestIterateParam, bestUpdater, bestStepParam)
/* Delete the saved model directory, then save and reload the model:
def dirDel(path: File) {
  if (!path.exists()) return
  else if (path.isFile) {
    path.delete()
    return
  }
  val file: Array[File] = path.listFiles()
  for (d <- file) dirDel(d)
  path.delete()
}
val path: File = new File("/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
dirDel(path) // delete the previously saved model; saving to an existing path raises an error
// Save and load the model
upDateLrModel.save(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
val loadLrModel = LogisticRegressionModel.load(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
*/
// Print the model weights
val newRes = upDateLrModel.weights.toArray
val rdd1 = sc.parallelize(Seq(newRes)) // wrap the weight array in an RDD
// Specify the schema of each field directly via StructType
val schema = StructType(
List(
StructField("feature01", DoubleType, nullable = true),
StructField("feature02", DoubleType, nullable = true),
StructField("feature03", DoubleType, nullable = true),
StructField("feature04", DoubleType, nullable = true),
StructField("feature05", DoubleType, nullable = true),
StructField("feature06", DoubleType, nullable = true),
StructField("feature07", DoubleType, nullable = true),
StructField("feature08", DoubleType, nullable = true),
StructField("feature09", DoubleType, nullable = true),
StructField("feature10", DoubleType, nullable = true),
StructField("feature11", DoubleType, nullable = true),
StructField("feature12", DoubleType, nullable = true),
StructField("feature13", DoubleType, nullable = true),
StructField("feature14", DoubleType, nullable = true)
)
)
// Map rdd1 to a Row RDD of the 14 feature weights
val resRDD = rdd1.map(f => (f(0),f(1),f(2),f(3),f(4),f(5),f(6),f(7),f(8),f(9),f(10)
,f(11),f(12),f(13)))
val rowRDD = resRDD.map(x => Row(x._1.toDouble,x._2.toDouble,x._3.toDouble,
x._4.toDouble,x._5.toDouble,x._6.toDouble,x._7.toDouble,x._8.toDouble,x._9.toDouble,
x._10.toDouble,x._11.toDouble,x._12.toDouble,x._13.toDouble,x._14.toDouble))
// Convert to a DataFrame and register it as the temp view item_sim_c01
val df = sqlContext.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("item_sim_c01")
// Write the result to Hive
sqlContext.sql("drop table if exists sospdm.sosp_ml_supervip_big_order_features")
sqlContext.sql("create table if not exists " +
"sospdm.sosp_ml_supervip_big_order_features as select * from item_sim_c01")
// Method 2: one weight per row
val rdd2 = sc.parallelize(newRes)
val rowRdd2 = rdd2.map(p => Row(p))
val schema2 = StructType(
List(
StructField("feature01", DoubleType, nullable = true)
)
)
val df2 = sqlContext.createDataFrame(rowRdd2, schema2)
df2.createOrReplaceTempView("item_sim_c02")
// Write the result to Hive
sqlContext.sql("drop table if exists sospdm.sosp_ml_supervip_big_order_features_02")
sqlContext.sql("create table if not exists " +
"sospdm.sosp_ml_supervip_big_order_features_02 as select * from item_sim_c02")
sc.stop()
}
}
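All the AUC values above are computed on the same data the models were trained on, so they are optimistic. As a minimal held-out evaluation sketch (an illustrative addition reusing scaledData, trainWithParams, and the tuned parameters from the listing above):

val Array(train, test) = scaledData.randomSplit(Array(0.8, 0.2), seed = 42L)
val heldOutModel = trainWithParams(train, bestRegParam, bestIterateParam, bestUpdater, bestStepParam)
val scoreAndLabels = test.map(p => (heldOutModel.predict(p.features), p.label))
val testAuc = new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC()
println(f"held-out AUC = ${testAuc * 100}%2.2f%%")

Once the job has run, the result tables can be inspected from the Hive CLI: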
hive> select * from sosp_ml_supervip_big_order_features;
OK
0.6663199754057857 -0.010171216293572719 0.0023033400349458714 0.038338481430094495 -0.01642462221720575 0.024300063006121263 0.010833461995473337 0.7449827421313793 -0.028370767756329837 -0.01679050770672618 -0.004508776927906877 -0.01072063206632886 -0.05246889909683635 0.0167997584085957
Time taken: 0.118 seconds, Fetched: 1 row(s)
hive> select * from sosp_ml_supervip_big_order_features_02;
OK
0.6663199754057857
-0.010171216293572719
0.0023033400349458714
0.038338481430094495
-0.01642462221720575
0.024300063006121263
0.010833461995473337
0.7449827421313793
-0.028370767756329837
-0.01679050770672618
-0.004508776927906877
-0.010720632066328865
-0.05246889909683635
0.0167997584085957
Time taken: 0.095 seconds, Fetched: 14 row(s)