這個例子將使用 Audioscrobbler 公開的數據集。Audioscrobbler是http://www.last.fm/zh/第一個音樂推薦系統.
http://www.last.fm/zh/ 是第一個網絡流媒體音頻網站,成立與2002年。
Audioscrobbler 爲 「scrobbling」 提供了一個開發的 API,主要記錄聽衆聽取了哪些做家的歌曲。 這個網站利用這些信息創建了一個強大的額音樂推薦系統。這個系統達到了數百萬用戶,由於第三方的App和網站能夠提供收聽數據給推薦引擎。 在那個時期,研究推薦系統大部分侷限在學習相似評級的數據集。也就是說,推薦的人每每使用須要輸入像 「某某某 評分3.5分」 這樣的工具。 然而,Audioscrobbler 數據集有趣地方在於僅僅記錄播放的歷史:「某某某 播放了 什麼」。一個播放記錄帶來的信息量遠遠小於一個評分數據帶來的信息量,可是評分數據總量確定沒有播放歷史記錄的數據多,當大量播放歷史記錄放在一塊兒的時候,比評分數據將更有價值。 由這個網站公佈的一個2005年的數據集合能夠在http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html上面下載。這個數據集合沒有解壓以前大小是 135MB,解壓以後是 500MB,解壓以後將會看到主要的數據集是 user_artist_data.txt 文件,裏面大約包含 141000 惟一的用戶和 1.6 百萬惟一的artist藝術家,大約 24.2 百萬用戶播放記錄。 固然每一個 artist 都是用 id 記錄的,id 與 名字的對照關係是在 artist_data.txt 中。注意同一個 artist 可能對應不少不一樣名字即有不一樣的 id。 因此這裏還有一個文件 artist_alias.txt 標識每一個 artist 的別名。裏面用一個惟一的id 標識全部同一個artist id 列表。html
咱們須要選擇一種適合於這種隱士反饋數據的算法。這個數據集所有是用戶與做家的歌曲之間的交互行爲數據。它沒有包含用戶和做家自己除了名字以外的其餘屬性信息。 這個就是典型的協同過濾算法。舉個例子:好比決定兩個用戶具備相同的品味的緣由是他們有相同的年齡,這個不是協同過濾。決定兩個用戶都喜歡同一首歌曲的緣由是他們之間有不少共同喜歡的歌曲,這纔是協同過濾。
這個數據集很大,由於他包含1千萬多條用戶播放記錄。可是從另外一個方面來講,它又很小數據量不夠,由於它很稀疏。平均起來,每一個用戶才放過171個藝術家的歌曲,而總共的藝術家有 1.6 百萬個。有些用戶甚至只是聽歌一個做曲家的歌曲,咱們須要一種算法,可以對這種用戶也給出合理的推薦。畢竟,每一個人剛開始在系統中開始產生記錄的那一刻都只聽過一個做家。這個也說明算法對新用戶準確度低,這種狀況當用戶交互行爲變多的時候會慢慢變好。 固然,咱們須要咱們的算法有能力擴展,處理大數據,而且很快。
接下來咱們例子裏面展現的算法是普遍分類算法模型中的一個叫作隱因素模型。模型嘗試經過觀察不到的潛在的緣由去解釋這些觀察到的大量的用戶產品交互行爲。 更具體的來講,這個例子將使用一個矩陣分解模型。下面介紹交叉最小方差模型。ios
算法名字 | 交叉最小方差, Alternating Least Squares, ALS |
算法描述 | Spark上的交替性最小二乘ALS本質是一種協同過濾的算法 |
算法原理 | 1. 首先將用戶推薦對象交互歷史轉換爲矩陣,行表示用戶,列表示推薦對象,矩陣對應 i,j 表示用戶 i 在對象 j 上有沒有行爲 <br /> 2. 協同過濾就是要像填數獨同樣,填滿1獲得的矩陣,採用的方法是矩陣分解算法原理圖 <br /> 3. 原始矩陣 A 是一個很大的稀疏矩陣,而後利用 ALS 分解成近似兩個矩陣 B 和 C 的乘,另外兩個矩陣就比較密集,並且 B 矩陣的列能夠解釋爲一個事物的幾個方面。<br /> 4. 用戶 k 對對象 h 的喜愛程度就能夠經過矩陣 B 的 k 行乘 矩陣 C 的 h 列獲得 |
使用場景 | 當用戶和推薦的對象自己屬性數據沒有,只存在用戶和推薦對象歷史交互數據的時候,當提煉出用戶推薦對象的關係矩陣能夠發現是一個大型的稀疏矩陣 |
算法優缺點 | 優勢: 1. 此算法可伸縮 2. 速度很快 3. 適合大數據 4.新異興趣發現、不須要領域知識 5. 隨着時間推移性能提升 6. 推薦個性化、自動化程度高 7. 能處理複雜的非結構化對象 <br /> 缺點: 1. 稀疏問題 2. 可擴展性問題 3. 新用戶問題 4. 質量取決於歷史數據集 5. 系統開始時推薦質量差 |
參考資料 | 1. 算法原理 Large-scale Parallel Collaborative Filtering for the Netflix Prize <br /> 2. MLlib實現 MLlib - Collaborative Filtering |
首先將樣例數據上傳到HDFS,若是想要在本地測試這些功能的話,須要內存數量至少 6g, 固然能夠經過減小數據量來達到通用的測試。 下面給出完整的代碼,註釋已經說明每段代碼的含義:git
package clebeg.spark.action import org.apache.spark.mllib.recommendation.{ALS, Rating} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} /** * Spark 數據挖掘實戰 案例音樂推薦 * 注意:ALS 限制每個用戶產品對必須有一個 ID,並且這個 ID 必須小於 Integer.MAX_VALUE * Created by clebeg.xie on 2015/10/29. */ object MusicRecommend { val rootDir = "F:\\clebeg\\spark\\datas\\profiledata_06-May-2005\\"; //本地測試 def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[1]") val sc = new SparkContext(conf) val rawUserArtistData = sc.textFile(rootDir + "user_artist_data.txt") //檢查數據集是否超過最大值,對於計算密集型算法,原始數據集最好多分塊 //println(rawUserArtistData.first()) //println(rawUserArtistData.map(_.split(' ')(0).toDouble).stats()) //println(rawUserArtistData.map(_.split(' ')(1).toDouble).stats()) //藝術家ID和名字對應 val artistById = artistByIdFunc(sc) //藝術家名字重複 val aliasArtist = artistsAlias(sc) aslModelTest(sc, aliasArtist, rawUserArtistData, artistById) //查看一下 2093760 這個用戶真正聽的歌曲 val existingProducts = rawUserArtistData.map(_.split(' ')).filter { case Array(userId, _, _) => userId.toInt == 2093760 }.map{ case Array(_, artistId, _) => { aliasArtist.getOrElse(artistId.toInt, artistId.toInt) } }.collect().toSet artistById.filter { line => line match { case Some((id, name)) => existingProducts.contains(id) case None => false } }.collect().foreach(println) } /** * 獲取藝術家名字和ID的對應關係 * 有些藝術家名字和ID沒有按 \t 分割,錯誤處理就是放棄這些數據 * @param sc * @return */ def artistByIdFunc(sc: SparkContext): RDD[Option[(Int, String)]] = { val rawArtistData = sc.textFile(rootDir + "artist_data.txt") val artistByID = rawArtistData.map { line => //span 碰到第一個不知足條件的開始劃分, 少許的行轉換不成功, 數據質量問題 val (id, name) = line.span(_ != '\t') if (name.isEmpty) { None } else { try { //特別注意Some None缺失值處理的方式,Scala 中很是給力的一種方法 Some((id.toInt, name.trim)) } catch { case e: NumberFormatException => None } } } artistByID } /** * 經過文件 artist_alias.txt 獲得全部藝術家的別名 * 文件不大,每一行按照 \t 分割包含一個拼錯的名字ID 還有一個正確的名字ID * 一些行沒有第一個拼錯的名字ID,直接跳過 * @param sc Spark上下文 * @return */ def artistsAlias(sc: SparkContext) = { val rawArtistAlias = sc.textFile(rootDir + "artist_alias.txt") val artistAlias = rawArtistAlias.flatMap { line => val tokens = line.split('\t') if (tokens(0).isEmpty) { None } else { Some((tokens(0).toInt, tokens(1).toInt)) } }.collectAsMap() artistAlias } def aslModelTest(sc: SparkContext, aliasArtist: scala.collection.Map[Int, Int], rawUserArtistData: RDD[String], artistById: RDD[Option[(Int, String)]] ) = { //將對應關係廣播出去,由於這個數據量不大,Spark廣播變量相似於 hive 的 mapjoin val bArtistAlias = sc.broadcast(aliasArtist) //轉換重複的藝術家的ID爲同一個ID,而後將 val trainData = rawUserArtistData.map{ line => val Array(userId, artistId, count) = line.split(' ').map(_.toInt) val finalArtistID = bArtistAlias.value.getOrElse(artistId, artistId) Rating(userId, finalArtistID, count) }.cache() //模型訓練 val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0) //模型創建以後,爲某個用戶給出一個具體的推薦列表 val recommendations = model.recommendProducts(2093760, 5) //爲ID爲2093760的用戶推薦5個產品 recommendations.foreach(println) val recommendedProductIDs = recommendations.map(_.product).toSet //輸出推薦的藝術家的名字 artistById.filter { line => line match { case Some((id, name)) => recommendedProductIDs.contains(id) case None => false } }.collect().foreach(println) } }
代碼中模型訓練好以後,預測了用戶 2093760 的推薦結果,我測試結果以下,因爲裏面代碼使用了隨機生成初始矩陣,每一個人的結果都有可能不同。github
Some((2814,50 Cent)) Some((829,Nas)) Some((1003249,Ludacris)) Some((1001819,2Pac)) Some((1300642,The Game))
代碼中也給出了該用戶之前聽過的藝術家的名字以下:算法
Some((1180,David Gray)) Some((378,Blackalicious)) Some((813,Jurassic 5)) Some((1255340,The Saw Doctors)) Some((942,Xzibit))
auc評價方法apache
def areaUnderCurve( positiveData: RDD[Rating], bAllItemIDs: Broadcast[Array[Int]], predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = { // What this actually computes is AUC, per user. The result is actually something // that might be called "mean AUC". // Take held-out data as the "positive", and map to tuples val positiveUserProducts = positiveData.map(r => (r.user, r.product)) // Make predictions for each of them, including a numeric score, and gather by user val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user) // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of // small AUC problems, and it would be inefficient, when a direct computation is available. // Create a set of "negative" products for each user. These are randomly chosen // from among all of the other items, excluding those that are "positive" for the user. val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions { // mapPartitions operates on many (user,positive-items) pairs at once userIDAndPosItemIDs => { // Init an RNG and the item IDs set once for partition val random = new Random() val allItemIDs = bAllItemIDs.value userIDAndPosItemIDs.map { case (userID, posItemIDs) => val posItemIDSet = posItemIDs.toSet val negative = new ArrayBuffer[Int]() var i = 0 // Keep about as many negative examples per user as positive. // Duplicates are OK while (i < allItemIDs.size && negative.size < posItemIDSet.size) { val itemID = allItemIDs(random.nextInt(allItemIDs.size)) if (!posItemIDSet.contains(itemID)) { negative += itemID } i += 1 } // Result is a collection of (user,negative-item) tuples negative.map(itemID => (userID, itemID)) } } }.flatMap(t => t) // flatMap breaks the collections above down into one big set of tuples // Make predictions on the rest: val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user) // Join positive and negative by user positivePredictions.join(negativePredictions).values.map { case (positiveRatings, negativeRatings) => // AUC may be viewed as the probability that a random positive item scores // higher than a random negative one. Here the proportion of all positive-negative // pairs that are correctly ranked is computed. The result is equal to the AUC metric. var correct = 0L var total = 0L // For each pairing, for (positive <- positiveRatings; negative <- negativeRatings) { // Count the correctly-ranked pairs if (positive.rating > negative.rating) { correct += 1 } total += 1 } // Return AUC: fraction of pairs ranked correctly correct.toDouble / total }.mean() // Return mean AUC over users }
能夠經過調整參數看 auc 的結果來反覆選擇網絡