The Spark Pipeline API is inspired by scikit-learn and is designed to make it easy to create, tune, and inspect machine learning workflows.
An ML Pipeline is typically composed of several stages: data preprocessing, feature extraction, model fitting, and validation. Each stage of an ML Pipeline is implemented by a Transformer or an Estimator.
abstract class Transformer extends PipelineStage {
  ...
  def transform(dataset: Dataset[_]): DataFrame
}
The abstract class Transformer defines a transform method, which converts one DataFrame into another. Note that in Spark, DataFrame is just an alias: type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row].
When using a Transformer, you typically specify an inputCol and an outputCol: the transformation reads the input column and appends a new output column (the original column is preserved).
/** @group setParam */
def setInputCol(value: String): T = set(inputCol, value).asInstanceOf[T]

/** @group setParam */
def setOutputCol(value: String): T = set(outputCol, value).asInstanceOf[T]
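For instance, here is a minimal sketch (assuming Spark 2.x and an existing SparkSession named spark) using the built-in Binarizer, one of the simplest Transformers; note that the input column survives the transformation:

import org.apache.spark.ml.feature.Binarizer

// A hypothetical DataFrame with a numeric column "score"
val raw = spark.createDataFrame(Seq((0, 0.2), (1, 0.8))).toDF("id", "score")

// Binarizer is a Transformer: values above the threshold map to 1.0, others to 0.0
val binarizer = new Binarizer()
  .setInputCol("score")    // column to read
  .setOutputCol("binary")  // new column to append
  .setThreshold(0.5)

// transform() returns a new DataFrame with columns: id, score, binary
binarizer.transform(raw).show()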
An Estimator is an abstraction of a learning algorithm. The abstract class Estimator defines a fit() method, which takes a DataFrame as input and returns a model. Many Estimators also provide setInputCol() and setOutputCol() methods; the input and output columns specified there are actually intended for the Transformer returned by Estimator.fit().
abstract class Estimator[M <: Model[M]] extends PipelineStage {
  def fit(dataset: Dataset[_]): M
}
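As a concrete example, a minimal sketch (again assuming an existing SparkSession spark) using the built-in MinMaxScaler, an Estimator whose fit() returns a MinMaxScalerModel:

import org.apache.spark.ml.feature.{MinMaxScaler, MinMaxScalerModel}
import org.apache.spark.ml.linalg.Vectors

// Hypothetical toy data with a vector column "features"
val data = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 2.0)),
  (1, Vectors.dense(3.0, 6.0))
)).toDF("id", "features")

// MinMaxScaler is an Estimator: fit() learns per-dimension min/max from the data...
val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
val model: MinMaxScalerModel = scaler.fit(data)

// ...and returns a Transformer that applies the learned scaling
model.transform(data).show()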
Both Transformer and Estimator extend PipelineStage. A Pipeline can be thought of as a workflow covering data preprocessing, feature extraction, model fitting, and validation; it is built from a sequence of PipelineStages that run in a specific order, each of which is either a Transformer or an Estimator. After creating a Pipeline object, you can specify the PipelineStages of the workflow, and the order they run in, via setStages(value: Array[_ <: PipelineStage]).
class Pipeline @Since("1.4.0") (
    @Since("1.4.0") override val uid: String)
  extends Estimator[PipelineModel] with MLWritable {

  def setStages(value: Array[_ <: PipelineStage]): this.type = {
    set(stages, value.asInstanceOf[Array[PipelineStage]])
    this
  }

  override def fit(dataset: Dataset[_]): PipelineModel = { ... }
}
Note that a Pipeline is itself an Estimator: calling fit() returns a PipelineModel, and a PipelineModel is a Transformer.
class PipelineModel private[ml] (
    @Since("1.4.0") override val uid: String,
    @Since("1.4.0") val stages: Array[Transformer])
  extends Model[PipelineModel] with MLWritable with Logging { ... }

abstract class Model[M <: Model[M]] extends Transformer { ... }
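To make this relationship concrete, here is a minimal sketch (assuming a SparkSession spark and a DataFrame df with a "category" column; it uses the Spark 2.x OneHotEncoder, which is a plain Transformer):

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Two stages: an Estimator (StringIndexer) followed by a Transformer (OneHotEncoder)
val indexerStage = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val encoderStage = new OneHotEncoder().setInputCol("categoryIndex").setOutputCol("categoryVec")
val pipeline = new Pipeline().setStages(Array(indexerStage, encoderStage))

// Pipeline is an Estimator: fit() runs the stages in order and returns a PipelineModel
val model: PipelineModel = pipeline.fit(df)

// PipelineModel is a Transformer: transform() applies all fitted stages at once
model.transform(df).show()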
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, StringIndexerModel}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .appName("OneHotEncoderExample")
  .master("local[*]")
  .getOrCreate()

val df: DataFrame = spark.createDataFrame(
  Seq((0, 3), (1, 2), (2, 4), (3, 3), (4, 3), (5, 4))
).toDF("id", "category")

// StringIndexer is an Estimator: fit() returns a StringIndexerModel (a Transformer)
val indexer: StringIndexerModel = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed: DataFrame = indexer.transform(df)
indexed.show()

// OneHotEncoder (Spark 2.x) is a Transformer; setDropLast(false) keeps all categories
val encoder: OneHotEncoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
  .setDropLast(false)
val encoded: DataFrame = encoder.transform(indexed)
encoded.show()
Output:
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       3|          0.0|
|  1|       2|          2.0|
|  2|       4|          1.0|
|  3|       3|          0.0|
|  4|       3|          0.0|
|  5|       4|          1.0|
+---+--------+-------------+

+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       3|          0.0|(3,[0],[1.0])|
|  1|       2|          2.0|(3,[2],[1.0])|
|  2|       4|          1.0|(3,[1],[1.0])|
|  3|       3|          0.0|(3,[0],[1.0])|
|  4|       3|          0.0|(3,[0],[1.0])|
|  5|       4|          1.0|(3,[1],[1.0])|
+---+--------+-------------+-------------+
StringIndexer is an Estimator; its job is to map "a string column of labels to an ML column of label indices". That is, it assigns an index to each label in the input column (treated as a string), where more frequent labels receive smaller indices: a label with index 0 is the most frequent one. OneHotEncoder is a Transformer.
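Since the fitted StringIndexerModel exposes its learned labels in index order, the frequency ordering can be checked directly (reusing the indexer fitted above):

// StringIndexerModel.labels lists labels in index order (most frequent first).
// For the data above, "3" occurs three times, "4" twice, and "2" once:
println(indexer.labels.mkString(", "))   // expected: 3, 4, 2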
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .appName("test_VectorAssembler")
  .master("local[*]")
  .getOrCreate()

val df: DataFrame = spark.createDataFrame(
  Seq((0.1, 0.3, 0.2), (1.0, 1.2, 0.5), (2.1, 0.4, 0.2), (1.1, 0.1, 0.3))
).toDF("f1", "f2", "f3")

// Merge the three numeric columns into a single vector column "features"
val assembler: VectorAssembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2", "f3"))
  .setOutputCol("features")
val df1: DataFrame = assembler.transform(df)
df1.show()
+---+---+---+-------------+
| f1| f2| f3|     features|
+---+---+---+-------------+
|0.1|0.3|0.2|[0.1,0.3,0.2]|
|1.0|1.2|0.5|[1.0,1.2,0.5]|
|2.1|0.4|0.2|[2.1,0.4,0.2]|
|1.1|0.1|0.3|[1.1,0.1,0.3]|
+---+---+---+-------------+
VectorAssembler is a Transformer; its job is to merge multiple columns into a single vector column.
import scala.collection.mutable

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.{DataFrame, Dataset, Row}

def decisionTreePipeline(vectorAssembler: VectorAssembler, dataFrame: DataFrame): Unit = {
  val Array(training, test): Array[Dataset[Row]] =
    dataFrame.randomSplit(Array(0.8, 0.2), seed = 12345)

  val stages = new mutable.ArrayBuffer[PipelineStage]()

  // Estimator: index the raw label column
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
  stages += labelIndexer

  // Estimator: the classifier reads the assembled feature vector and the indexed label
  val dt: DecisionTreeClassifier = new DecisionTreeClassifier()
    .setFeaturesCol(vectorAssembler.getOutputCol)
    .setLabelCol("indexedLabel")
    .setMaxDepth(5)
    .setMaxBins(32)
    .setMinInstancesPerNode(1)
    .setMinInfoGain(0.0)
    .setCacheNodeIds(false)
    .setCheckpointInterval(10)

  // Transformer: assemble the feature columns, then fit the tree
  stages += vectorAssembler
  stages += dt

  val pipeline: Pipeline = new Pipeline().setStages(stages.toArray)
  val model: PipelineModel = pipeline.fit(training)

  // Predictions live in the indexed-label space, so evaluate against indexedLabel
  val holdout: DataFrame = model.transform(test).select("prediction", "indexedLabel")
  val evaluator: MulticlassClassificationEvaluator = new MulticlassClassificationEvaluator()
    .setPredictionCol("prediction")
    .setLabelCol("indexedLabel")
    .setMetricName("accuracy")
  val acc: Double = evaluator.evaluate(holdout)
  println(acc)
}
The decision tree Pipeline in this example consists of three PipelineStages: StringIndexer, VectorAssembler, and DecisionTreeClassifier, which are an Estimator, a Transformer, and an Estimator, respectively.
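A possible invocation of decisionTreePipeline might look like the following sketch; the toy data and the column names f1, f2, f3, and label are hypothetical:

// Hypothetical toy data: three numeric features plus a string label column
val data = spark.createDataFrame(Seq(
  (0.1, 0.3, 0.2, "a"), (1.0, 1.2, 0.5, "b"),
  (2.1, 0.4, 0.2, "a"), (1.1, 0.1, 0.3, "b"),
  (0.3, 0.2, 0.1, "a"), (1.4, 1.1, 0.6, "b"),
  (2.0, 0.5, 0.3, "a"), (1.2, 0.2, 0.4, "b")
)).toDF("f1", "f2", "f3", "label")

val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2", "f3"))
  .setOutputCol("features")

decisionTreePipeline(assembler, data)   // fits the pipeline and prints the test accuracy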