Spark MLlib框架詳解

1. 概述

1.1 功能

  MLlib是Spark的機器學習(machine learing)庫,其目標是使得機器學習的使用更加方便和簡單,其具備以下功能:html

  • ML算法:經常使用的學習算法,包括分類、迴歸、聚類和過濾;
  • 特徵:特徵萃取、轉換、降維和選取;
  • Pipelines:其是一個工具,目標是用於構建、測量和調節;
  • 使用工具:包括線性代數、統計學習和數據操做等等。

1.2 API架包

MLlib有兩個API架包:算法

  1) Spark.mllib:基於RDDAPI包,在spark 2.0時已經進入維護模型
  2) Spark.ml:基於DataFrameAPI包,目前Spark官方首推使用該包。

2. Pipelines Components

MLlib標準化機器學習算法的API,使得更容易將多個算法組合成到單個管道(工做流)。其設計思想是受到Scikit-learn項目的啓發。sql

  • DataFrameMLlib的數據使用Spark SQL中的DataFrame結構來存儲,即用戶的數據集和模型的輸出標籤都是以此結構存儲,包括Pipeline內部數據的傳輸都是以此結構存儲;
  • TransformerMLlib將算法模型用Transformer結構來表示,其以一個DataFrame數據做爲輸入,經過模型計算後轉換爲一個DataFrame數據;
  • EstimatorEstimator結構也表示一種算法,但其以一個DataFrame數據做爲輸入,經過模型計算後轉換爲一個Transformer對象,而不是DataFrame數據;
  • PipelineMLlib使用Pipeline來組織多個ML模型,即其內部有多個TransformerEstimator對象,從而組成一個算法工做流;
  • ParameterMLlib使用Parameter結構來存儲參數,用戶經過這些參數來配置和調節模型。即在一個Pipeline對象內的全部TransformerEstimator對象都共享一個Parameter對象。

2.1 DataFrame

  機器學習中數據集是由一個個樣本組成,而每一個樣本實際上是一條有多個特徵組成的記錄,從而數據集實際上是一個矩陣結構。而Spark SQL中的DataFrame結構也擁有相似的結構,DataFrame內部有一行行的數據Row組成,每一個Row對象內部也能夠由多個屬性組成。從而MLlib使用DataFrame來描述機器學習中的數據集正好不過了。apache

  Spark SQL的DataFrame其實一種Dataset類型,只是存儲的是Row元素,以下Spark源碼所示:數組

Package object sql{ app

……框架

type DataFrame = Dataset[Row] 機器學習

}ide

 

2.2 Pipeline

  MLlib使用Pipeline來組織多個ML模型,其內部有多個Transformer和Estimator對象,從而組成一個算法工做流。在Spark ML中與Pipeline相關聯的類如圖 1所示。從圖中可明顯看出Transformer和Estimator都是PipelineStage抽象類的子類;而且Pipeline類內部有一個stages數組來存儲PipelineStage對象,即存放Transformer和Estimator對象;當用戶調用Pipeline的fit()方法時,將產生一個PipelineModel對象;PipelineModel類有一個transform()方法能返回一個DataFrame對象。工具

 

圖 1  

3. 工做機制

  Pipeline是由一系列stage組成,這些stage有兩種類型:Transformer和Estimator。Stage在Pipeline的運行是有序的,並且輸入的DataFrame會在stage中被轉換和傳遞。若stage是Transformer類型,則對條用Transformer對象的transform()方法將輸入的DataFrame轉換爲另外一種DataFrame;若stage是Estimator類型,則會調用Estimator對象的fit()方法產生Transformer對象,調用該Transformer對象的transform()方法同樣會產生一個DataFrame。

能夠將上述這一段,詳細解釋爲兩個過程:模型訓練和模型預測,以下所示:

3.1 模型訓練

  Pipeline對象內部有一個stages容器,存放多個Transformer對象和一個Estimator對象。當用戶調用Pipeline對象的fit()方法時,會接收輸入的DataFrame,而後在這些stage中被轉換和傳遞。當傳遞到最後一個stage(Estimator對象)時,將生成一個PipelineModel對象(Transformer子類),如圖 2所示。

 

圖 2

  用戶調用上圖中Pipeline的fit()時,會將stages容器存放的全部Transformer對象和Estimator對象生成的Transformer對象都添加到PipelineModel對象中,該對象有一個stages容器(Array[Transformer]類型),其可以存放Transformer對象。

    經過Spark源碼,能夠查看Pipeline類中的fit()內容以下所示:

