Spark MLlib協同過濾算法

         

 

算法說明

  協同過濾(Collaborative Filtering,簡稱CF,WIKI上的定義是:簡單來講是利用某個興趣相投、擁有共同經驗之羣體的喜愛來推薦感興趣的資訊給使用者,我的透過合做的機制給予資訊至關程度的迴應(如評分)並記錄下來以達到過濾的目的,進而幫助別人篩選資訊,迴應不必定侷限於特別感興趣的,特別不感興趣資訊的紀錄也至關重要。java

  協同過濾常被應用於推薦系統。這些技術旨在補充用戶—商品關聯矩陣中所缺失的部分。算法

  MLlib 當前支持基於模型的協同過濾,其中用戶和商品經過一小組隱性因子進行表達,而且這些因子也用於預測缺失的元素。MLLib 使用交替最小二乘法(ALS) 來學習這些隱性因子。apache

  用戶對物品或者信息的偏好,根據應用自己的不一樣,可能包括用戶對物品的評分、用戶查看物品的記錄、用戶的購買記錄等。其實這些用戶的偏好信息能夠分爲兩類:app

  • 顯式的用戶反饋:這類是用戶在網站上天然瀏覽或者使用網站之外,顯式地提供反饋信息,例如用戶對物品的評分或者對物品的評論。
  • 隱式的用戶反饋:這類是用戶在使用網站是產生的數據,隱式地反映了用戶對物品的喜愛,例如用戶購買了某物品,用戶查看了某物品的信息,等等。

  顯式的用戶反饋能準確地反映用戶對物品的真實喜愛,但須要用戶付出額外的代價;而隱式的用戶行爲,經過一些分析和處理,也能反映用戶的喜愛,只是數據不是很精確,有些行爲的分析存在較大的噪音。但只要選擇正確的行爲特徵,隱式的用戶反饋也能獲得很好的效果,只是行爲特徵的選擇可能在不一樣的應用中有很大的不一樣,例如在電子商務的網站上,購買行爲其實就是一個能很好表現用戶喜愛的隱式反饋。eclipse

  推薦引擎根據不一樣的推薦機制可能用到數據源中的一部分,而後根據這些數據,分析出必定的規則或者直接對用戶對其餘物品的喜愛進行預測計算。這樣推薦引擎能夠在用戶進入時給他推薦他可能感興趣的物品。ide

 

 

  MLlib目前支持基於協同過濾的模型,在這個模型裏,用戶和產品被一組能夠用來預測缺失項目的潛在因子來描述。特別是咱們實現交替最小二乘(ALS)算法來學習這些潛在的因子,在 MLlib 中的實現有以下參數:oop

  • numBlocks是用於並行化計算的分塊個數(設置爲-1時 爲自動配置);
  • rank是模型中隱性因子的個數;
  • iterations是迭代的次數;
  • lambda是ALS 的正則化參數;
  • implicitPrefs決定了是用顯性反饋ALS 的版本仍是用隱性反饋數據集的版本;
  • alpha是一個針對於隱性反饋 ALS 版本的參數,這個參數決定了偏好行爲強度的基準。

         

 

 

 

 

 

實例介紹

  在本實例中將使用協同過濾算法對GroupLens Research(http://grouplens.org/datasets/movielens/)提供的數據進行分析,該數據爲一組從20世紀90年底到21世紀初由MovieLens用戶提供的電影評分數據,這些數據中包括電影評分、電影元數據(風格類型和年代)以及關於用戶的人口統計學數據(年齡、郵編、性別和職業等)。根據不一樣需求該組織提供了不一樣大小的樣本數據,不一樣樣本信息中包含三種數據:評分、用戶信息和電影信息。學習

  對這些數據分析進行以下步驟:測試

  1. 裝載以下兩種數據:網站

    a)裝載樣本評分數據,其中最後一列時間戳除10的餘數做爲key,Rating爲值;

    b)裝載電影目錄對照表(電影ID->電影標題)

  2.將樣本評分表以key值切分紅3個部分,分別用於訓練 (60%,並加入用戶評分), 校驗 (20%), and 測試 (20%)

  3.訓練不一樣參數下的模型,並再校驗集中驗證,獲取最佳參數下的模型

  4.用最佳模型預測測試集的評分,計算和實際評分之間的均方根偏差

  5.根據用戶評分的數據,推薦前十部最感興趣的電影(注意要剔除用戶已經評分的電影)

 

 

 

 

 

 

 

 

