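Original sources for parts of this content:
Juejin (掘金): Meitu Data Team: From Spark MLlib to the Meitu Machine Learning Framework in Practice
Cnblogs (博客園): 牧夢者: Spark MLlib Machine Learning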
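The Spark website publishes a performance comparison of the logistic regression algorithm running on Spark and on Hadoop. The chart there shows MLlib running up to 100 times faster than MapReduce.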
Spark MLlib mainly covers the following areas:
A typical Spark MLlib workflow looks like this:
A Pipeline-based text classification example makes this concrete:

    import org.apache.spark.ml.{Pipeline, PipelineModel}
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.Row

    // Prepare training documents from a list of (id, text, label) tuples.
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Now we can optionally save the fitted pipeline to disk
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // We can also save this unfit pipeline to disk
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")

    // And load it back in during production
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Make predictions on test documents.
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }

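Spark MLlib provides two tools for model selection and hyperparameter tuning: CrossValidator and TrainValidationSplit. Model selection and tuning rest on three basic components: an Estimator, a ParamGrid, and an Evaluator. The Estimator is an algorithm or a Pipeline; the ParamGrid is a set of ParamMaps that defines the parameter search space; the Evaluator supplies the evaluation metric.
CrossValidator splits the data set into n folds, where n is the configured number of folds. Each round uses n-1 folds as the training set and the remaining fold as the test set, then trains and evaluates a model. After n rounds there are n evaluation results, and their average is the cross-validation score for that ParamMap. The same procedure is repeated for every candidate ParamMap; the best ParamMap is then selected and the model is retrained with it on the full data set, which yields the final model with the best parameters.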

    // Continues the spark-shell session of the Pipeline example above: reuses
    // training, pipeline, hashingTF, lr and the Row / Vector imports.
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

    // We use a ParamGridBuilder to construct a grid of parameters to search over.
    // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
    // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
    val paramGrid = new ParamGridBuilder()
      .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .build()

    // We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
    // This will allow us to jointly choose parameters for all Pipeline stages.
    // A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
    // Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
    // is areaUnderROC.
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(2)    // Use 3+ in practice
      .setParallelism(2) // Evaluate up to 2 parameter settings in parallel

    // Run cross-validation, and choose the best set of parameters.
    val cvModel = cv.fit(training)

    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Make predictions on test documents. cvModel uses the best model found (lrModel).
    cvModel.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }

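The procedure that CrossValidator is described as running (split into folds, average the per-fold scores for each candidate, keep the best candidate, refit on everything) can be sketched without Spark. This is only an illustration under simplifying assumptions: the helper names `kFold`, `fitShrunkMean`, `rmse` and `crossValidate` are hypothetical, the "model" is a toy constant predictor with one parameter `lambda`, and the folds are assigned round-robin rather than randomly.

```scala
// Spark-free sketch of k-fold cross-validation over a parameter grid.
// All names here are hypothetical and not part of any Spark API.

// Split indices 0 until n into k disjoint folds (round-robin, so deterministic).
def kFold(n: Int, k: Int): Seq[Seq[Int]] =
  (0 until k).map(fold => (0 until n).filter(_ % k == fold))

// Toy "estimator": predicts a constant, the training mean shrunk by lambda.
def fitShrunkMean(train: Seq[Double], lambda: Double): Double =
  train.sum / train.size / (1.0 + lambda)

// Toy "evaluator": root-mean-square error of a constant prediction.
def rmse(prediction: Double, labels: Seq[Double]): Double =
  math.sqrt(labels.map(y => (y - prediction) * (y - prediction)).sum / labels.size)

// Average validation RMSE of one candidate parameter across the k folds.
def crossValidate(data: Seq[Double], k: Int, lambda: Double): Double = {
  val scores = kFold(data.size, k).map { validationIdx =>
    val held = validationIdx.toSet
    val train = data.indices.filterNot(held).map(data) // the other k-1 folds
    val validation = validationIdx.map(data)           // the held-out fold
    rmse(fitShrunkMean(train, lambda), validation)
  }
  scores.sum / scores.size
}

val labels = Seq(2.0, 4.0, 6.0, 8.0, 10.0, 12.0)
val paramGrid = Seq(0.0, 0.5, 1.0) // candidate lambda values (the search space)

// Score every candidate, keep the one with the lowest average RMSE,
// then refit on the full data set with that parameter.
val avgScores = paramGrid.map(lambda => lambda -> crossValidate(labels, 3, lambda))
val bestLambda = avgScores.minBy(_._2)._1
val finalModel = fitShrunkMean(labels, bestLambda)

println(s"average RMSE per lambda: $avgScores")
println(s"best lambda = $bestLambda, refit prediction = $finalModel")
```

With a metric where larger is better, such as areaUnderROC, the selection step would use `maxBy` instead of `minBy`.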
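TrainValidationSplit uses the trainRatio parameter to split the training data proportionally into a training set and a validation set: a trainRatio share of the samples is used for training and the rest for validation. Unlike CrossValidator, TrainValidationSplit evaluates each parameter combination only once, so it can loosely be seen as a cut-down CrossValidator that runs a single round (a true 2-fold cross-validation would still train and evaluate twice). It is cheaper to run, but the result is less reliable when the training set is small.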

    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

    // Prepare training and test data.
    val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
    val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

    val lr = new LinearRegression()
      .setMaxIter(10)

    // We use a ParamGridBuilder to construct a grid of parameters to search over.
    // TrainValidationSplit will try all combinations of values and determine best model using
    // the evaluator.
    val paramGrid = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .addGrid(lr.fitIntercept)
      .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
      .build()

    // In this case the estimator is simply the linear regression.
    // A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(lr)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(paramGrid)
      // 80% of the data will be used for training and the remaining 20% for validation.
      .setTrainRatio(0.8)
      // Evaluate up to 2 parameter settings in parallel
      .setParallelism(2)

    // Run train validation split, and choose the best set of parameters.
    val model = trainValidationSplit.fit(training)

    // Make predictions on test data. model is the model with combination of parameters
    // that performed best.
    model.transform(test)
      .select("features", "label", "prediction")
      .show()

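To make the cost difference between the two tools concrete, here is a small Spark-free sketch. It assumes a simplified exact split (`splitByRatio` is a hypothetical helper; Spark's own split is random and only approximately matches the ratio) and counts model fits on the assumption that both tools refit once on the full data set after choosing the best ParamMap.

```scala
// Spark-free sketch; all helper names are hypothetical.
import scala.util.Random

// Single shuffled split: a trainRatio share for training, the rest for validation.
def splitByRatio[A](data: Seq[A], trainRatio: Double, seed: Long): (Seq[A], Seq[A]) = {
  val shuffled = new Random(seed).shuffle(data)
  shuffled.splitAt(math.round(data.size * trainRatio).toInt)
}

// Number of model fits, counting the final refit with the best parameters.
def trainValidationSplitFits(gridSize: Int): Int = gridSize + 1
def crossValidatorFits(gridSize: Int, numFolds: Int): Int = gridSize * numFolds + 1

val (train, validation) = splitByRatio(1 to 10, trainRatio = 0.8, seed = 12345L)

// Grid of the example above: 2 regParam x 2 fitIntercept x 3 elasticNetParam values.
val gridSize = 2 * 2 * 3

println(s"train size = ${train.size}, validation size = ${validation.size}")
println(s"TrainValidationSplit fits = ${trainValidationSplitFits(gridSize)}")
println(s"3-fold CrossValidator fits = ${crossValidatorFits(gridSize, 3)}")
```

For the same grid, the cross-validation cost grows linearly with the number of folds, which is the trade-off against its more stable estimate.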
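Two algorithm packages:
spark.ml (the DataFrame-based API) offers better performance and better extensibility, and is the recommended first choice.
spark.mllib (the RDD-based API) contains more algorithms than spark.ml.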
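Vectors: labeled vectors, matrices, and so on.
Basic statistics (min, max, average, etc.), correlation analysis, random number generators, hypothesis testing, and so on.
Transformer, Estimator, Parameter.
Classification, regression, clustering, and collaborative filtering algorithms.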
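The relationship between Transformer, Estimator and Parameter can be sketched in plain Scala. The traits below are hypothetical, heavily simplified stand-ins and not Spark's actual API (which works on DataFrames and columns): a Transformer maps data to data, an Estimator is fitted on data and returns a Transformer (the model), and a parameter such as `minCount` configures the Estimator.

```scala
// Hypothetical, simplified stand-ins for the concepts; NOT Spark's API.
trait Transformer[A, B] { def transform(data: Seq[A]): Seq[B] }
trait Estimator[A, B] { def fit(data: Seq[A]): Transformer[A, B] }

// Stateless Transformer: lowercases each text and splits it on whitespace.
object SimpleTokenizer extends Transformer[String, Seq[String]] {
  def transform(data: Seq[String]): Seq[Seq[String]] =
    data.map(_.toLowerCase.split("\\s+").toSeq)
}

// Estimator with one parameter (minCount): learns a vocabulary from the data
// and returns a Transformer that keeps only words from that vocabulary.
final class VocabularyFilter(minCount: Int) extends Estimator[Seq[String], Seq[String]] {
  def fit(data: Seq[Seq[String]]): Transformer[Seq[String], Seq[String]] = {
    val vocab = data.flatten.groupBy(identity).collect {
      case (word, occurrences) if occurrences.size >= minCount => word
    }.toSet
    new Transformer[Seq[String], Seq[String]] {
      def transform(docs: Seq[Seq[String]]): Seq[Seq[String]] = docs.map(_.filter(vocab))
    }
  }
}

val docs = Seq("a b c d e spark", "b d", "spark f g h", "hadoop mapreduce")
val tokens = SimpleTokenizer.transform(docs)               // Transformer step
val model = new VocabularyFilter(minCount = 2).fit(tokens) // Estimator step
val filtered = model.transform(tokens)                     // fitted model is a Transformer

println(filtered)
```

Chaining such stages, where each fitted stage feeds the next, is the idea a Pipeline packages up.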
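2. Spark MLlib algorithm library
The supervised learning algorithms in the Spark MLlib algorithm library fall mainly into two classes: classification algorithms and regression algorithms.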
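Collaborative filtering (CF) is a recommendation algorithm. Its basic idea is to recommend items to a user based on that user's past preferences and on the choices of other users with similar interests.
Categories of CF recommendation algorithms: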
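
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.recommendation.ALS
    // Already in scope in spark-shell; needed elsewhere for .map and .toDF().
    import spark.implicits._

    case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

    // Parse one "userId::movieId::rating::timestamp" line.
    def parseRating(str: String): Rating = {
      val fields = str.split("::")
      assert(fields.size == 4)
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
    }

    val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
      .map(parseRating)
      .toDF()
    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

    // Build the recommendation model with ALS on the training data.
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
      // Drop rows whose user or item was unseen in training; otherwise their
      // NaN predictions make the RMSE NaN (available since Spark 2.2).
      .setColdStartStrategy("drop")
    val model = als.fit(training)

    // Evaluate the model by computing the RMSE on the held-out test data.
    val predictions = model.transform(test)
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    println(s"Root-mean-square error = $rmse")
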
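The basic CF idea stated earlier, recommending what users with similar tastes liked, can also be illustrated without Spark. The sketch below is a neighbourhood-based (user-based) variant, which is a different technique from the ALS matrix factorization used in the code above. The rating data, the `cosine` and `recommend` helpers, and the scoring rule (sum of similarity times rating) are all assumptions made for illustration.

```scala
// Spark-free sketch of user-based collaborative filtering.
// The data and helper names are hypothetical.

// user -> (item -> rating)
val ratings: Map[String, Map[String, Double]] = Map(
  "alice" -> Map("A" -> 5.0, "B" -> 4.0),
  "bob"   -> Map("A" -> 5.0, "B" -> 4.0, "C" -> 5.0),
  "carol" -> Map("A" -> 1.0, "D" -> 5.0)
)

// Cosine similarity between two users' rating vectors (missing items count as 0).
def cosine(u: Map[String, Double], v: Map[String, Double]): Double = {
  val shared = (u.keySet intersect v.keySet).toSeq
  if (shared.isEmpty) 0.0
  else {
    val dot = shared.map(item => u(item) * v(item)).sum
    val normU = math.sqrt(u.values.map(x => x * x).sum)
    val normV = math.sqrt(v.values.map(x => x * x).sum)
    dot / (normU * normV)
  }
}

// Score each item the user has not rated by summing similarity * rating
// over the other users who rated it, then return the topN items.
def recommend(user: String, topN: Int): Seq[(String, Double)] = {
  val mine = ratings(user)
  val neighbours = (ratings - user).values.toSeq.map(theirs => (cosine(mine, theirs), theirs))
  val unseen = neighbours.flatMap(_._2.keys).distinct.filterNot(mine.contains)
  unseen
    .map { item =>
      val score = neighbours.collect {
        case (sim, theirs) if theirs.contains(item) => sim * theirs(item)
      }.sum
      item -> score
    }
    .sortBy { case (_, score) => -score }
    .take(topN)
}

println(recommend("alice", 2))
```

Here bob's ratings agree closely with alice's, so the item only bob rated ends up ranked above the item only carol rated.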