override def fit(dataset: Dataset[_]): PipelineModel = {

transformSchema(dataset.schema, logging = true)

val theStages = $(stages)

var curDataset = dataset

val transformers = ListBuffer.empty[Transformer]

theStages.view.zipWithIndex.foreach { case (stage, index) =>

if (index <= indexOfLastEstimator) {

val transformer = stage match {

case estimator: Estimator[_] =>//如果Estimator對象,則調用fit()方法生成一個Transformer

estimator.fit(curDataset)

case t: Transformer =>//如果Transformer對象,則直接返回

t

case _ =>

throw new IllegalArgumentException(

s"Does not support stage $stage of type ${stage.getClass}")

}

if (index < indexOfLastEstimator) {

curDataset = transformer.transform(curDataset)//若是不是最後的對象,則調用transformer對象的transform方法,生成一個DataFrame

}

transformers += transformer //將生成的全部Transformer對象都添加到一個list

} else {

transformers += stage.asInstanceOf[Transformer]

}

}

 new PipelineModel(uid, transformers.toArray).setParent(this) //最後建立PipelineModel對象,並傳遞上述的Transformer列表。

}

3.2 模型預測

  在模型訓練階段會經過向Pipeline的fit()方法傳遞DataFrame數據來訓練模型,從而生成一個PipelineModel對象(Transformer子類),該對象內部有一個stages容器,存放了全部Transformer對象。

  當進行模型預測時,即經過向PipelineModel對象的transform傳遞一個DataFrame數據來預測時,會依序調用其stages容器中的Transformer對象,每一個Transformer對象都有一個DataFrame輸入和一個DataFrame的輸出,最後生成一個DataFrame做爲用戶的輸出,如圖 3所示。

 

圖 3

相似,能夠查看PipelineModel對象的transform()方法,以下所示:

override def transform(dataset: Dataset[_]): DataFrame = {

transformSchema(dataset.schema, logging = true)

stages.foldLeft(dataset.toDF)((cur, transformer) => transformer.transform(cur))

}

  stages.foldLeft(dataset.toDF)((cur, transformer) => transformer.transform(cur))語句正是圖 3的實現,即第一次輸入數據是dataset.toDF,而後每次調用transformer.transform(cur))方法,產生的DataFrame輸出做爲下一次的輸入。

3.3 關係總結

  經過上述Pipeline工做機制的分析,如今從機器學習的角度總結一下Pipeline、Transformer和Estimator三者之間的關係,如圖 4所示。

 

圖 4

  1. Transformer:是對數據進行預處理,如特徵向量萃取、向量轉換或降維;
  2. Estimator:機器學習的某種算法,如線性迴歸、貝葉斯或支持向量機;
  3. Pipeline:是一種算法組織者,將TransformerEstimator組織成有序的執行過程。

4. Examples

4.1 Estimator、Transformer和Param

  本節以Estimator類爲例,沒有使用Pipeline結構來組織Estimator和Transformer對象。Estimator類能夠單獨使用,不須要Pipeline結構也能工做,此時Estimator相似Scikit-learn框架。首先,用戶直接調用Estimator對象的fit()方法來訓練數據;而後,根據fit()方法返回的Transformer對象,用戶接着調用Transformer的transform()方法來預測或測試;

以下所示的完整程序:

// scalastyle:off println

package org.apache.spark.examples.ml

 

// $example on$

import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.linalg.{Vector, Vectors}

import org.apache.spark.ml.param.ParamMap

import org.apache.spark.sql.Row

// $example off$

import org.apache.spark.sql.SparkSession

 