測試數聽說明

  在MovieLens提供的電影評分數據分爲三個表:評分、用戶信息和電影信息,在該系列提供的附屬數據提供大概6000位讀者和100萬個評分數據,具體位置爲/data/class8/movielens/data目錄下,對三個表數聽說明能夠參考該目錄下README文檔。

  1.評分數聽說明(ratings.data)

  該評分數據總共四個字段,格式爲UserID::MovieID::Rating::Timestamp,分爲爲用戶編號::電影編號::評分::評分時間戳,其中各個字段說明以下:

  • 用戶編號範圍1~6040
  • 電影編號1~3952
  • 電影評分爲五星評分,範圍0~5
  • 評分時間戳單位秒
  • 每一個用戶至少有20個電影評分

 

使用的ratings.dat的數據樣本以下所示:

1::1193::5::978300760

1::661::3::978302109

1::914::3::978301968

1::3408::4::978300275

1::2355::5::978824291

1::1197::3::978302268

1::1287::5::978302039

1::2804::5::978300719

 

 

  2.用戶信息(users.dat)

  用戶信息五個字段,格式爲UserID::Gender::Age::Occupation::Zip-code,分爲爲用戶編號::性別::年齡::職業::郵編,其中各個字段說明以下:

  • 用戶編號範圍1~6040
  • 性別,其中M爲男性,F爲女性
  • 不一樣的數字表明不一樣的年齡範圍,如:25表明25~34歲範圍
  • 職業信息,在測試數據中提供了21中職業分類
  • 地區郵編

 

使用的users.dat的數據樣本以下所示:

1::F::1::10::48067

2::M::56::16::70072

3::M::25::15::55117

4::M::45::7::02460

5::M::25::20::55455

6::F::50::9::55117

7::M::35::1::06810

8::M::25::12::11413

 

 

 

3.電影信息(movies.dat)

  電影數據分爲三個字段,格式爲MovieID::Title::Genres,分爲爲電影編號::電影名::電影類別,其中各個字段說明以下:

  • 電影編號1~3952
  • 由IMDB提供電影名稱,其中包括電影上映年份
  • 電影分類,這裏使用實際分類名非編號,如:Action、Crime等

使用的movies.dat的數據樣本以下所示:

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's

 

 

  程序代碼

import java.io.File
import scala.io.Source
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

 

