Spark機器學習之協同過濾算法算法
一)、協同過濾apache
1.1 概念 數組
協同過濾是一種藉助"集體計算"的途徑。它利用大量已有的用戶偏好來估計用戶對其未接觸過的物品的喜愛程度。其內在思想是類似度的定義eclipse
1.2 分類機器學習
1.在基於用戶的方法的中,若是兩個用戶表現出類似的偏好(即對相同物品的偏好大致相同),那就認爲他們的興趣相似。要對他們中的一個用戶推薦一個未知物品,分佈式
即可選取若干與其相似的用戶並根據他們的喜愛計算出對各個物品的綜合得分,再以得分來推薦物品。其總體的邏輯是,若是其餘用戶也偏好某些物品,那這些物品極可能值得推薦。函數
2. 一樣也能夠藉助基於物品的方法來作推薦。這種方法一般根據現有用戶對物品的偏好或是評級狀況,來計算物品之間的某種類似度。oop
這時,類似用戶評級相同的那些物品會被認爲更相近。一旦有了物品之間的類似度,即可用用戶接觸過的物品來表示這個用戶,而後找出和這些已知物品類似的那些物品,學習
並將這些物品推薦給用戶。一樣,與已有物品類似的物品被用來生成一個綜合得分,而該得分用於評估未知物品的類似度。測試
二)、矩陣分解
Spark推薦模型庫當前只包含基於矩陣分解(matrix factorization)的實現,由此咱們也將重點關注這類模型。它們有吸引人的地方。首先,這些模型在協同過濾
中的表現十分出色。而在Netflix Prize等知名比賽中的表現也很拔尖
1,顯式矩陣分解
要找到和「用戶物品」矩陣近似的k維(低階)矩陣,最終要求出以下兩個矩陣:一個用於表示用戶的U × k維矩陣,以及一個表徵物品的I × k維矩陣。
這兩個矩陣也稱做因子矩陣。它們的乘積即是原始評級矩陣的一個近似。值得注意的是,原始評級矩陣一般很稀疏,但因子矩陣倒是稠密的。
特色:
因子分解類模型的好處在於,一旦創建了模型,對推薦的求解便相對容易。但也有弊端,即當用戶和物品的數量不少時,其對應的物品或是用戶的因子向量可能達到數以百萬計。
這將在存儲和計算能力上帶來挑戰。另外一個好處是,這類模型的表現一般都很出色。
2,隱式矩陣分解(關聯因子分肯定,可能隨時會變化)
隱式模型仍然會建立一個用戶因子矩陣和一個物品因子矩陣。可是,模型所求解的是偏好矩陣而非評級矩陣的近似。相似地,此時用戶因子向量和物品因子向量的點積所獲得的分數
也再也不是一個對評級的估值,而是對某個用戶對某一物品偏好的估值(該值的取值雖並不嚴格地處於0到1之間,但十分趨近於這個區間)
3,最小二乘法(Alternating Least Squares ALS):解決矩陣分解的最優化方法
ALS的實現原理是迭代式求解一系列最小二乘迴歸問題。在每一次迭代時,固定用戶因子矩陣或是物品因子矩陣中的一個,而後用固定的這個矩陣以及評級數據來更新另外一個矩陣。
以後,被更新的矩陣被固定住,再更新另一個矩陣。如此迭代,直到模型收斂(或是迭代了預設好的次數)。
三)、Spark下ALS算法的應用
1,數據來源電影集ml-100k
2,代碼實現
基於用戶類似度片斷代碼:
val movieFile=sc.textFile(fileName) val RatingDatas=movieFile.map(_.split("\t").take(3)) //轉爲Ratings數據 val ratings=RatingDatas.map(x =>Rating(x(0).toInt,x(1).toInt,x(2).toDouble)) //獲取用戶評價模型,設置k因子,和迭代次數,隱藏因子lambda,獲取模型 val model=ALS.train(ratings,50,10,0.01) //基於用戶類似度推薦 println("userNumber:"+model.userFeatures.count()+"\t"+"productNum:"+model.productFeatures.count()) //指定用戶及商品,輸出預測值 println(model.predict(789,123)) //爲指定用戶推薦的前N商品 model.recommendProducts(789,11).foreach(println(_)) //爲每一個人推薦前十個商品 model.recommendProductsForUsers(10).take(1).foreach{ case(x,rating) =>println(rating(0)) }
基於商品類似度代碼:
計算類似度的方法有類似度是經過某種方式比較表示兩個物品的向量而獲得的。常見的類似度衡量方法包括皮爾森相關係數(Pearson correlation)、針對實數向量的餘弦相
似度(cosine similarity)和針對二元向量的傑卡德類似係數(Jaccard similarity)。
val itemFactory=model.productFeatures.lookup(567).head val itemVector=new DoubleMatrix(itemFactory) //求餘弦類似度 val sim=model.productFeatures.map{ case(id,factory)=> val factorVector=new DoubleMatrix(factory) val sim=cosineSimilarity(factorVector,itemVector) (id,sim) } val sortedsim=sim.top(11)(Ordering.by[(Int,Double),Double]{ case(id,sim)=>sim }) println(sortedsim.take(10).mkString("\n")) def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={ vec1.dot(vec2)/(vec1.norm2()*vec2.norm2()) }
均方差評估模型代碼:
//模型評估,經過均偏差 //實際用戶評估值 val actualRatings=ratings.map{ case Rating(user,item,rats) => ((user,item),rats) } val userItems=ratings.map{ case(Rating(user,item,rats)) => (user,item) } //模型的用戶對商品的預測值 val predictRatings=model.predict(userItems).map{ case(Rating(user,item,rats)) =>((user,item),rats) } //聯合獲取rate值 val rates=actualRatings.join(predictRatings).map{ case x =>(x._2._1,x._2._2) } //求均方差 val regressionMetrics=new RegressionMetrics(rates) //越接近0越佳 println(regressionMetrics.meanSquaredError)
全局準確率評估(MAP):使用MLlib的 RankingMetrics 類來計算基於排名的評估指標。相似地,須要向咱們以前的平均準確率函數傳入一個鍵值對類型的RDD。
其鍵爲給定用戶預測的推薦物品的ID數組,而值則是實際的物品ID數組。
//全局平均準確率(MAP) val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect() val itemMatrix = new DoubleMatrix(itemFactors) //分佈式廣播商品的特徵矩陣 val imBroadcast = sc.broadcast(itemMatrix) //計算每個用戶的推薦,在這個操做裏,會對用戶因子矩陣和電影因子矩陣作乘積,其結果爲一個表示各個電影預計評級的向量(長度爲 //1682,即電影的總數目) val allRecs = model.userFeatures.map{ case (userId, array) => val userVector = new DoubleMatrix(array) val scores = imBroadcast.value.mmul(userVector) val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) val recommendedIds = sortedWithId.map(_._2 + 1).toSeq //+1,矩陣從0開始 (userId, recommendedIds) } //實際評分 val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product)}.groupBy(_._1) val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => val actual = actualWithIds.map(_._2) (predicted.toArray, actual.toArray) } //求MAP,越大越好吧 val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking) println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
詳細代碼:
package com.spark.milb.study import org.apache.log4j.{Level, Logger} import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics} import org.apache.spark.mllib.recommendation.{ALS, Rating} import org.apache.spark.{SparkConf, SparkContext} import org.jblas.DoubleMatrix /** * Created by hadoop on 17-5-3. * 協同過濾(處理對象movie,使用算法ALS:最小二乘法(實現用戶推薦) * 餘弦類似度實現商品類似度推薦 */ object cfTest { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) val conf=new SparkConf().setMaster("local").setAppName("AlsTest") val sc=new SparkContext(conf) CF(sc,"ml-100k/u.data") } def CF(sc:SparkContext,fileName:String): Unit ={ val movieFile=sc.textFile(fileName) val RatingDatas=movieFile.map(_.split("\t").take(3)) //轉爲Ratings數據 val ratings=RatingDatas.map(x =>Rating(x(0).toInt,x(1).toInt,x(2).toDouble)) //獲取用戶評價模型,設置k因子,和迭代次數,隱藏因子lambda,獲取模型 /* * rank :對應ALS模型中的因子個數,也就是在低階近似矩陣中的隱含特徵個數。因子個 數通常越多越好。但它也會直接影響模型訓練和保存時所需的內存開銷,尤爲是在用戶 和物品不少的時候。所以實踐中該參數常做爲訓練效果與系統開銷之間的調節參數。通 常,其合理取值爲10到200。 iterations :對應運行時的迭代次數。ALS能確保每次迭代都能下降評級矩陣的重建誤 差,但通常經少數次迭代後ALS模型便已能收斂爲一個比較合理的好模型。這樣,大部分 狀況下都不必迭代太屢次(10次左右通常就挺好)。 lambda :該參數控制模型的正則化過程,從而控制模型的過擬合狀況。其值越高,正則 化越嚴厲。該參數的賦值與實際數據的大小、特徵和稀疏程度有關。和其餘的機器學習 模型同樣,正則參數應該經過用非樣本的測試數據進行交叉驗證來調整。 * */ val model=ALS.train(ratings,50,10,0.01) //基於用戶類似度推薦 println("userNumber:"+model.userFeatures.count()+"\t"+"productNum:"+model.productFeatures.count()) //指定用戶及商品,輸出預測值 println(model.predict(789,123)) //爲指定用戶推薦的前N商品 model.recommendProducts(789,11).foreach(println(_)) //爲每一個人推薦前十個商品 model.recommendProductsForUsers(10).take(1).foreach{ case(x,rating) =>println(rating(0)) } //基於商品類似度(使用餘弦類似度)進行推薦,獲取某個商品的特徵值 val itemFactory=model.productFeatures.lookup(567).head val itemVector=new DoubleMatrix(itemFactory) //求餘弦類似度 val sim=model.productFeatures.map{ case(id,factory)=> val factorVector=new DoubleMatrix(factory) val sim=cosineSimilarity(factorVector,itemVector) (id,sim) } val sortedsim=sim.top(11)(Ordering.by[(Int,Double),Double]{ case(id,sim)=>sim }) println(sortedsim.take(10).mkString("\n")) //模型評估,經過均偏差 //實際用戶評估值 val actualRatings=ratings.map{ case Rating(user,item,rats) => ((user,item),rats) } val userItems=ratings.map{ case(Rating(user,item,rats)) => (user,item) } //模型的用戶對商品的預測值 val predictRatings=model.predict(userItems).map{ case(Rating(user,item,rats)) =>((user,item),rats) } //聯合獲取rate值 val rates=actualRatings.join(predictRatings).map{ case x =>(x._2._1,x._2._2) } //求均方差 val regressionMetrics=new RegressionMetrics(rates) //越接近0越佳 println(regressionMetrics.meanSquaredError) //全局平均準確率(MAP) val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect() val itemMatrix = new DoubleMatrix(itemFactors) //分佈式廣播商品的特徵矩陣 val imBroadcast = sc.broadcast(itemMatrix) //計算每個用戶的推薦,在這個操做裏,會對用戶因子矩陣和電影因子矩陣作乘積,其結果爲一個表示各個電影預計評級的向量(長度爲 //1682,即電影的總數目) val allRecs = model.userFeatures.map{ case (userId, array) => val userVector = new DoubleMatrix(array) val scores = imBroadcast.value.mmul(userVector) val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1) val recommendedIds = sortedWithId.map(_._2 + 1).toSeq //+1,矩陣從0開始 (userId, recommendedIds) } //實際評分 val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product)}.groupBy(_._1) val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => val actual = actualWithIds.map(_._2) (predicted.toArray, actual.toArray) } //求MAP,越大越好吧 val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking) println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision) } //餘弦類似度計算 def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={ vec1.dot(vec2)/(vec1.norm2()*vec2.norm2()) } }