object EstimatorTransformerParamExample {

 

def main(args: Array[String]): Unit = {

val spark = SparkSession

.builder

.appName("EstimatorTransformerParamExample")

.getOrCreate()

 

// $example on$

// Prepare training data from a list of (label, features) tuples.

val training = spark.createDataFrame(Seq(

(1.0, Vectors.dense(0.0, 1.1, 0.1)),

(0.0, Vectors.dense(2.0, 1.0, -1.0)),

(0.0, Vectors.dense(2.0, 1.3, 1.0)),

(1.0, Vectors.dense(0.0, 1.2, -0.5))

)).toDF("label", "features")

 

// Create a LogisticRegression instance. This instance is an Estimator.

val lr = new LogisticRegression()

// Print out the parameters, documentation, and any default values.

println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

 

// We may set parameters using setter methods.

lr.setMaxIter(10)

.setRegParam(0.01)

 

// Learn a LogisticRegression model. This uses the parameters stored in lr.

val model1 = lr.fit(training)

// Since model1 is a Model (i.e., a Transformer produced by an Estimator),

// we can view the parameters it used during fit().

// This prints the parameter (name: value) pairs, where names are unique IDs for this

// LogisticRegression instance.

println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

 

// We may alternatively specify parameters using a ParamMap,

// which supports several methods for specifying parameters.

val paramMap = ParamMap(lr.maxIter -> 20)

.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.

.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

 

// One can also combine ParamMaps.

val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.

val paramMapCombined = paramMap ++ paramMap2

 

// Now learn a new model using the paramMapCombined parameters.

// paramMapCombined overrides all parameters set earlier via lr.set* methods.

val model2 = lr.fit(training, paramMapCombined)

println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

 

// Prepare test data.

val test = spark.createDataFrame(Seq(

(1.0, Vectors.dense(-1.0, 1.5, 1.3)),

(0.0, Vectors.dense(3.0, 2.0, -0.1)),

(1.0, Vectors.dense(0.0, 2.2, -1.5))

)).toDF("label", "features")

 

// Make predictions on test data using the Transformer.transform() method.

// LogisticRegression.transform will only use the 'features' column.

// Note that model2.transform() outputs a 'myProbability' column instead of the usual

// 'probability' column since we renamed the lr.probabilityCol parameter previously.

model2.transform(test)

.select("features", "label", "myProbability", "prediction")

.collect()

.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>

println(s"($features, $label) -> prob=$prob, prediction=$prediction")

}

// $example off$

 

spark.stop()

}

}

  其實Estimator類的單獨使用,也能夠理解爲Pipeline對象只有一個Estimator對象。上述的程序來自:\src\main\scala\org\apache\spark\examples\ml\ ElementwiseProductExample.scala

4.2 Pipeline

  輸入的Dataframe通過PipelineStage對象處理後悔輸出新的DataFrame,此時輸出的DataFrame會增長一些列,即增長了一些特徵,而具體增長什麼列,須要看具體是什麼PipelineStage對象。

以下所示,輸入DataFrame只有三列"id"、"text"、"label",但輸出DataFrame不只保存了輸入列,同時增長了一些列。

package org.apache.spark.examples.ml

 

// $example on$

import org.apache.spark.ml.{Pipeline, PipelineModel}

import org.apache.spark.ml.classification.LogisticRegression

import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

import org.apache.spark.ml.linalg.Vector

import org.apache.spark.sql.Row

// $example off$

import org.apache.spark.sql.SparkSession

 

object PipelineExample {

 

def main(args: Array[String]): Unit = {

val spark = SparkSession

.builder

.appName("PipelineExample")

.getOrCreate()

 

// $example on$

// Prepare training documents from a list of (id, text, label) tuples.

val training = spark.createDataFrame(Seq(

(0L, "a b c d e spark", 1.0),

(1L, "b d", 0.0),

(2L, "spark f g h", 1.0),

(3L, "hadoop mapreduce", 0.0)

)).toDF("id", "text", "label")

 

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

//Tokenizer功能是對輸入的DataFrame某一列進行分割,分割後將數據添加到DataFrame的新列種

val tokenizer = new Tokenizer()

.setInputCol("text") //設置輸入DataFrame中要處理的列名字

.setOutputCol("words") //設置輸出的DataFrame中增長列的名字

val hashingTF = new HashingTF()

.setNumFeatures(1000)

.setInputCol(tokenizer.getOutputCol)

.setOutputCol("features")

val lr = new LogisticRegression()

.setMaxIter(10)

.setRegParam(0.001)

val pipeline = new Pipeline()

.setStages(Array(tokenizer, hashingTF, lr))

 

// Fit the pipeline to training documents.

val model = pipeline.fit(training)

 

// Now we can optionally save the fitted pipeline to disk

model.write.overwrite().save("/tmp/spark-logistic-regression-model")

 

// We can also save this unfit pipeline to disk

pipeline.write.overwrite().save("/tmp/unfit-lr-model")

 

// And load it back in during production

val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

 

// Prepare test documents, which are unlabeled (id, text) tuples.

val test = spark.createDataFrame(Seq(

(4L, "spark i j k"),

(5L, "l m n"),

(6L, "spark hadoop spark"),

(7L, "apache hadoop")

)).toDF("id", "text")

 

// Make predictions on test documents.

model.transform(test)

.select("id", "text", "probability", "prediction")

.collect()

.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

println(s"($id, $text) --> prob=$prob, prediction=$prediction")

}

// $example off$

 

spark.stop()

}

}

5. 參考文獻

[1]. Spark MLlib
相關文章
相關標籤/搜索