In machine learning, the process of selecting parameters for an algorithm or model based on a given dataset, so that the model achieves the best possible performance, is called tuning.
Spark MLlib provides two tools for this, CrossValidator and TrainValidationSplit, to help with model tuning.
Using either of them generally requires three settings: setEstimator specifies the algorithm or Pipeline to be tuned (a Pipeline is itself an Estimator); setEstimatorParamMaps specifies the "parameter grid" of candidate parameter combinations, built with new ParamGridBuilder().addGrid(xxx, xxx).build(); and setEvaluator specifies the evaluation metric used to measure how well each trained model performs on the validation set. The following example tunes a text-classification Pipeline with CrossValidator:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.Row

// Prepare training data from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)  // Use 3+ in practice

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
Output:
(4, spark i j k) --> prob=[0.12566260711357374,0.8743373928864263], prediction=1.0
(5, l m n) --> prob=[0.995215441016286,0.004784558983714038], prediction=0.0
(6, mapreduce spark) --> prob=[0.3069689523262643,0.6930310476737357], prediction=1.0
(7, apache hadoop) --> prob=[0.8040279442401395,0.1959720557598605], prediction=0.0
The two values in prob are the predicted probabilities of class 0 and class 1 respectively; the default decision threshold is 0.5. CrossValidator evaluates every parameter combination multiple times (once per fold), so its computational cost is high, which makes it less suitable for large datasets.
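Beyond the predictions, it can be useful to see how each parameter combination scored. The following is a minimal sketch, assuming the cv, cvModel, and HashingTF values from the example above are still in scope; avgMetrics holds the evaluator's metric (areaUnderROC here) averaged over the folds for each entry of the parameter grid.

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

// Each avgMetrics(i) is the metric averaged over the folds for getEstimatorParamMaps(i).
cv.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
  println(s"avg areaUnderROC = $metric for:\n$params")
}

// The best model is a fitted PipelineModel; its stages expose the winning parameters.
val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestNumFeatures = bestPipeline.stages(1).asInstanceOf[HashingTF].getNumFeatures
val bestRegParam = bestPipeline.stages(2).asInstanceOf[LogisticRegressionModel].getRegParam
println(s"best numFeatures = $bestNumFeatures, best regParam = $bestRegParam")

For larger datasets, TrainValidationSplit, shown next, evaluates each parameter combination only once and is therefore cheaper: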
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.{Row, SparkSession}

// Prepare training and test data.
val data = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// In this case the estimator is a Pipeline.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)

// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(data)

val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
Output:
(4, spark i j k) --> prob=[0.26612878920913,0.73387121079087], prediction=1.0
(5, l m n) --> prob=[0.9209302389399868,0.0790697610600131], prediction=0.0
(6, mapreduce spark) --> prob=[0.4429343598469927,0.5570656401530073], prediction=1.0
(7, apache hadoop) --> prob=[0.8583692828862762,0.14163071711372377], prediction=0.0
TrainValidationSplit splits the input data into a training set and a validation set according to setTrainRatio(), and selects, from all parameter combinations, the one whose model performs best on the validation set as the optimal parameters. Whereas CrossValidator evaluates each parameter combination multiple times, TrainValidationSplit evaluates each combination only once, so its computational cost is lower. However, when the dataset is not very large, an uneven data distribution between the two splits may skew the result of model selection.