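Collaborative Filtering (CF) is defined on Wikipedia roughly as follows. It uses the preferences of a group of people with shared interests and common experiences to recommend information a user may find interesting. Through a cooperative mechanism, individuals respond to information to a considerable degree (for example, by rating it), and these responses are recorded to achieve filtering, which in turn helps others sift through information. The responses need not be limited to things a person is especially interested in; records of information a person is especially uninterested in are also very important.
Collaborative filtering is commonly used in recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix.
MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors, and these factors are used to predict the missing entries. MLlib uses the alternating least squares (ALS) algorithm to learn these latent factors.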
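Depending on the application, users' preferences for items or information may include their ratings of items, their item-viewing history, their purchase records, and so on. This preference information falls into two categories: explicit user feedback and implicit user behavior.
Explicit user feedback accurately reflects a user's true preferences, but it requires extra effort from the user. Implicit user behavior can also reveal a user's preferences after some analysis and processing, but the data is less precise, and the analysis of some behaviors is quite noisy. Still, if the right behavioral features are chosen, implicit feedback can also work very well, although which features are right can vary greatly from one application to another. On an e-commerce website, for example, purchase behavior is an implicit signal that reflects user preferences well.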
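For implicit feedback, one widely used scheme (described in Hu, Koren and Volinsky's paper "Collaborative Filtering for Implicit Feedback Datasets", which the Spark documentation cites for MLlib's implicit variant of ALS) turns each raw interaction count r into a binary preference p (1 if r > 0, otherwise 0) and a confidence c = 1 + alpha * r. The plain-Scala sketch below only illustrates that transformation. The `Interaction` type and the alpha values are assumptions for illustration; they are not part of this article's program, which works with explicit ratings.

```scala
// Sketch only: implicit-feedback preprocessing in the style of Hu, Koren and Volinsky.
// Interaction and the alpha values are illustrative assumptions, not MLlib API.
object ImplicitFeedback {
  // One observed behavior record, e.g. how many times `user` bought `item`.
  final case class Interaction(user: Int, item: Int, count: Double)

  // Binary preference: 1.0 if the user interacted with the item at all.
  def preference(count: Double): Double = if (count > 0) 1.0 else 0.0

  // Confidence grows linearly with the amount of observed behavior.
  def confidence(count: Double, alpha: Double): Double = 1.0 + alpha * count

  // (user, item, preference, confidence) for every record.
  def toPrefConf(xs: Seq[Interaction], alpha: Double): Seq[(Int, Int, Double, Double)] =
    xs.map(x => (x.user, x.item, preference(x.count), confidence(x.count, alpha)))

  def main(args: Array[String]): Unit = {
    val log = Seq(Interaction(1, 10, 3.0), Interaction(1, 11, 0.0), Interaction(2, 10, 1.0))
    toPrefConf(log, alpha = 1.0).foreach(println)
  }
}
```

The point of the split is that a zero count still contributes a (low-confidence) "not preferred" signal, while repeated behavior raises the confidence rather than the preference itself.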
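Depending on its recommendation mechanism, a recommendation engine may use part of the available data sources and, based on that data, either derive certain rules or directly predict the user's preferences for other items. The engine can then recommend items the user may be interested in when the user arrives.
MLlib currently supports collaborative-filtering models in which users and products are described by a set of latent factors that can be used to predict missing entries. In particular, it implements the alternating least squares (ALS) algorithm to learn these latent factors. The MLlib implementation is controlled by parameters such as the rank (the number of latent factors), the number of iterations, and the regularization parameter lambda; these are the three parameters tuned in the program below.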
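The "alternating" idea can be seen in a small Spark-free sketch. With rank 1 (one latent factor per user and per movie), each least-squares subproblem has a closed form: holding the item factors y fixed, the best user factor is x_u = sum_i(r_ui * y_i) / (sum_i(y_i^2) + lambda) over the items user u rated, and symmetrically for items. `RankOneALS` is a hypothetical name. MLlib's solver handles arbitrary rank by solving a small linear system per user or item and may weight lambda differently, so this only illustrates the technique and will not reproduce MLlib's numbers.

```scala
// Rank-1 alternating least squares on an in-memory list of (user, item, rating).
// Illustrative sketch only; not MLlib's implementation.
object RankOneALS {
  // (user, item, rating); users and items are indexed from 0.
  type Triple = (Int, Int, Double)

  // Closed-form ridge solution for every row on one side, with the other side fixed:
  // factor(k) = sum(r * fixed(other)) / (sum(fixed(other)^2) + lambda)
  private def solve(ratings: Seq[Triple], n: Int, fixed: Array[Double], lambda: Double,
                    key: Triple => Int, other: Triple => Int): Array[Double] = {
    val num = Array.fill(n)(0.0)
    val den = Array.fill(n)(lambda)
    for (t <- ratings) {
      val f = fixed(other(t))
      num(key(t)) += t._3 * f
      den(key(t)) += f * f
    }
    num.zip(den).map { case (a, b) => a / b }
  }

  // Alternate: fix item factors and solve users, then fix user factors and solve items.
  def train(ratings: Seq[Triple], numUsers: Int, numItems: Int,
            iterations: Int, lambda: Double): (Array[Double], Array[Double]) = {
    var x = Array.fill(numUsers)(1.0) // user factors
    var y = Array.fill(numItems)(1.0) // item factors
    for (_ <- 1 to iterations) {
      x = solve(ratings, numUsers, y, lambda, _._1, _._2)
      y = solve(ratings, numItems, x, lambda, _._2, _._1)
    }
    (x, y)
  }

  // Root-mean-square error of the predictions x(u) * y(i) on the given ratings.
  def rmse(ratings: Seq[Triple], x: Array[Double], y: Array[Double]): Double =
    math.sqrt(ratings.map { case (u, i, r) => val e = x(u) * y(i) - r; e * e }.sum / ratings.size)
}
```

A missing entry is predicted as x(u) * y(i). For example, training on every rating of a consistent 2×3 matrix except one and then multiplying the corresponding factors approximately recovers the held-out value.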
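In this example we use collaborative filtering to analyze data provided by GroupLens Research (http://grouplens.org/datasets/movielens/). The data is a set of movie ratings contributed by MovieLens users from the late 1990s to the early 2000s, and it includes movie ratings, movie metadata (genres and year), and demographic data about the users (age, zip code, gender, occupation, and so on). The organization offers sample datasets of different sizes for different needs, and each sample contains three kinds of data: ratings, user information, and movie information.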
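The analysis proceeds in the following steps:
1. Load the following two kinds of data:
a) the sample ratings, using the remainder of the last column (the timestamp) divided by 10 as the key and the Rating as the value;
b) the movie lookup table (movie ID -> movie title).
2. Split the sample ratings by key into three parts: training (60%, with the user's own ratings added), validation (20%), and test (20%).
3. Train models with different parameters, evaluate each on the validation set, and keep the model with the best parameters.
4. Use the best model to predict the test-set ratings and compute the root-mean-square error (RMSE) against the actual ratings.
5. Based on the user's ratings, recommend the ten movies the user is most likely to be interested in (excluding movies the user has already rated).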
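The split in step 2 needs no random number generator: the key is the last digit of the timestamp, so a given rating always lands in the same bucket, and as long as those last digits are roughly uniform the buckets should come out close to 60/20/20. A plain-Scala sketch of the bucketing rule, using the same thresholds as the program (key < 6 training, 6 to 7 validation, 8 to 9 test); the object name is illustrative:

```scala
// Sketch of the program's deterministic split: key = timestamp % 10.
object SplitByTimestamp {
  def bucket(timestamp: Long): String = (timestamp % 10) match {
    case k if k < 6 => "training"   // keys 0-5, about 60%
    case k if k < 8 => "validation" // keys 6-7, about 20%
    case _          => "test"       // keys 8-9, about 20%
  }

  def main(args: Array[String]): Unit = {
    // Timestamps taken from the ratings.dat sample lines in this article
    val sample = Seq(978300760L, 978302109L, 978301968L, 978300275L)
    sample.foreach(ts => println(s"$ts -> ${bucket(ts)}"))
  }
}
```

Because the assignment depends only on the data, rerunning the program reproduces exactly the same training, validation, and test sets.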
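The MovieLens movie rating data is divided into three tables: ratings, user information, and movie information. The accompanying data provided with this series contains about 6,000 users and 1 million ratings and is located in the /data/class8/movielens/data directory; see the README in that directory for a description of the three tables.
1. Ratings (ratings.dat)
The rating data has four fields in the format UserID::MovieID::Rating::Timestamp, that is, user ID::movie ID::rating::rating timestamp; the README describes each field in detail.
A sample of the ratings.dat data used: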
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
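Each ratings.dat line can be parsed by splitting on `::`. Java's `String.split` takes a regular expression, but `::` contains no metacharacters, so it is matched literally. A minimal sketch; `RatingRecord` is a hypothetical type that, unlike MLlib's `Rating` used in the program, keeps the timestamp:

```scala
object RatingsParser {
  // Hypothetical record type; unlike MLlib's Rating it keeps the timestamp.
  final case class RatingRecord(userId: Int, movieId: Int, rating: Double, timestamp: Long)

  // Format: UserID::MovieID::Rating::Timestamp
  def parse(line: String): RatingRecord = {
    val f = line.split("::") // "::" has no regex metacharacters, so it is matched literally
    require(f.length == 4, s"expected 4 fields but got ${f.length}: $line")
    RatingRecord(f(0).toInt, f(1).toInt, f(2).toDouble, f(3).toLong)
  }

  def main(args: Array[String]): Unit = {
    val r = parse("1::1193::5::978300760")
    println(s"user ${r.userId} rated movie ${r.movieId} with ${r.rating} (key = ${r.timestamp % 10})")
  }
}
```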
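2. User information (users.dat)
The user information has five fields in the format UserID::Gender::Age::Occupation::Zip-code, that is, user ID::gender::age::occupation::zip code; the README describes each field in detail.
A sample of the users.dat data used: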
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
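The same approach works for users.dat. One detail worth handling is the zip code: the sample contains `02460`, so it is kept as a `String` rather than an integer to preserve the leading zero. `User` and `UsersParser` are hypothetical names, and Age and Occupation are kept as the raw integer values from the file:

```scala
object UsersParser {
  // Hypothetical record type for one users.dat line.
  final case class User(userId: Int, gender: Char, age: Int, occupation: Int, zipCode: String)

  // Format: UserID::Gender::Age::Occupation::Zip-code
  def parse(line: String): User = {
    val f = line.split("::")
    require(f.length == 5, s"expected 5 fields but got ${f.length}: $line")
    // The zip code stays a String so leading zeros such as "02460" survive.
    User(f(0).toInt, f(1).head, f(2).toInt, f(3).toInt, f(4))
  }

  def main(args: Array[String]): Unit =
    println(parse("4::M::45::7::02460"))
}
```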
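3. Movie information (movies.dat)
The movie data has three fields in the format MovieID::Title::Genres, that is, movie ID::title::genres; the README describes each field in detail.
A sample of the movies.dat data used: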
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
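In movies.dat the Genres field is itself a `|`-separated list. Passing the string `"|"` to `split` would be treated as a regex alternation of two empty patterns and split between every character, so the sketch splits on the `Char` `'|'` instead, which Scala's `StringOps` treats literally. `Movie` and `MoviesParser` are hypothetical names:

```scala
object MoviesParser {
  // Hypothetical record type for one movies.dat line.
  final case class Movie(movieId: Int, title: String, genres: List[String])

  // Format: MovieID::Title::Genres, where Genres is '|'-separated
  def parse(line: String): Movie = {
    val f = line.split("::")
    require(f.length == 3, s"expected 3 fields but got ${f.length}: $line")
    // Split on the Char '|': the String "|" would be interpreted as a regex.
    Movie(f(0).toInt, f(1), f(2).split('|').toList)
  }

  def main(args: Array[String]): Unit =
    println(parse("6::Heat (1995)::Action|Crime|Thriller"))
}
```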
Program code
import java.io.File
import scala.io.Source

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

object MovieLensALS {

  def main(args: Array[String]) {

    // Suppress unnecessary log output on the terminal
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if (args.length != 2) {
      println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class week7.MovieLensALS " +
        "week7.jar movieLensHomeDir personalRatingsFile")
      sys.exit(1)
    }

    // Set up the runtime environment
    val conf = new SparkConf().setAppName("MovieLensALS").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Load the user's own ratings, produced by the rating tool
    val myRatings = loadRatings(args(1))
    val myRatingsRDD = sc.parallelize(myRatings, 1)

    // Sample data directory
    val movieLensHomeDir = args(0)

    // Load the sample ratings: key = last column (Timestamp) % 10, value = Rating, i.e. (Long, Rating)
    val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
      val fields = line.split("::")
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }

    // Load the movie lookup table (movie ID -> movie title)
    val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
      val fields = line.split("::")
      (fields(0).toInt, fields(1))
    }.collect().toMap

    val numRatings = ratings.count()
    val numUsers = ratings.map(_._2.user).distinct().count()
    val numMovies = ratings.map(_._2.product).distinct().count()
    println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.")

    // Split the sample ratings by key into 3 parts: training (60%, plus the user's own ratings),
    // validation (20%) and test (20%).
    // These RDDs are used many times during the computation, so cache them in memory.
    val numPartitions = 4
    val training = ratings.filter(x => x._1 < 6)
      .values // ratings is (Long, Rating); keep only the Rating values
      .union(myRatingsRDD)
      .repartition(numPartitions)
      .cache()
    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
      .values
      .repartition(numPartitions)
      .cache()
    val test = ratings.filter(x => x._1 >= 8).values.cache()

    val numTraining = training.count()
    val numValidation = validation.count()
    val numTest = test.count()
    println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)
    // Train models with different parameters, evaluate each on the validation set,
    // and keep the model with the best parameters
    val ranks = List(8, 12)
    val lambdas = List(0.1, 10.0)
    val numIters = List(10, 20)
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(training, rank, numIter, lambda)
      val validationRmse = computeRmse(model, validation, numValidation)
      println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
      if (validationRmse < bestValidationRmse) {
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    // Predict the test-set ratings with the best model and compute the RMSE against the actual ratings
    val testRmse = computeRmse(bestModel.get, test, numTest)
    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    // Create a naive baseline (always predict the mean rating) and compare it with the best model
    val meanRating = training.union(validation).map(_.rating).mean
    val baselineRmse =
      math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
    val improvement = (baselineRmse - testRmse) / baselineRmse * 100
    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    // Recommend the ten most interesting movies, excluding movies the user has already rated
    val myRatedMovieIds = myRatings.map(_.product).toSet
    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
    val recommendations = bestModel.get
      .predict(candidates.map((0, _))) // predict for user ID 0, assumed to be the ID in the personal ratings file
      .collect()
      .sortBy(-_.rating)
      .take(10)

    var i = 1
    println("Movies recommended for you:")
    recommendations.foreach { r =>
      println("%2d".format(i) + ": " + movies(r.product))
      i += 1
    }

    sc.stop()
  }

  /** RMSE between the predicted and actual ratings in `data` (n = number of ratings in data) */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
      .join(data.map(x => ((x.user, x.product), x.rating)))
      .values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
  }

  /** Load the user's rating file */
  def loadRatings(path: String): Seq[Rating] = {
    val source = Source.fromFile(path)
    // Materialize eagerly (toVector) so the file can be closed before the ratings are used
    val ratings = try {
      source.getLines().map { line =>
        val fields = line.split("::")
        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
      }.filter(_.rating > 0.0).toVector
    } finally {
      source.close()
    }
    if (ratings.isEmpty) {
      sys.error("No ratings provided.")
    } else {
      ratings
    }
  }
}
Step 1: Start the Spark cluster with the following commands
$ cd /app/hadoop/spark-1.1.0
$ sbin/start-all.sh
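Step 2: Rate movies to generate the user's sample data
Because the program ultimately recommends ten movies to the user, the user must first rate some of the sample movies; the best model is then used to produce recommendations for that user. The user can rate movies with the /home/hadoop/upload/class8/movielens/bin/rateMovies program, which generates the personalRatings.txt file.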
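The program's `loadRatings` reads this file as `user::movie::rating[::...]` lines and drops any entry whose rating is not positive, and it later predicts recommendations for user ID 0, so the personal ratings are presumably written under user ID 0. Below is a Spark-free sketch of the same parsing and filtering, assuming hypothetical file lines such as `0::1::5::1400000000` (the exact output of rateMovies is not shown here); `Rating` is redefined locally to mirror MLlib's class.

```scala
object PersonalRatings {
  // Local stand-in for org.apache.spark.mllib.recommendation.Rating,
  // so this sketch has no Spark dependency.
  final case class Rating(user: Int, product: Int, rating: Double)

  // Mirrors the program's loadRatings: parse user::movie::rating[::...]
  // and keep only entries with a positive rating.
  def parse(lines: Iterator[String]): Vector[Rating] =
    lines
      .map(_.split("::"))
      .map(f => Rating(f(0).toInt, f(1).toInt, f(2).toDouble))
      .filter(_.rating > 0.0)
      .toVector

  def main(args: Array[String]): Unit = {
    // Hypothetical contents of personalRatings.txt (user ID 0)
    val lines = Iterator("0::1::5::1400000000", "0::2::0::1400000000", "0::260::4::1400000000")
    parse(lines).foreach(println)
  }
}
```

Collecting into a `Vector` (rather than a lazy `Seq` built from the iterator) means every line has been read before the underlying file is closed.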
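Step 3: Set up the run configuration in IDEA
In IDEA, create a run configuration for MovieLensALS and set its program arguments to the input data directory and the path of the user's rating file, in that order (movieLensHomeDir, then personalRatingsFile).
Step 4: Run the program and observe the output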
RMSE (validation) = 0.8680885498009973 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.868882967482595 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.8663942501841964 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.8674684744165418 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.8652326018300565.
The best model improves the baseline by 22.30%.
Movies recommended for you:
1: Bewegte Mann, Der (1994)
2: Chushingura (1962)
3: Love Serenade (1996)
4: For All Mankind (1989)
5: Vie est belle, La (Life is Rosey) (1987)
6: Bandits (1997)
7: King of Masks, The (Bian Lian) (1996)
8: I'm the One That I Want (2000)
9: Big Trees, The (1952)
10: First Love, Last Rites (1997)
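The line "The best model improves the baseline by 22.30%" compares the model with a naive baseline that always predicts the mean of the training and validation ratings; the improvement is (baselineRmse - testRmse) / baselineRmse × 100. With a test RMSE of 0.8652 and a reported 22.30% improvement, the baseline RMSE works out to roughly 0.8652 / (1 - 0.2230) ≈ 1.11. A plain-Scala sketch of those formulas on in-memory sequences (object and method names are illustrative, and Spark RDDs are replaced by `Seq`):

```scala
object BaselineComparison {
  // RMSE over (predicted, actual) pairs
  def rmse(pairs: Seq[(Double, Double)]): Double =
    math.sqrt(pairs.map { case (p, a) => (p - a) * (p - a) }.sum / pairs.size)

  // Naive baseline: predict the mean of the known ratings for every test rating
  def baselineRmse(known: Seq[Double], test: Seq[Double]): Double = {
    val mean = known.sum / known.size
    rmse(test.map(a => (mean, a)))
  }

  // Relative improvement in percent, as printed by the program
  def improvement(baseline: Double, model: Double): Double =
    (baseline - model) / baseline * 100

  def main(args: Array[String]): Unit = {
    val b = baselineRmse(known = Seq(4.0, 3.0, 5.0, 4.0), test = Seq(5.0, 3.0))
    val m = rmse(Seq((4.6, 5.0), (3.3, 3.0)))
    println(f"baseline $b%.3f, model $m%.3f, improvement ${improvement(b, m)}%1.2f%%")
  }
}
```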