Spark MLlib



部份內容原文地址:
掘金:美圖數據團隊:從Spark MLlib到美圖機器學習框架實踐
博客園:牧夢者:Spark MLlib 機器學習 ***html



1、Spark MLlib

在 Spark 官網上展現了邏輯迴歸算法在 Spark 和 Hadoop 上運行性能比較,從下圖能夠看出 MLlib 比 MapReduce 快了 100 倍。
在這裏插入圖片描述
Spark MLlib 主要包括如下幾方面的內容:java

  • 學習算法:分類、迴歸、聚類和協同過濾;
  • 特徵處理:特徵提取、變換、降維和選擇;
  • 管道(Pipeline):用於構建、評估和調整機器學習管道的工具;
  • 持久性:保存和加載算法,模型和管道;
  • 實用工具:線性代數,統計,最優化,調參等工具。

Spark MLlib 典型流程以下:算法

  • 構造訓練數據集
  • 構建各個 Stage
  • Stage 組成 Pipeline
  • 啓動模型訓練
  • 評估模型效果
  • 計算預測結果

經過一個 Pipeline 的文本分類示例來加深理解:sql

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
 (0L, "a b c d e spark", 1.0),
 (1L, "b d", 0.0),
 (2L, "spark f g h", 1.0),
 (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
 .setInputCol("text")
 .setOutputCol("words")
val hashingTF = new HashingTF()
 .setNumFeatures(1000)
 .setInputCol(tokenizer.getOutputCol)
 .setOutputCol("features")
val lr = new LogisticRegression()
 .setMaxIter(10)
 .setRegParam(0.001)
val pipeline = new Pipeline()
 .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
 (4L, "spark i j k"),
 (5L, "l m n"),
 (6L, "spark hadoop spark"),
 (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
 .select("id", "text", "probability", "prediction")
 .collect()
 .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
   println(s"($id, $text) --> prob=$prob, prediction=$prediction")
 }

模型選擇與調參

Spark MLlib 提供了 CrossValidator 和 TrainValidationSplit 兩個模型選擇和調參工具。模型選擇與調參的三個基本組件分別是 Estimator、ParamGrid 和 Evaluator,其中 Estimator 包括算法或者 Pipeline;ParamGrid 即 ParamMap 集合,提供參數搜索空間;Evaluator 即評價指標。apache

CrossValidator

CrossValidator 將數據集按照交叉驗證數切分紅 n 份,每次用 n-1 份做爲訓練集,剩餘的做爲測試集,訓練並評估模型,重複 n 次,獲得 n 個評估結果,求 n 次的平均值做爲此次交叉驗證的結果。接着對每一個候選 ParamMap 重複上面的過程,選擇最優的 ParamMap 並從新訓練模型,獲得最優參數的模型輸出。markdown

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
 .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
 .addGrid(lr.regParam, Array(0.1, 0.01))
 .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
 .setEstimator(pipeline)
 .setEvaluator(new BinaryClassificationEvaluator)
 .setEstimatorParamMaps(paramGrid)
 .setNumFolds(2)  // Use 3+ in practice
 .setParallelism(2)  // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
 (4L, "spark i j k"),
 (5L, "l m n"),
 (6L, "mapreduce spark"),
 (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
 .select("id", "text", "probability", "prediction")
 .collect()
 .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
   println(s"($id, $text) --> prob=$prob, prediction=$prediction")
 }

TrainValidationSplit

TrainValidationSplit 使用 trainRatio 參數將訓練集按照比例切分紅訓練和驗證集,其中 trainRatio 比例的樣本用於訓練,剩餘樣本用於驗證。與 CrossValidator 不一樣的是,TrainValidationSplit 只有一次驗證過程,能夠簡單當作是 CrossValidator 的 n 爲 2 時的特殊版本。app

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data.
val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()
   .setMaxIter(10)

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
 .addGrid(lr.regParam, Array(0.1, 0.01))
 .addGrid(lr.fitIntercept)
 .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
 .build()

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
 .setEstimator(lr)
 .setEvaluator(new RegressionEvaluator)
 .setEstimatorParamMaps(paramGrid)
 // 80% of the data will be used for training and the remaining 20% for validation.
 .setTrainRatio(0.8)
 // Evaluate up to 2 parameter settings in parallel
 .setParallelism(2)

// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
 .select("features", "label", "prediction")
 .show()

MLlib目錄結構

在這裏插入圖片描述

MLlib處理流程

在這裏插入圖片描述

MLlib構成

兩個算法包:框架

  • spark.mllib:包含原始API,構建在RDD之上。
  • spark.ml:基於DataFrame構建的高級API。

spark.ml具有更優的性能和更好的擴展性,建議優先選用。
spark.mllib相較於spark.ml包含更多的算法。dom

數據類型(Data Type)

向量:帶類別的向量,矩陣等。機器學習

數學統計計算庫

基本統計量(min,max,average等),相關分析,隨機數產生器,假設檢驗等。

機器學習管道(pipeline)

Transformer、Estimator、Parameter。

機器學習算法

分類算法、迴歸算法、聚類算法、協同過濾。

2、Spark MLlib算法庫

Spark MLlib算法庫主要包含兩類算法:分類算法與迴歸算法。

2.1 推薦算法(AlterNating Least Squares)(ALS)

協同過濾(Collaborative Filtering,簡稱CF)推薦算法,CF的基本思想是根據用戶以前的喜愛以及其餘興趣相近的用戶的選擇來給用戶推薦物品。

CF推薦算法分類:

  • User-based:基於用戶對物品的偏好找到相鄰鄰居用戶,而後將鄰居用戶喜愛的推薦給當前用戶。
  • Item-based:基於用戶對物品的偏好找到類似的物品,而後根據用戶的歷史偏好,推薦類似的物品給他。

2.2 ALS:Scala

import org.apache.spark.ml.evalution.RegressionEvaluator
import org.apache.soark.ml.recommendation.ALS

case class Rating(userId:Int,movieId:Int,rating:Float,timestamp:Long)

def parseRating(str: String): Rating = {
	val fields = str.split("::")
	assert(fields.size == 4)
	Rating(fields(0).toInt,fields(1).toInt,fields(2).toFloat,fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movicelens_ratings.txt")
	.map(parseRating)
	,toDF()

val Array(training.test) = ratings.randomSplit(Array(0.8,0.2))

val als = new ALS()
	.setMaxIter(5)
	.setRegParam(0.01)
	.setUserCol("userId")
	.setItemCOl("movieId")
	.setRatingCol("rating")

val model = als.fit(training)

val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
	.setMetricName("rmse")
	.setLabelCol("rating")
	.setPredicationCol("prediction")

val rmse = evaluator.evaluate(predictions)

println(s"Root-mean-squar error = $rmse")
相關文章
相關標籤/搜索