推薦引擎背後的想法是預測人們可能喜愛的物品並經過探尋物品之間的聯繫來輔助這個過程
在學習Spark機器學習這本書時,書上用scala完成,本身不熟悉遂用pyshark完成,更深刻的理解了spark對協同過濾的實現python
在這裏咱們的推薦模型選用協同過濾這種類型,使用Spark的MLlib中推薦模型庫中基於矩陣分解(matrix factorization)的實現。算法
協同過濾簡單來講是利用某興趣相投、擁有共同經驗之羣體的喜愛來推薦用戶感興趣的信息,我的經過合做的機制給予信息至關程度的迴應(如評分)並記錄下來以達到過濾的目的進而幫助別人篩選信息,迴應不必定侷限於特別感興趣的,特別不感興趣信息的紀錄也至關重要。
很簡單的例子來介紹就是平常咱們生活中常常找電影會經過向和本身品味相似的朋友要求推薦,這就是協同過濾的思想app
基於用戶或物品的方法的得分取決於若干用戶或是物品之間依據類似度所構成的集合(即鄰居),故它們也常被稱爲最鄰近模型。機器學習
這裏咱們要處理的數據是用戶提供的自身偏好數據,即用戶對物品的打分數據。函數
這些數據能夠被轉換成用戶爲行,物品爲列的二維矩陣,即評分矩陣A(m*n)表示m個用戶對n個物品的打分狀況oop
UI | i1 | i2 | i3 |
---|---|---|---|
u1 | 3.0 | 3.0 | ? |
u2 | ? | 2.0 | 4.0 |
u3 | ? | 5.0 | ? |
這個矩陣A不少元素都是空的,咱們稱其爲缺失值(missing value)。源碼分析
協同過濾提出了一種支持不完整評分矩陣的矩陣分解方法,不用對評分矩陣進行估值填充。學習
在推薦系統中,咱們但願獲得用戶對全部物品的打分狀況,若是用戶沒有對一個物品打分,那麼就須要預測用戶是否會對該物品打分,以及會打多少分。這就是所謂的矩陣補全(矩陣分解)優化
對於這個矩陣分解的方式就是找出兩個低維度的矩陣,使得他們的乘積是原始的矩陣。lua
所以這也是一種降維技術。要找到和‘用戶-物品’矩陣近似的k維矩陣,最終要求得出表示用戶的m x k維矩陣,和一個表示物品的k x n維矩陣。
這兩個矩陣也稱做因子矩陣。
對於k的理解爲對於每一個產品,這裏已電影爲例,能夠從k個角度進行評價,即k個特徵值
因爲是對‘用戶-物品’矩陣直接建模,用這些模型進行預測也相對直接,要計算給定用戶對某個物品的預計評級,就從用戶因子矩陣和物品因子矩陣分別選取對應的行與列,而後計算二者的點積。
假設對於用戶A,該用戶對一部電影的綜合評分和電影的特徵值存在必定的線性關係,
即電影的綜合評分=(a1d1+a2d2+a3d3+a4d4)
其中a1-4爲用戶A的特徵值,d1-4爲以前所說的電影的特徵值
最小二乘法(Alternating Least Squares, ALS)是一種求解矩陣分解問題的最優化方法。它功能強大、效果理想並且被證實相對容易並行化。這使得它很適合如Spark這樣的平臺。
使用用戶特徵矩陣$ U(m*k) $ 中的第i個用戶的特徵向量$ u_i $ ,
和產品特徵矩陣$ V(n*k) $第j個物品的特徵向量$ v_i $來預測打分矩陣$ A(m*n) $中的$ a_{ij} $,
得出矩陣分解模型的損失函數以下
$$ \large C = \sum\limits_{(i,j)\in R}[(a_{ij} - u_iv_j^T)^2+\lambda(u_i^2+v_j^2)] $$
一般的優化方法分爲兩種:交叉最小二乘法(alternative least squares)和隨機梯度降低法(stochastic gradient descent)。Spark使用的是交叉最小二乘法(ALS)來最優化損失函數。
算法的思想就是:咱們先隨機生成而後固定它求解,再固定求解,這樣交替進行下去,直到取得最優解$ min(C) $
咱們這裏的數據集是Movielens 100k數據集,包含了多個用戶對多部電影的10萬次評級數據
讀取評級數據集,該數據包括用戶ID,影片ID,星級和時間戳等字段,使用/t分隔
經過sc.textFile()讀取數據爲RDD,經過分隔符取前3個屬性分別爲用戶ID,影片ID,星級
rawData = sc.textFile('/home/null/hadoop/data/ml-100k/u.data') rawData.first() type(rawData)
pyspark.rdd.RDD
rawRatings = rawData.map(lambda line: line.split("\t")[:3]) rawRatings.first()
['196', '242', '3']
# 導入spark中的mllib的推薦庫 import pyspark.mllib.recommendation as rd
生成Rating類的RDD數據
# 因爲ALS模型須要由Rating記錄構成的RDD做爲參數,所以這裏用rd.Rating方法封裝數據 ratings = rawRatings.map(lambda line: rd.Rating(int(line[0]), int(line[1]), float(line[2]))) ratings.first()
Rating(user=196, product=242, rating=3.0)
# 訓練ALS模型 model = rd.ALS.train(ratings, 50, 10, 0.01) model
<pyspark.mllib.recommendation.MatrixFactorizationModel at 0x7f53cc29c710>
# 對用戶789預測其對電影123的評級 predictedRating = model.predict(789,123) predictedRating
3.1740832151065774
# 獲取對用戶789的前10推薦 topKRecs = model.recommendProducts(789,10) topKRecs
[Rating(user=789, product=429, rating=6.302989890089658), Rating(user=789, product=496, rating=5.782039583864358), Rating(user=789, product=651, rating=5.665266358968961), Rating(user=789, product=250, rating=5.551256887914674), Rating(user=789, product=64, rating=5.5336980239740186), Rating(user=789, product=603, rating=5.468600343790217), Rating(user=789, product=317, rating=5.442052952711695), Rating(user=789, product=480, rating=5.414042111530209), Rating(user=789, product=180, rating=5.413309515550101), Rating(user=789, product=443, rating=5.400024900653429)]
這裏首先將電影的數據讀入,講數據處理爲電影ID到標題的映射
而後獲取某個用戶評級前10的影片同推薦這個用戶的前10影片進行比較
#檢查推薦內容 movies = sc.textFile('/home/null/hadoop/data/ml-100k/u.item') movies.first()
'1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0'
titles_data= movies.map(lambda line: line.split("|")[:2]).collect() titles = dict(titles_data) titles
moviesForUser = ratings.keyBy(lambda rating: rating.user).lookup(789) type(moviesForUser)
list
moviesForUser = sorted(moviesForUser,key=lambda r: r.rating, reverse=True)[0:10] moviesForUser
[Rating(user=789, product=127, rating=5.0), Rating(user=789, product=475, rating=5.0), Rating(user=789, product=9, rating=5.0), Rating(user=789, product=50, rating=5.0), Rating(user=789, product=150, rating=5.0), Rating(user=789, product=276, rating=5.0), Rating(user=789, product=129, rating=5.0), Rating(user=789, product=100, rating=5.0), Rating(user=789, product=741, rating=5.0), Rating(user=789, product=1012, rating=4.0)]
[(titles[str(r.product)], r.rating) for r in moviesForUser]
[('Godfather, The (1972)', 5.0), ('Trainspotting (1996)', 5.0), ('Dead Man Walking (1995)', 5.0), ('Star Wars (1977)', 5.0), ('Swingers (1996)', 5.0), ('Leaving Las Vegas (1995)', 5.0), ('Bound (1996)', 5.0), ('Fargo (1996)', 5.0), ('Last Supper, The (1995)', 5.0), ('Private Parts (1997)', 4.0)]
[(titles[str(r.product)], r.rating) for r in topKRecs]
[('Day the Earth Stood Still, The (1951)', 6.302989890089658), ("It's a Wonderful Life (1946)", 5.782039583864358), ('Glory (1989)', 5.665266358968961), ('Fifth Element, The (1997)', 5.551256887914674), ('Shawshank Redemption, The (1994)', 5.5336980239740186), ('Rear Window (1954)', 5.468600343790217), ('In the Name of the Father (1993)', 5.442052952711695), ('North by Northwest (1959)', 5.414042111530209), ('Apocalypse Now (1979)', 5.413309515550101), ('Birds, The (1963)', 5.400024900653429)]
定義爲各平方偏差的和與總數目的商,其中平方偏差是指預測到的評級與真實評級的差值平方
直接度量模型的預測目標變量的好壞
對MSE取其平方根,即預計評級和實際評級的差值的標準差
# evaluation metric usersProducts = ratings.map(lambda r:(r.user, r.product)) predictions = model.predictAll(usersProducts).map(lambda r: ((r.user, r.product),r.rating)) predictions.first()
((316, 1084), 4.006135662882842)
ratingsAndPredictions = ratings.map(lambda r: ((r.user,r.product), r.rating)).join(predictions) ratingsAndPredictions.first()
((186, 302), (3.0, 2.7544572973050236))
# 使用MLlib內置的評估函數計算MSE,RMSE from pyspark.mllib.evaluation import RegressionMetrics predictionsAndTrue = ratingsAndPredictions.map(lambda line: (line[1][0],line[1][3])) predictionsAndTrue.first()
(3.0, 2.7544572973050236)
# MSE regressionMetrics = RegressionMetrics(predictionsAndTrue) regressionMetrics.meanSquaredError
0.08509832708963357
# RMSE regressionMetrics.rootMeanSquaredError
0.2917161755707653
參考: