MLlib 是 Apache Spark 的可擴展機器學習庫,旨在簡化機器學習的工程實踐工做,並方便擴展到更大規模的數據集。git
在深刻介紹 Spark MLlib 以前先了解機器學習,根據維基百科的介紹,機器學習有下面幾種定義:
github
機器學習是一門人工智能的科學,該領域的主要研究對象是人工智能,特別是如何在經驗學習中改善具體算法的性能;算法
機器學習是對能經過經驗自動改進的計算機算法的研究;sql
機器學習是用數據或以往的經驗,以此優化計算機程序的性能標準;apache
一種常常引用的英文定義是「A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E.」。api
其實在「美圖數據技術團隊」以前的科普文章貝葉斯機率模型一覽曾介紹過,機器學習狹義上是指代統計機器學習,統計學習根據任務類型能夠分爲監督學習、半監督學習、無監督學習、加強學習等。性能優化
機器學習經常使用的算法能夠分爲如下種類:bash
1.構造間隔理論分佈:人工神經網絡、決策樹、感知器、支持向量機、集成學習 AdaBoost、降維與度量學習、聚類、貝葉斯分類器;
2.構造條件機率: 高斯過程迴歸、線性判別分析、最近鄰居法、徑向基函數核;
3.經過再生模型構造機率密度函數: 最大指望算法、機率圖模型(貝葉斯網和 Markov 隨機場)、Generative Topographic Mapping;
4.近似推斷技術: 馬爾可夫鏈、蒙特卡羅方法、變分法;
5.最優化算法。
在上文咱們曾提到機器學習的重點之一是「經驗」,而對於計算機而言經驗每每須要通過多輪迭代計算才能獲得,而 Spark 擅長迭代計算,正好符合機器學習這一特性。在 Spark 官網上展現了邏輯迴歸算法在 Spark 和 Hadoop 上運行性能比較,從下圖能夠看出 MLlib 比 MapReduce 快了 100 倍。網絡
Spark MLlib 主要包括如下幾方面的內容:數據結構
學習算法:分類、迴歸、聚類和協同過濾;
特徵處理:特徵提取、變換、降維和選擇;
管道(Pipeline):用於構建、評估和調整機器學習管道的工具;
持久性:保存和加載算法,模型和管道;
實用工具:線性代數,統計,最優化,調參等工具。
上表總結了 Spark MLlib 支持的功能結構,能夠看出它所提供的算法豐富,但算法種類較少而且老舊,所以 Spark MLlib 在算法上支持與 kylin 項目有些脫節,它的主要功能更可能是與特徵相關的。
從 Spark 2.0 開始基於 RDD 的 API 進入維護模式,Spark 的主要機器學習 API 如今是基於 DataFrame 的 API spark.ml,借鑑 Scikit-Learn 的設計提供了 Pipeline 套件,以構建機器學習工做流。 ML Pipelines 提供了一套基於 DataFrame 構建的統一的高級 API ,可幫助用戶建立和調整實用的機器學習流程。
*「Spark ML」不是官方名稱,偶爾用於指代基於 MLlib DataFrame 的 API
首先了解 ML Pipelines 內幾個重要組件。
DataFrame 讓 Spark 具有了處理大規模結構化數據的能力。
RDD 是分佈式 Java 對象的集合,對象的內部數據結構對於 RDD 而言不可知。DataFrame 是一種以 RDD 爲基礎的分佈式數據集,RDD 中存儲了 Row 對象,Row 對象提供了詳細的結構信息,即模式(schema),使得 DataFrame 具有告終構化數據的能力。
Transformer 一般是一個數據/特徵變換的類,或一個訓練好的模型。
每一個 Transformer 都有 transform 函數,用於將一個 DataFrame 轉換爲另外一個 DataFrame 。通常 transform 的過程是在輸入的 DataFrame 上添加一列或者多列 ,Transformer.transform也是惰性執行,只會生成新的 DataFrame 變量,而不會去提交 job 計算 DataFrame 中的內容。
Estimator 抽象了從輸入數據學習模型的過程,每一個 Estimator 都實現了 fit 方法,用於給定 DataFrame 和 Params 後,生成一個 Transformer(即訓練好的模型),每當調用 Estimator.fit() 後,都會產生 job 去訓練模型,獲得模型參數。
能夠經過設置 Transformer 或 Estimator 實例的參數來設置模型參數,也能夠經過傳入 ParamMap 對象來設置模型參數。
Pipeline 定義了一組數據處理流程,能夠在 Pipeline 中加入 Transformer、Estimator 或另外一個 Pipeline。Pipeline 繼承自 Estimator,調用 Pipeline.fit 方法後返回一個 Transformer——PipelineModel;PipelineModel 繼承自 Transformer,用於將輸入通過 Pipeline 的各個 Transformer 的變換後,獲得最終輸出。
Spark MLlib 典型流程以下:
構造訓練數據集
構建各個 Stage
Stage 組成 Pipeline
啓動模型訓練
評估模型效果
計算預測結果
經過一個 Pipeline 的文本分類示例來加深理解:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}複製代碼
Spark MLlib 提供了 CrossValidator 和 TrainValidationSplit 兩個模型選擇和調參工具。模型選擇與調參的三個基本組件分別是 Estimator、ParamGrid 和 Evaluator,其中 Estimator 包括算法或者 Pipeline;ParamGrid 即 ParamMap 集合,提供參數搜索空間;Evaluator 即評價指標。
via https://github.com/JerryLead/blogs/blob/master/BigDataSystems/Spark/ML/Introduction%20to%20MLlib%20Pipeline.md
CrossValidator 將數據集按照交叉驗證數切分紅 n 份,每次用 n-1 份做爲訓練集,剩餘的做爲測試集,訓練並評估模型,重複 n 次,獲得 n 個評估結果,求 n 次的平均值做爲此次交叉驗證的結果。接着對每一個候選 ParamMap 重複上面的過程,選擇最優的 ParamMap 並從新訓練模型,獲得最優參數的模型輸出。
🌰舉個例子:
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
.addGrid(lr.regParam, Array(0.1, 0.01))
.build()
// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2) // Use 3+ in practice
.setParallelism(2) // Evaluate up to 2 parameter settings in parallel
// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}複製代碼
TrainValidationSplit 使用 trainRatio 參數將訓練集按照比例切分紅訓練和驗證集,其中 trainRatio 比例的樣本用於訓練,剩餘樣本用於驗證。
與 CrossValidator 不一樣的是,TrainValidationSplit 只有一次驗證過程,能夠簡單當作是 CrossValidator 的 n 爲 2 時的特殊版本。
🌰舉個例子:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
// Prepare training and test data.
val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)
val lr = new LinearRegression()
.setMaxIter(10)
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
// 80% of the data will be used for training and the remaining 20% for validation.
.setTrainRatio(0.8)
// Evaluate up to 2 parameter settings in parallel
.setParallelism(2)
// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)
// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
.select("features", "label", "prediction")
.show()複製代碼
繼承自 Transformer 類,實現 transform 方法,一般是在輸入的 DataFrame 上添加一列或多列。
對於單輸入列,單輸出列的 Transformer 能夠繼承自 UnaryTransformer 類,並實現其中的 createTransformFunc 方法,實現對輸入列每一行的處理,並返回相應的輸出。
機器學習技術突飛猛進,卻缺乏高效靈活的框架下降新技術的調研成本,而經驗與技術每每須要經過框架和工具來沉澱,而且算法人員經常受限於算力,致使離線證實有效的模型,由於預估時間複雜度太高而沒法上線。
據此美圖數據技術團隊以「開發簡單靈活的機器學習工做流,下降算法人員的新算法調研成本及工程人員的維護成本,而且提供經常使用的領域內解決方案,將經驗沉澱」的目標搭建了一套量身定製的機器學習框架用以解決上述問題,尤爲是解決在推薦算法相關任務上遇到的問題。該框架總共包括 3 個組件:Spark Feature、Bamboo 與 Online Scorer。
該組件主要用於訓練樣本的生產,實現了靈活高效的樣本特徵編碼,能夠實現將任意特徵集合放在同一個空間進行編碼,不一樣特徵集合共享編碼空間;爲此咱們提出了兩個概念:第一個是「域」,用於定義共享相同建模過程的一組特徵;第二個是「空間」,用於定義共享相同編碼空間的一組域。
上圖示例中的「Old」展現了在沒有「域」和「空間」概念下的樣本特徵編碼,全部特徵從 1 開始編號;「New」展現了將 age 和 gender 分別放到 age 域和 gender 域後,兩個域分別從 1 開始編碼,互不影響。
Spark Feature 最終採用 TFRecords 做爲訓練樣本的存儲格式。
該組件主要爲了實現可擴展、高效、簡單快速的模型定義與訓練。爲此,在設計 Bamboo 時咱們遵循如下原則:
1.layer 之間經過 tensor 進行交互,layer 的輸入是 tensor,輸出也是 tensor;
2.爲了最大限度地提升離線與在線效率,沒有采用太多高級 api,如 keras,大多數模型與組件基於 Tensorflow 底層 api 開發,而且根據 Tensorflow 官方的性能優化指南對代碼進行優化;
3.提供 online-offline 的建模框架,複雜計算放到離線,在線只進行輕量計算,使得複雜模型更易上線;
4.封裝數據加載、模型訓練與導出、效果評估以及提供了各類輔助工具,用戶只須要定義前向推理網絡,同時封裝了大量的經常使用 layer,模型定義更快捷。
Online Scorer的目標是提供一個統一,高效的在線推理服務,能夠同時支持tensorflow,pytorch,xgboost等各類主流建模框架導出的模型。目前這塊工做還在進行中,具體實現方案細節,咱們放到後面的專題文章介紹。
以上就是美圖自研機器學習框架的簡要介紹,歡迎持續關注「美圖數據技術團隊」,後續將帶來該平臺的詳細介紹。