Spark 1.0.0 MLlib Machine Learning Library: A First Look

Some of the machine learning code in this article comes from the official Spark 1.0.0 documentation.

MLlib is Spark's library of machine learning algorithms and applications, covering classification, regression, clustering, collaborative filtering, dimensionality reduction, and more. This article shows how to build an sbt project in Scala that implements machine learning algorithms and runs both locally and on a cluster. (Beginners may prefer to first enter the code line by line in the interactive Spark shell to get familiar with the Scala workflow; see the example below.) For more on sbt, see here.
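For instance, a quick interactive session looks like this (assuming spark-shell is launched from the Spark installation directory; the exact path depends on your setup):

./bin/spark-shell    # opens a Scala REPL with the SparkContext predefined as sc

Each Scala snippet below can then be pasted in line by line.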

1. SVM (linear support vector machine)

  • Create a SimpleSVM directory, and under it create the directory structure shown below:

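A standard sbt layout consistent with the files used below (the exact tree is an assumption) is:

SimpleSVM/
    simple.sbt
    src/
        main/
            scala/
                SimpleApp.scala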

  • The simple.sbt file contains the following:
name := "SimpleSVM Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

PS: Because this application calls into MLlib, take special care to add spark-mllib to libraryDependencies, otherwise the build will fail to compile. (Note that older sbt versions, before 0.13.7, also require a blank line between settings in a .sbt file, as above.)

  • The SimpleApp.scala file contains the following:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleSVM Application")
    val sc = new SparkContext(conf)

    // Load training data in LIBSVM format
    val data = MLUtils.loadLibSVMFile(sc, "mllib/test50.txt")

    // Split the data into training (60%) and test (40%) sets
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run the training algorithm to build the model
    val numIterations = 100
    val model = SVMWithSGD.train(training, numIterations)

    // Clear the default threshold so predict() returns raw scores
    model.clearThreshold()

    // Compute raw scores on the test set
    val scoreAndLabels = test.map { point =>
      val score = model.predict(point.features)
      (score, point.label)
    }

    // Evaluate the model: area under the ROC curve
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC = metrics.areaUnderROC()

    println("Area under ROC = " + auROC)
  }
}

PS: Because we configured the Hadoop path when setting up Spark, the input path mllib/test50.txt actually refers to a file in the HDFS file system; its storage location depends on the <name> entry in Hadoop's core-site.xml configuration file (see here for details; this is an easy place to go wrong). So you need to put test50.txt into HDFS first. Also note that test50.txt must be in LIBSVM input format, for example:
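In LIBSVM format, each line is a label followed by space-separated index:value pairs with one-based indices; the rows below are invented purely for illustration (for binary classification with loadLibSVMFile, labels of 0 and 1 are the safe choice):

1 1:0.5 3:1.2 7:0.8
0 2:1.0 4:0.3
1 1:0.1 2:2.4

A sketch of the upload, assuming the hadoop command is on your PATH and the target directory matches the path used in the code:

hadoop fs -mkdir mllib
hadoop fs -put test50.txt mllib/test50.txt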

  • Compile:

  cd ~/SimpleSVM
  sbt package     # packaging; this may take a while and ends with [success] XXX

  PS: On success, a number of files are generated, including target/scala-2.10/simplesvm-project_2.10-1.0.jar

  • Run locally:

  spark-submit --class "SimpleApp" --master local target/scala-2.10/simplesvm-project_2.10-1.0.jar

  • Run on a cluster:

      spark-submit --class "SimpleApp" --master spark://master:7077 target/scala-2.10/simplesvm-project_2.10-1.0.jar

  • Result: the job prints the evaluation metric as a line of the form "Area under ROC = ...".

PS: To use L1 regularization (the default updater uses squared-L2; L1 tends to produce sparser weights), modify SimpleApp.scala as follows:

import org.apache.spark.mllib.optimization.L1Updater

// Configure the optimizer directly instead of using the static train() helper
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.
  setNumIterations(200).
  setRegParam(0.1).
  setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)

2. Logistic Regression

Similarly, to implement logistic regression you only need to replace SVMWithSGD in SimpleApp.scala with LogisticRegressionWithSGD, as sketched below.
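A minimal sketch of the changed lines, assuming the rest of SimpleApp.scala from section 1 stays the same:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Train a logistic regression model on the same training split
val model = LogisticRegressionWithSGD.train(training, numIterations)

The returned model also supports clearThreshold(), so the ROC evaluation code carries over unchanged.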

3. Collaborative Filtering

The project layout is the same as above; to implement collaborative filtering, modify SimpleApp.scala as follows:

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SimpleCF Application")
    val sc = new SparkContext(conf)

    // Load and parse the data: each line is "user,item,rating"
    val data = sc.textFile("mllib/test.data")
    val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
      Rating(user.toInt, item.toInt, rate.toDouble)
    })

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 5
    val model = ALS.train(ratings, rank, numIterations, 0.01)

    // Predict a rating for every (user, product) pair in the data set
    val usersProducts = ratings.map { case Rating(user, product, rate) =>
      (user, product)
    }
    val predictions =
      model.predict(usersProducts).map { case Rating(user, product, rate) =>
        ((user, product), rate)
      }

    // Join actual and predicted ratings and compute the mean squared error
    val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
      ((user, product), rate)
    }.join(predictions)
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
      val err = (r1 - r2)
      err * err
    }.mean()
    println("Mean Squared Error = " + MSE)
  }
}

PS: As before, mllib/test.data is stored in the HDFS file system. Sample data:
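Each line is one user,item,rating triple; the values below are invented for illustration:

1,1,5.0
1,2,1.0
2,1,5.0
2,2,1.0
3,1,1.0
3,2,5.0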

  • Run locally:

  spark-submit --class "SimpleApp" --master local target/scala-2.10/simplecf-project_2.10-1.0.jar

  • Run on a cluster:

      spark-submit --class "SimpleApp" --master spark://master:7077 target/scala-2.10/simplecf-project_2.10-1.0.jar

  • Result: the job prints a line of the form "Mean Squared Error = ...".

PS: For implicit feedback data, you can train with ALS.trainImplicit, where the alpha parameter controls the baseline confidence in the observed preferences:

val lambda = 0.01
val alpha = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

Similarly, code for clustering and dimensionality-reduction algorithms can be found here.

This is an original blog post; please credit the source when reposting.