object RunRF {sql
def main(args: Array[String]) {數組
val sparkConf = new SparkConf().setAppName("rf")dom
val sc = new SparkContext(sparkConf)測試
//讀取數據lua
val rawData = sc.textFile("hdfs://192.168.1.64:8020/test/mllib/v3.csv")spa
val data = rawData.map{ line =>.net
val values = line.split(",").map(_.toDouble)rest
//init返回除了最後一個元素的全部元素,做爲特徵向量orm
//Vectors.dense向量化,dense密集型ip
val feature = Vectors.dense(values.init)
val label = values.last
LabeledPoint(label, feature)
}
//訓練集、交叉驗證集和測試集,各佔80%,10%和10%
//10%的交叉驗證數據集的做用是肯定在訓練數據集上訓練出來的模型的最好參數
//測試數據集的做用是評估CV數據集的最好參數
val Array(trainData, cvData, testData) = data.randomSplit(Array(0.8, 0.1, 0.1))
trainData.cache()
cvData.cache()
testData.cache()
//構建隨機森林
val model = RandomForest.trainClassifier(trainData, 2, Map[Int, Int](), 20, "auto", "gini", 4, 32)
val metrics = getMetrics(model, cvData)
println("-----------------------------------------confusionMatrix-----------------------------------------------------")
//混淆矩陣和模型精確率
println(metrics.confusionMatrix)
println("---------------------------------------------precision-------------------------------------------------")
println(metrics.precision)
println("-----------------------------------------(precision,recall)---------------------------------------------------")
//每一個類別對應的精確率與召回率
(0 until 2).map(target => (metrics.precision(target), metrics.recall(target))).foreach(println)
//保存模型
model.save(sc,"hdfs://192.168.1.64:8020/tmp/RFModel")
}
/**
* @param model 隨機森林模型
* @param data 用於交叉驗證的數據集
* */
def getMetrics(model: RandomForestModel, data: RDD[LabeledPoint]): MulticlassMetrics = {
//將交叉驗證數據集的每一個樣本的特徵向量交給模型預測,並和本來正確的目標特徵組成一個tuple
val predictionsAndLables = data.map { d =>
(model.predict(d.features), d.label)
}
//將結果交給MulticlassMetrics,其能夠以不一樣的方式計算分配器預測的質量
new MulticlassMetrics(predictionsAndLables)
}
/**
* 在訓練數據集上獲得最好的參數組合
* @param trainData 訓練數據集
* @param cvData 交叉驗證數據集
* */
def getBestParam(trainData: RDD[LabeledPoint], cvData: RDD[LabeledPoint]): Unit = {
val evaluations = for (impurity <- Array("gini", "entropy");
depth <- Array(1, 20);
bins <- Array(10, 300)) yield {
val model = RandomForest.trainClassifier(trainData, 2, Map[Int, Int](), 20, "auto", impurity, depth, bins)
// 2:classes
// 20: numTrees
// auto:subSampleStratry
val metrics = getMetrics(model, cvData)
((impurity, depth, bins), metrics.precision)
}
evaluations.sortBy(_._2).reverse.foreach(println)
}
/**
* 模擬對新數據進行預測1
*/
val rawData = sc.textFile("hdfs://192.168.1.64:8020/test/mllib/v3.csv")
val pdata = rawData.map{ line =>
val values = line.split(",").map(_.toDouble)
//轉化爲向量並去掉標籤(init去掉最後一個元素,即去掉標籤)
val feature = Vectors.dense(values.init)
feature
}
//讀取模型
val rfModel = RandomForestModel.load(sc,"hdfs://192.168.1.64:8020/tmp/RFModel")
//進行預測
val preLabel = rfModel.predict(pdata)
preLabel.take(10)
/**
* 模擬對新數據進行預測2
*
*/
val dataAndPreLable = rawData.map{ line =>
//轉化爲向量並去掉標籤(init去掉最後一個元素,即去掉標籤)
val vecData = Vectors.dense(line.split(",").map(_.toDouble).init)
val preLabel = rfModel.predict(vecData)
line + "," + preLabel
}//.saveAsTextFile("....")
dataAndPreLable.take(10)
}
val hc = new HiveContext(sc)
import hc.implicits._
// 調用HiveContext
// 取樣本,樣本的第一列爲label(0或者1),其餘列多是姓名,手機號,以及真正要參與訓練的特徵columns
val data = hc.sql(s"""select * from database1.traindata_userprofile""".stripMargin)
//提取schema,也就是表的column name,drop(2)刪掉1,2列,只保留特徵列
val schema = data.schema.map(f=>s"${f.name}").drop(2)
//ML的VectorAssembler是一個transformer,要求數據類型不能是string,將多列數據轉化爲單列的向量列,好比把age、income等等字段列合併成一個 userFea 向量列,方便後續訓練
val assembler = new VectorAssembler().setInputCols(schema.toArray).setOutputCol("userFea")
val userProfile = assembler.transform(data.na.fill(-1e9)).select("label","userFea")
val data_train = userProfile.na.fill(-1e9)
// 取訓練樣本
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(userProfile)
val featureIndexer = new VectorIndexer().setInputCol("userFea").setOutputCol("indexedFeatures").setMaxCategories(4).fit(userProfile)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = userProfile.randomSplit(Array(0.7, 0.3))
// Train a RandomForest model.
val rf = new RandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")
rf.setMaxBins(32).setMaxDepth(6).setNumTrees(90).setMinInstancesPerNode(4).setImpurity("gini")
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
println("training finished!!!!")
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "indexedLabel", "indexedFeatures").show(5)
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))