spark實現邏輯迴歸和集羣環境運行

spark mllib 機器學習java

scala代碼:

package Classification

/**
 * LogisticRegression Algorithm For Hive
 * 邏輯迴歸測試環境連hive部署自動化
 * Created by wy on 2019/04/01
 */
import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

/**
 * Trains a logistic-regression model (SGD) on labelled data read from Hive, tunes its
 * hyper-parameters one at a time by AUC, and writes the final model weights back to Hive.
 *
 * Pipeline, in order:
 *   1. read `sospdm.sosp_ml_supervip_big_order_mem`; the first column of every row is parsed
 *      as `"f1,f2,...,fn#label"`;
 *   2. standardise the features (zero mean, unit standard deviation);
 *   3. search, in sequence, the number of iterations, the step size and the L1 / L2
 *      regularisation strength, each time keeping the value with the highest AUC;
 *   4. retrain with the selected settings and store the weights in two Hive tables
 *      (one wide row, and one row per weight).
 *
 * NOTE(review): AUC is measured on the same data the model was trained on, so the selected
 * parameters are not validated on held-out data.
 */
object LrForHive {
  // Keep noisy framework logging off the terminal.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /**
   * Program entry point: builds the Spark / Hive contexts, runs the pipeline and always
   * stops the SparkContext, even when the pipeline fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LrForHive")
    // Intended to suppress Hive column headers.
    // NOTE(review): the key contains a literal "set " prefix, so it probably does not match any
    // real Hive option; left unchanged because the intended setting could not be confirmed.
    conf.set("set hive.cli.print.header","false")

