協同過濾(ALS)

Spark ALS算法進行矩陣分解,U * V = Qjava

若是數據不是運行在集羣上,而是運行在本地,爲了保證內存充足,在啓動spark-shell時須要指定參數--driver-memory 6g算法

    1. 數據集

藝術家點播數據集:sql

用戶和藝術家的關係是經過其餘行動隱含提現出來的,例如播放歌曲或專輯,而不是經過顯式的評分或者點贊獲得的。這被稱爲隱式反饋數據。如今的家用電視點播也是這樣,用戶通常不會主動評分。shell

數據集下載地址是http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gzapache

    1. 數據處理

Spark MLibALS算法實現有一個小缺點:它要求用戶和產品的ID必須是數值型,而且是32位非負整數,這意味着大於Integer.MAX_VALUE2147483647)的ID是非法的。咱們首先看看數據集是否知足要求:dom

scala> val rawUserArtistData = sc.textFile("D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt")機器學習

rawUserArtistData: org.apache.spark.rdd.RDD[String] = D:/Workspace/AnalysisWithSpark/src/main/java/advanced/chapter3/profiledata_06-May-2005/user_artist_data.txt MapPartitionsRDD[1] at textFile at <console>:27學習

 

scala> rawUserArtistData.map(_.split(' ')(0).toDouble).stats()lua

res0: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000)spa

 

scala> rawUserArtistData.map(_.split(' ')(1).toDouble).stats()

res1: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000)

過濾數據

val rawArtistData = sc.textFile("hdfs:///user/ds/artist_data.txt")

val artistByID = rawArtistData.flatMap { line =>

    val (id, name) = line.span(_ != '\t')

    if (name.isEmpty) {

        None

    } else {

        try {

            Some((id.toInt, name.trim))

        } catch {

            case e: NumberFormatException => None

        }

    }

}

 

 

    1. 訓練算法

首先將數據轉換成Rating

import org.apache.spark.mllib.recommendation._

val bArtistAlias = sc.broadcast(artistAlias)

val trainData = rawUserArtistData.map { line =>

    val Array(userID, artistID, count) = line.split(' ').map(_.toInt)

    val finalArtistID =

    bArtistAlias.value.getOrElse(artistID, artistID)

    Rating(userID, finalArtistID, count)

}.cache()

 

val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

//查看特徵變量:

model.userFeatures.mapValues(_.mkString(", ")).first()

 

//咱們能夠對此用戶作出5個推薦:

val recommendations = model.recommendProducts(2093760, 5)

recommendations.foreach(println)

 

    1. 選擇超參數

計算AUC這部分代碼沒有試。AUC(Area Under ROC Curve)ROC(Receiver Operating Characteristic,受試者工做特徵)線,它源於二戰中用於敵機檢測的雷達信號分析技術。在非均等代價下,ROC曲線不能直接反映出學習器的指望整體代價,而代價曲線則可達到該目的。

機器學習常涉及兩類參數:一類是算法的參數,亦稱超參數,數目常在10之內;另外一類是模型的參數,數目可能不少。前者一般是由人工設定多個參數候選值後產生模型,後者則是經過學習來產生多個候選模型。
ALS.trainImplicit()的參數包括如下幾個:

rank
  模型的潛在因素的個數,即用戶-特徵產品-特徵矩陣的列數;通常來講,它也是矩陣的階。

iterations
  矩陣分解迭代的次數;迭代的次數越多,花費的時間越長,但分解的結果可能會更好。

lambda
  標準的過擬合參數;值越大越不容易產生過擬合,但值太大會下降分解的準確度。lambda取較大的值看起來結果要稍微好一些。

alpha
  控制矩陣分解時,被觀察到的用戶-產品交互相對沒被觀察到的交互的權重。40是最初ALS論文的默認值,這說明了模型在強調用戶聽過什麼時的表現要比強調用戶沒聽過什麼時要好。

 

    1. 應用(產生推薦)

val someUsers = allData.map(_.user).distinct().take(100)

val someRecommendations =

    someUsers.map(userID => model.recommendProducts(userID, 5))

someRecommendations.map(

    recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ")

).foreach(println)

 

    1. Rating數據代碼

def run(filename: String) {

    println("mlALS is running!")

    val data = getData(filename)

    println(s"dataLen:${data.count}")

 

    val als = new ALS()

      .setMaxIter(5)

      .setRegParam(0.01)

      .setUserCol("newUserId")

      .setItemCol("newMovieId")

      .setRatingCol("newRating");

 

    val split = data.randomSplit(Array(0.8, 0.2))

    val model = als.fit(split(0));

    println(s"userCol name:${model.getUserCol}")

 

    val predictions = model.transform(split(1))

    val evaluator = new RegressionEvaluator()

      .setMetricName("rmse")

      .setLabelCol("newRating")

      .setPredictionCol("prediction")

    val rmse = evaluator.evaluate(predictions)

   

    println(s"Root-mean-square error = $rmse")

   

    val userCommdmodel.recommendForAllUsers(10)

    userCommd.show()

  }

 

  def getData(filename: String) :Dataset[Row] = {

    val sqlContext = new SQLContext(sc.sparkContext)

    val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> filename, "header" -> "true"))

    df.columns.map(col => println(s"col:${col}"))

   

    val newdf = df.select(df.col("userId").cast(IntegerType).as("newUserId"),

        df.col("movieId").cast(IntegerType).as("newMovieId"),

        df.col("rating").cast(DoubleType).as("newRating"),

        df.col("timestamp").cast(LongType).as("newTimeStamp"))

   

    newdf

  }

 

    1. 數據正則化

 

val dataFrame = sqlContext.read.format("libsvm").load("data/libsvm.txt")

 

    // L1正則化

    val normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures")

      // 設置 L1正則化

      .setP(1.0)

    // 正則化轉換

val l1NormData = normalizer.transform(dataFrame)

  // L2正則化

     val l2InfNormData = normalizer.transform(dataFrame, normalizer.p -> 2)

    l2InfNormData.foreach(println)

相關文章
相關標籤/搜索