object MovieLensALS {
  def main(args: Array[String]) {
// 屏蔽沒必要要的日誌顯示在終端上 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) if (args.length != 2) { println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class week7.MovieLensALS " + "week7.jar movieLensHomeDir personalRatingsFile") sys.exit(1) } // 設置運行環境 val conf = new SparkConf().setAppName("MovieLensALS").setMaster("local[4]") val sc = new SparkContext(conf) // 裝載用戶評分,該評分由評分器生成 val myRatings = loadRatings(args(1)) val myRatingsRDD = sc.parallelize(myRatings, 1) // 樣本數據目錄 val movieLensHomeDir = args(0) // 裝載樣本評分數據,其中最後一列Timestamp取除10的餘數做爲key,Rating爲值,即(Int,Rating) val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line => val fields = line.split("::") (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)) } // 裝載電影目錄對照表(電影ID->電影標題) val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line => val fields = line.split("::") (fields(0).toInt, fields(1)) }.collect().toMap val numRatings = ratings.count() val numUsers = ratings.map(_._2.user).distinct().count() val numMovies = ratings.map(_._2.product).distinct().count() println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.") // 將樣本評分表以key值切分紅3個部分,分別用於訓練 (60%,並加入用戶評分), 校驗 (20%), and 測試 (20%) // 該數據在計算過程當中要屢次應用到,因此cache到內存 val numPartitions = 4 val training = ratings.filter(x => x._1 < 6) .values .union(myRatingsRDD) //注意ratings是(Int,Rating),取value便可 .repartition(numPartitions) .cache() val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8) .values .repartition(numPartitions) .cache() val test = ratings.filter(x => x._1 >= 8).values.cache() val numTraining = training.count() val numValidation = validation.count() val numTest = test.count() println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
// 訓練不一樣參數下的模型,並在校驗集中驗證,獲取最佳參數下的模型 val ranks = List(8, 12) val lambdas = List(0.1, 10.0) val numIters = List(10, 20) var bestModel: Option[MatrixFactorizationModel] = None var bestValidationRmse = Double.MaxValue var bestRank = 0 var bestLambda = -1.0 var bestNumIter = -1 for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { val model = ALS.train(training, rank, numIter, lambda) val validationRmse = computeRmse(model, validation, numValidation) println("RMSE (validation) = " + validationRmse + " for the model trained with rank = " + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".") if (validationRmse < bestValidationRmse) { bestModel = Some(model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } } // 用最佳模型預測測試集的評分,並計算和實際評分之間的均方根偏差 val testRmse = computeRmse(bestModel.get, test, numTest) println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".") // create a naive baseline and compare it with the best model val meanRating = training.union(validation).map(_.rating).mean val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean) val improvement = (baselineRmse - testRmse) / baselineRmse * 100 println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.") // 推薦前十部最感興趣的電影,注意要剔除用戶已經評分的電影 val myRatedMovieIds = myRatings.map(_.product).toSet val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq) val recommendations = bestModel.get .predict(candidates.map((0, _))) .collect() .sortBy(-_.rating) .take(10) var i = 1 println("Movies recommended for you:") recommendations.foreach { r => println("%2d".format(i) + ": " + movies(r.product)) i += 1 } sc.stop() } /** 校驗集預測數據和實際數據之間的均方根偏差 **/ def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = { val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating)) .join(data.map(x => ((x.user, x.product), x.rating))) .values math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n) } /** 裝載用戶評分文件 **/ def loadRatings(path: String): Seq[Rating] = { val lines = Source.fromFile(path).getLines() val ratings = lines.map { line => val fields = line.split("::") Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) }.filter(_.rating > 0.0) if (ratings.isEmpty) { sys.error("No ratings provided.") } else { ratings.toSeq } } }

      

 

 

 

 

 

 

 

 

 

IDEA執行狀況

  第一步   使用以下命令啓動Spark集羣

$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh

 

  第二步   進行用戶評分,生成用戶樣本數據

  因爲該程序中最終推薦給用戶十部電影,這須要用戶提供對樣本電影數據的評分,而後根據生成的最佳模型獲取當前用戶推薦電影。用戶可使用/home/hadoop/upload/class8/movielens/bin/rateMovies程序進行評分,最終生成personalRatings.txt文件:

 

 

 

 

 

 

第三步   在IDEA中設置運行環境

在IDEA運行配置中設置MovieLensALS運行配置,須要設置輸入數據所在文件夾和用戶的評分文件路徑:

  • 輸入數據所在目錄:輸入數據文件目錄,在該目錄中包含了評分信息、用戶信息和電影信息,這裏設置爲/home/hadoop/upload/class8/movielens/data/
  •  用戶的評分文件路徑:前一步驟中用戶對十部電影評分結果文件路徑,在這裏設置爲/home/hadoop/upload/class8/movielens/personalRatings.txt

 

第四步   執行並觀察輸出

  • 輸出Got 1000209 ratings from 6040 users on 3706 movies,表示本算法中計算數據包括大概100萬評分數據、6000多用戶和3706部電影;
  • 輸出Training: 602252, validation: 198919, test: 199049,表示對評分數據進行拆分爲訓練數據、校驗數據和測試數據,大體佔比爲6:2:2;
  • 在計算過程當中選擇8種不一樣模型對數據進行訓練,而後從中選擇最佳模型,其中最佳模型比基準模型提供22.30%

 

RMSE (validation) = 0.8680885498009973 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.

RMSE (validation) = 0.868882967482595 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.

RMSE (validation) = 0.8663942501841964 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.

RMSE (validation) = 0.8674684744165418 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.

The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.8652326018300565.

The best model improves the baseline by 22.30%.

 

  • 利用前面獲取的最佳模型,結合用戶提供的樣本數據,最終推薦給用戶以下影片:

Movies recommended for you:

 1: Bewegte Mann, Der (1994)

 2: Chushingura (1962)

 3: Love Serenade (1996)

 4: For All Mankind (1989)

 5: Vie est belle, La (Life is Rosey) (1987)

 6: Bandits (1997)

 7: King of Masks, The (Bian Lian) (1996)

 8: I'm the One That I Want (2000)

 9: Big Trees, The (1952)

10: First Love, Last Rites (1997)

        

相關文章
相關標籤/搜索