Spark Machine Learning Notes (1)

Spark's machine learning library currently supports two API surfaces: RDD-based and DataFrame-based. According to the Spark website, the RDD-based APIs entered maintenance mode in Spark 2.0; the primary machine learning API is now the DataFrame-based API in the spark.ml package, and the RDD-based API is expected to be removed entirely after 3.0.

After two weeks of studying Spark MLlib, I decided to move to the DataFrame-based interface. Since the documentation I could find covered only the RDD-based interface, I went digging through the Spark MLlib source. The DataFrame-based API lives in the org.apache.spark.ml package; its main class structure is outlined below.

Let's start with a linear regression example, examples/ml/LinearRegressionExample.scala, which first defines a LinearRegression object:

val lir = new LinearRegression()
          .setFeaturesCol("features")
          .setLabelCol("label")
          .setRegParam(params.regParam)
          .setElasticNetParam(params.elasticNetParam)
          .setMaxIter(params.maxIter)
          .setTol(params.tol)

Then the fit method is called on the training data, yielding a trained model lirModel, which is an instance of LinearRegressionModel.

val lirModel = lir.fit(training)

At this point the MLlib workflow should be roughly clear: much like many single-machine machine learning libraries, you define a model and set its parameters, fit it to the training data, and get back a trained model.

Now let's look at LinearRegression and LinearRegressionModel in the source. Their class dependencies are as follows:

LinearRegression is a Predictor and LinearRegressionModel is a Model: the Predictor is the learning algorithm, and the Model is what training produces. Both also inherit from Params, the class that represents parameters, so a Predictor and its Model share a single set of parameters.
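To make the pattern concrete, here is a minimal plain-Scala sketch of the same estimator/model/shared-params design. The class names (MyParams, MyLinearRegression, MyLinearRegressionModel) are hypothetical and the "training" is a stub, purely for illustration; the real classes live in org.apache.spark.ml.

// A tiny sketch of the spark.ml pattern: the estimator and the trained
// model share one set of params, and fit() produces the model.
trait MyParams {
  var labelCol: String = "label"
  var featuresCol: String = "features"
}

class MyLinearRegression extends MyParams {
  def setLabelCol(c: String): this.type = { labelCol = c; this }
  def setFeaturesCol(c: String): this.type = { featuresCol = c; this }

  // Stub "training": just average the labels so the sketch stays tiny.
  def fit(data: Seq[(Double, Array[Double])]): MyLinearRegressionModel = {
    val intercept = data.map(_._1).sum / data.size
    new MyLinearRegressionModel(intercept, labelCol, featuresCol)
  }
}

// The model keeps the params it was trained with.
class MyLinearRegressionModel(val intercept: Double,
                              val labelCol: String,
                              val featuresCol: String) {
  def predict(features: Array[Double]): Double = intercept
}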

Now let's work through a first machine learning example with Spark MLlib. The data is regression data I had previously saved in a txt file: more than 5.5 million rows of 13 columns, the first column being the label and the rest the features. I'll demonstrate both interfaces, starting with the old one:

1. Read the raw data:

scala> import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg._
scala> import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.regression._      
scala> val raw_data = sc.textFile("data/my/y_x.txt")
raw_data: org.apache.spark.rdd.RDD[String] = data/my/y_x.txt MapPartitionsRDD[1] at textFile at <console>:30

2. Convert the format; the RDD-based interface expects LabeledPoint as its input format:

scala> val data = raw_data.map{ line =>
     | val arr = line.split(' ').map(_.toDouble)
     | val label = arr.head
     | val features = Vectors.dense(arr.tail)
     | LabeledPoint(label,features)
     | }
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[2] at map at <console>:32

3. Split into train and test sets:

scala> val splits = data.randomSplit(Array(0.8, 0.2))
splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[3] at randomSplit at <console>:34, MapPartitionsRDD[4] at randomSplit at <console>:34)
scala> val train_set = splits(0).cache
train_set: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[3] at randomSplit at <console>:34
scala> val test_set = splits(1).cache
test_set: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[4] at randomSplit at <console>:34
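randomSplit draws randomly, so the split changes between runs; if reproducibility matters, it also accepts a seed. A one-line sketch:

// Fixing the seed makes the 80/20 split reproducible across runs.
val splits = data.randomSplit(Array(0.8, 0.2), seed = 42L)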

4. Train a model with LinearRegressionWithSGD.train:

scala> val lr = LinearRegressionWithSGD.train(train_set,100,0.0001)
warning: there was one deprecation warning; re-run with -deprecation for details
16/08/26 09:20:44 WARN Executor: 1 block locks were not released by TID = 0:
[rdd_3_0]
lr: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 12
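Note the intercept = 0.0 in the output: LinearRegressionWithSGD does not fit an intercept by default. If you want one, the algorithm can be constructed directly and configured, roughly as below (a sketch; the whole class is deprecated as of 2.0, which is what the warning above refers to):

// Enable the intercept term; the optimizer settings mirror the
// (numIterations, stepSize) arguments passed to train above.
val alg = new LinearRegressionWithSGD()
alg.setIntercept(true)
alg.optimizer
  .setNumIterations(100)
  .setStepSize(0.0001)
val model = alg.run(train_set)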

5. Evaluate the model:

scala> val pred_labels = test_set.map(lp => (lp.label, lr.predict(lp.features))) 
pred_labels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[17] at map at <console>:42
scala> val mse = pred_labels.map{case (p,v) => math.pow(p-v,2)}.mean
mse: Double = 0.05104150735910074

Now the new interface:

1. Read the raw data:

scala> import org.apache.spark.ml.linalg._
import org.apache.spark.ml.linalg._
scala> import org.apache.spark.ml.regression._
import org.apache.spark.ml.regression._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> val raw_data = spark.read.text("data/my/y_x.txt")
raw_data: org.apache.spark.sql.DataFrame = [value: string]

2. Convert the data:

scala> val data = raw_data.rdd.map { case Row(line:String) => 
     | val arr = line.split(' ').map(_.toDouble)
     | val label = arr.head
     | val features = Vectors.dense(arr.tail)
     | (label,features)
     | }
data: org.apache.spark.rdd.RDD[(Double, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:34

3. Split the dataset:

scala> val splits = data.randomSplit(Array(0.8, 0.2))
splits: Array[org.apache.spark.rdd.RDD[(Double, org.apache.spark.ml.linalg.Vector)]] = Array(MapPartitionsRDD[5] at randomSplit at <console>:36, MapPartitionsRDD[6] at randomSplit at <console>:36)
scala> val train_set = splits(0).toDS.cache
train_set: org.apache.spark.sql.Dataset[(Double, org.apache.spark.ml.linalg.Vector)] = [_1: double, _2: vector]
scala> val test_set = splits(1).toDS.cache
test_set: org.apache.spark.sql.Dataset[(Double, org.apache.spark.ml.linalg.Vector)] = [_1: double, _2: vector]
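The .toDS calls work here because spark-shell imports spark.implicits._ automatically. In a standalone application you would bring the implicits in yourself, roughly:

// Outside the shell, .toDS/.toDF on RDDs need the session's implicits.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("lr-notes").getOrCreate()
import spark.implicits._  // enables rdd.toDS for tuples and case classes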

4. Create a LinearRegression object and set its parameters. Here we have to set the label and features columns, whose defaults are "label" and "features", because our columns are named "_1" and "_2".

scala> val lir = new LinearRegression
lir: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
scala> lir.setFeaturesCol("_2")
res0: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
scala> lir.setLabelCol("_1")
res1: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
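Each setter returns the estimator itself, so the two calls can be chained just as in LinearRegressionExample at the top. Alternatively, naming the columns when the Dataset is built keeps the defaults and avoids the setters; both variants are sketched below:

// Variant 1: chain the setters.
val lir = new LinearRegression()
  .setFeaturesCol("_2")
  .setLabelCol("_1")

// Variant 2: name the columns up front so the defaults just work.
val train_df = splits(0).toDF("label", "features")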

5. Train the model:

val model = lir.fit(train_set)
16/08/26 09:45:16 WARN Executor: 1 block locks were not released by TID = 0:
[rdd_9_0]
16/08/26 09:45:16 WARN WeightedLeastSquares: regParam is zero, which might cause numerical instability and overfitting.
model: org.apache.spark.ml.regression.LinearRegressionModel = linReg_c4e70a01bcd3
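The WeightedLeastSquares warning is telling us that no regularization was set. As in the first example, regParam and elasticNetParam can be set on the estimator before fitting; the 0.01 below is an arbitrary illustration, not a tuned value:

// A small L2 penalty silences the warning and guards against overfitting.
lir.setRegParam(0.01)
lir.setElasticNetParam(0.0)  // 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)
val model = lir.fit(train_set)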

6. Evaluate the model:

scala> val res = model.transform(test_set)
res: org.apache.spark.sql.DataFrame = [_1: double, _2: vector ... 1 more field]
scala> import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.evaluation._
scala> val eva = new RegressionEvaluator
eva: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_8fc6cce63aa9
scala> eva.setLabelCol("_1")
res6: eva.type = regEval_8fc6cce63aa9
scala> eva.setMetricName("mse")
res7: eva.type = regEval_8fc6cce63aa9
scala> eva.evaluate(res)
res8: Double = 0.027933653533088666                 
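RegressionEvaluator also understands the metric names "rmse", "r2", and "mae", and the trained LinearRegressionModel carries a summary computed over the training data; a short sketch of both:

// Other evaluator metrics on the test predictions.
val rmse = eva.setMetricName("rmse").evaluate(res)
val r2 = eva.setMetricName("r2").evaluate(res)

// The model's own summary over the training set.
println(model.summary.meanSquaredError)
println(model.summary.rootMeanSquaredError)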