    val sc = new SparkContext(conf)
    try {
      run(sc, new HiveContext(sc))
    } finally {
      sc.stop()
    }
  }

  /**
   * Trains a logistic-regression model with the given optimiser settings.
   *
   * The returned model has its decision threshold cleared, so `predict` yields raw scores
   * instead of 0/1 labels; ranking metrics such as AUC need the raw scores.
   *
   * @param input         training data
   * @param regParam      regularisation strength
   * @param numIterations number of SGD iterations
   * @param updater       regularisation form (none, L1 or squared L2)
   * @param stepSize      SGD step size
   * @return the trained model, emitting raw scores
   */
  private def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int,
                              updater: Updater, stepSize: Double): LogisticRegressionModel = {
    // LogisticRegressionWithLBFGS could be used here instead of the SGD variant.
    val lr = new LogisticRegressionWithSGD
    lr.optimizer
      .setNumIterations(numIterations)
      .setStepSize(stepSize)
      .setRegParam(regParam)
      .setUpdater(updater)
    lr.run(input).clearThreshold()
  }

  /**
   * Scores `data` with `model` and computes the area under the ROC curve.
   *
   * @param label the hyper-parameter value under test, passed through unchanged
   * @param data  labelled points to score
   * @param model model used for prediction
   * @return `(label, AUC)`
   */
  private def createMetrics(label: Double, data: RDD[LabeledPoint],
                            model: ClassificationModel): (Double, Double) = {
    val scoreAndLabels = data.map(point => (model.predict(point.features), point.label))
    (label, new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC())
  }

  /**
   * Trains one model per candidate value, prints each AUC and returns the best candidate.
   *
   * @param candidates  non-empty list of values to try
   * @param description text printed after each value (e.g. "stepSize")
   * @param data        data used for evaluation
   * @param train       builds a model for a candidate value
   * @return `(bestValue, bestAuc)`; on ties the earliest candidate wins
   */
  private def searchBest(candidates: Seq[Double], description: String, data: RDD[LabeledPoint])
                        (train: Double => ClassificationModel): (Double, Double) = {
    val results = candidates.map(value => createMetrics(value, data, train(value)))
    results.foreach { case (param, auc) =>
      println(f"$param $description, AUC = ${auc * 100}%2.2f%%")
    }
    results.maxBy(_._2)
  }

  /**
   * Registers `df` as temp view `viewName` and recreates Hive table `tableName` from it.
   */
  private def overwriteHiveTable(sqlContext: HiveContext, df: org.apache.spark.sql.DataFrame,
                                 viewName: String, tableName: String): Unit = {
    df.createOrReplaceTempView(viewName)
    sqlContext.sql(s"drop table if exists $tableName")
    sqlContext.sql(s"create table if not exists $tableName as select * from $viewName")
  }

  /** Runs the whole read / tune / train / export pipeline on the given contexts. */
  private def run(sc: SparkContext, sqlContext: HiveContext): Unit = {
    // Read the training rows from Hive.
    val rawRows = sqlContext.sql("select * from sospdm.sosp_ml_supervip_big_order_mem").rdd

    // Each row's first column looks like "f1,f2,...,fn#label".
    val data = rawRows.map(row => row(0).toString).map { line =>
      val parts = line.split("#")
      val label = parts(1).toDouble
      val features = parts(0).split(",").map(_.toDouble)
      LabeledPoint(label, Vectors.dense(features))
    }
    data.cache()

    // Standardise the features.
    val scaler = new StandardScaler(withMean = true, withStd = true).fit(data.map(_.features))
    val scaledData = data.map(p => LabeledPoint(p.label, scaler.transform(p.features)))
    scaledData.cache()

    // 1. Number of iterations.
    val (bestIterations, maxIterateAuc) =
      searchBest(Seq(1.0, 5.0, 10.0, 50.0, 100.0), "iterations", scaledData) { n =>
        trainWithParams(scaledData, 0.0, n.toInt, new SimpleUpdater, 1.0)
      }
    val bestIterateParam = bestIterations.toInt
    println(s"max auc: $maxIterateAuc best numIterations param: $bestIterateParam")

    // 2. Step size: larger steps converge faster, but too large a step may fail to converge well.
    val (bestStepParam, maxStepAuc) =
      searchBest(Seq(0.001, 0.01, 0.1, 1.0, 10.0), "stepSize", scaledData) { step =>
        trainWithParams(scaledData, 0.0, bestIterateParam, new SimpleUpdater, step)
      }
    println(s"max auc: $maxStepAuc best stepSize param: $bestStepParam")

    val regCandidates = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0)

    // 3.1 Regularisation strength with L1 regularisation.
    val (bestRegL1Param, maxRegL1Auc) =
      searchBest(regCandidates, "regParam L1", scaledData) { reg =>
        trainWithParams(scaledData, reg, bestIterateParam, new L1Updater, bestStepParam)
      }
    println(s"max auc: $maxRegL1Auc best L1regParam: $bestRegL1Param")

    // 3.2 Regularisation strength with squared-L2 regularisation.
    val (bestRegL2Param, maxRegL2Auc) =
      searchBest(regCandidates, "regParam L2", scaledData) { reg =>
        trainWithParams(scaledData, reg, bestIterateParam, new SquaredL2Updater, bestStepParam)
      }
    println(s"max auc: $maxRegL2Auc best L2regParam: $bestRegL2Param")

    // 4. Regularisation form. SimpleUpdater ignores regParam, so its best AUC is maxStepAuc.
    //    No regularisation wins ties; between L1 and L2, L2 wins ties.
    val chosen: (Updater, Double) =
      if (maxStepAuc >= maxRegL1Auc && maxStepAuc >= maxRegL2Auc) (new SimpleUpdater, 0.0)
      else if (maxRegL2Auc >= maxRegL1Auc) (new SquaredL2Updater, bestRegL2Param)
      else (new L1Updater, bestRegL1Param)
    val (bestUpdater, bestRegParam) = chosen

    println("------------------更新模型訓練參數---------------------")
    println(s"best numIterations param: $bestIterateParam\n" +
      s"best stepSize param: $bestStepParam\n" +
      s"best regParam: $bestRegParam\n" +
      s"best regUpdater: $bestUpdater\n"
    )

    val upDateLrModel =
      trainWithParams(scaledData, bestRegParam, bestIterateParam, bestUpdater, bestStepParam)

    // Optional model persistence (disabled). The old model directory has to be deleted first,
    // otherwise saving the new model fails.
    //
    //   def dirDel(path: File) {
    //     if (!path.exists()) return
    //     else if (path.isFile) { path.delete(); return }
    //     val file: Array[File] = path.listFiles()
    //     for (d <- file) { dirDel(d) }
    //     path.delete()
    //   }
    //   val path: File = new File("/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    //   dirDel(path)
    //   upDateLrModel.save(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    //   val loadLrModel =
    //     LogisticRegressionModel.load(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")

    val weights = upDateLrModel.weights.toArray

    // Method 1: a single wide row with one column per weight (feature01, feature02, ...).
    // The schema is derived from the actual number of weights rather than a fixed count.
    val wideSchema = StructType(
      weights.indices.map(i => StructField(f"feature${i + 1}%02d", DoubleType, nullable = true))
    )
    val wideRows = sc.parallelize(Seq(Row.fromSeq(weights.toSeq)))
    overwriteHiveTable(sqlContext, sqlContext.createDataFrame(wideRows, wideSchema),
      "item_sim_c01", "sospdm.sosp_ml_supervip_big_order_features")

    // Method 2: one row per weight in a single column.
    val tallSchema = StructType(List(StructField("feature01", DoubleType, nullable = true)))
    val tallRows = sc.parallelize(weights).map(w => Row(w))
    overwriteHiveTable(sqlContext, sqlContext.createDataFrame(tallRows, tallSchema),
      "item_sim_c02", "sospdm.sosp_ml_supervip_big_order_features_02")
  }
}
複製代碼

hive查詢結果:

hive> select * from sosp_ml_supervip_big_order_features;
OK
0.6663199754057857      -0.010171216293572719   0.0023033400349458714   0.038338481430094495
-0.01642462221720575     0.024300063006121263    0.010833461995473337    0.7449827421313793  
-0.028370767756329837    -0.01679050770672618    -0.004508776927906877   -0.010720632066328865
-0.05246889909683635     0.0167997584085957
Time taken: 0.118 seconds, Fetched: 1 row(s)
hive> select * from sosp_ml_supervip_big_order_features_02;
OK
0.6663199754057857
-0.010171216293572719
0.0023033400349458714
0.038338481430094495
-0.01642462221720575
0.024300063006121263
0.010833461995473337
0.7449827421313793
-0.028370767756329837
-0.01679050770672618
-0.004508776927906877
-0.010720632066328865
-0.05246889909683635
0.0167997584085957
Time taken: 0.095 seconds, Fetched: 14 row(s)
複製代碼
相關文章
相關標籤/搜索