提到ALS相信你們應該都不會以爲陌生,它是協同過濾的一種,並被集成到Spark的Mllib庫中。本文就ALS的基本原理進行講解,並手把手、肩並肩地帶您實現這一算法。git
完整代碼須要的朋友能夠加我君羊705673780聯繫管理,免費獲取。github
咱們用人話而不是大段的數學公式來說講ALS是怎麼一回事。算法
1.1 你據說過推薦算法麼編程
假如我是豆瓣的CEO,不少豆瓣的用戶在豆瓣電影上都會對電影進行評分。那麼根據這個評分數據,咱們有可能知道這些用戶除了本身評過度的電影以外還喜歡或討厭哪些電影嗎?這就是一個典型的推薦問題,解決這一類問題的算法被稱爲推薦算法。網絡
1.2 什麼是協同過濾less
協同過濾的英文全稱是Collaborative Filtering,簡稱CF。注意,這不是一款遊戲!從字面上分析,協同就是尋找共同點,過濾就是篩選出優質的內容。dom
1.3 協同過濾的分類編程語言
通常來講,協同過濾推薦分爲三種類型:函數
2.基於物品(item-based)的協同過濾,經過計算物品和物品的類似度找到跟物品1類似的物品2, 3, 4…再把這些物品推薦給看過物品1的用戶們;工具
1.4 矩陣分解
矩陣分解 (decomposition, factorization)是將矩陣拆解爲數個矩陣的乘積。好比豆瓣電影有m個用戶,n個電影。那麼用戶對電影的評分能夠造成一個m行n列的矩陣R,咱們能夠找到一個m行k列的矩陣U,和一個k行n列的矩陣I,經過U * I來獲得矩陣R。
1.5 ALS
若是想經過矩陣分解的方法實現基於模型的協同過濾,ALS是一個不錯的選擇,其英文全稱是Alternating Least Square,翻譯過來是交替最小二乘法。假設用戶爲a,物品爲b,評分矩陣爲R(m, n),可分解爲用戶矩陣U(k, m)和物品矩陣I(k, n),其中m, n, k表明矩陣的維度。前方小段數學公式低能預警:
1.6 求解用戶矩陣U和物品矩陣I
矩陣R是已知的,咱們隨機生成用戶矩陣U, 1. 利用1.5中的式五、R和U求出I 2. 利用1.5中的式六、R和I求出U
如此交替地執行步驟1和步驟2,直到算法收斂或者迭代次數超過了最大限制,最終咱們用RMSE來評價模型的好壞。
本人用全宇宙最簡單的編程語言——Python實現了ALS算法,沒有依賴任何第三方庫,便於學習和使用。簡單說明一下實現過程,更詳細的註釋請參考本人github上的代碼。
注:代碼中用到的Matrix類是我寫的一個矩陣類,能夠取出矩陣的行或列,計算矩陣的乘法、轉置和逆。代碼連接:matrix.py
2.1 建立ALS類
初始化,存儲用戶ID、物品ID、用戶ID與用戶矩陣列號的對應關係、物品ID與物品矩陣列號的對應關係、用戶已經看過哪些物品、評分矩陣的Shape以及RMSE。
classALS(object): def__init__(self): self.user_ids =None self.item_ids =None self.user_ids_dict =None self.item_ids_dict =None self.user_matrix =None self.item_matrix =None self.user_items =None self.shape =None self.rmse =None
2.2 數據預處理
對訓練數據進行處理,獲得用戶ID、物品ID、用戶ID與用戶矩陣列號的對應關係、物品ID與物品矩陣列號的對應關係、評分矩陣的Shape、評分矩陣及評分矩陣的轉置。
def _process_data(self, X): self.user_ids = tuple((set(map(lambda x: x[0], X)))) self.user_ids_dict = dict(map(lambda x: x[::-1], enumerate(self.user_ids))) self.item_ids = tuple((set(map(lambda x: x[1], X)))) self.item_ids_dict = dict(map(lambda x: x[::-1], enumerate(self.item_ids))) self.shape = (len(self.user_ids),len(self.item_ids)) ratings = defaultdict(lambda: defaultdict(int)) ratings_T = defaultdict(lambda: defaultdict(int)) forrowinX: user_id, item_id, rating =row ratings[user_id][item_id] = rating ratings_T[item_id][user_id] = rating err_msg ="Length of user_ids %d and ratings %d not match!"% ( len(self.user_ids),len(ratings)) assertlen(self.user_ids) ==len(ratings), err_msg err_msg ="Length of item_ids %d and ratings_T %d not match!"% ( len(self.item_ids),len(ratings_T)) assertlen(self.item_ids) ==len(ratings_T), err_msg returnratings, ratings_T
2.3 用戶矩陣乘以評分矩陣
實現稠密矩陣與稀疏矩陣的矩陣乘法,獲得用戶矩陣與評分矩陣的乘積。
def_users_mul_ratings(self, users, ratings_T): deff(users_row, item_id): user_ids = iter(ratings_T[item_id].keys()) scores = iter(ratings_T[item_id].values()) col_nos = map(lambdax:self.user_ids_dict[x], user_ids) _users_row = map(lambdax:users_row[x], col_nos) returnsum(a * bfora, binzip(_users_row, scores)) ret = [[f(users_row, item_id)foritem_idinself.item_ids] forusers_rowinusers.data] returnMatrix(ret)
2.4 物品矩陣乘以評分矩陣
實現稠密矩陣與稀疏矩陣的矩陣乘法,獲得物品矩陣與評分矩陣的乘積。
def_items_mul_ratings(self, items, ratings): deff(items_row, user_id): item_ids = iter(ratings[user_id].keys()) scores = iter(ratings[user_id].values()) col_nos = map(lambdax:self.item_ids_dict[x], item_ids) _items_row = map(lambdax:items_row[x], col_nos) returnsum(a * bfora, binzip(_items_row, scores)) ret = [[f(items_row, user_id)foruser_idinself.user_ids] foritems_rowinitems.data] returnMatrix(ret)
2.5 生成隨機矩陣
def_gen_random_matrix(self, n_rows, n_colums): data = [[random()for_inrange(n_colums)]for_inrange(n_rows)] returnMatrix(data)
2.6 計算RMSE
def_get_rmse(self, ratings): m, n = self.shape mse = 0.0 n_elements = sum(map(len, ratings.values())) for i in range(m): for j in range(n): user_id = self.user_ids[i] item_id = self.item_ids[j] rating = ratings[user_id][item_id] if rating > 0: user_row = self.user_matrix.col(i).transpose item_col = self.item_matrix.col(j) rating_hat = user_row.mat_mul(item_col).data[0][0] square_error = (rating - rating_hat) ** 2 mse += square_error / n_elements return mse ** 0.5
2.7 訓練模型
1.數據預處理
2.變量k合法性檢查
3.生成隨機矩陣U
4.交替計算矩陣U和矩陣I,並打印RMSE信息,直到迭代次數達到max_iter
5.保存最終的RMSE
deffit(self, X, k, max_iter=10): ratings, ratings_T =self._process_data(X) self.user_items = {k:set(v.keys())fork, vinratings.items()} m, n =self.shape error_msg ="Parameter k must be less than the rank of original matrix" assert k < min(m, n), error_msg self.user_matrix =self._gen_random_matrix(k, m) foriinrange(max_iter): ifi %2: items =self.item_matrix self.user_matrix =self._items_mul_ratings( items.mat_mul(items.transpose).inverse.mat_mul(items), ratings ) else: users =self.user_matrix self.item_matrix =self._users_mul_ratings( users.mat_mul(users.transpose).inverse.mat_mul(users), ratings_T ) rmse =self._get_rmse(ratings) print("Iterations: %d, RMSE: %.6f"% (i +1, rmse)) self.rmse = rmse
2.8 預測一個用戶
預測一個用戶感興趣的內容,剔除用戶已看過的內容。而後按感興趣分值排序,取出前n_items個內容。
def_predict(self, user_id, n_items): users_col =self.user_matrix.col(self.user_ids_dict[user_id]) users_col = users_col.transpose items_col = enumerate(users_col.mat_mul(self.item_matrix).data[0]) items_scores = map(lambdax:(self.item_ids[x[0]], x[1]), items_col) viewed_items =self.user_items[user_id] items_scores = filter(lambdax:x[0]notinviewed_items, items_scores) returnsorted(items_scores, key=lambdax:x[1], reverse=True)[:n_items]
2.9 預測多個用戶
循環調用2.8,預測多個用戶感興趣的內容。
defpredict(self, user_ids, n_items=10): return[self._predict(user_id, n_items)foruser_idinuser_ids]
3 效果評估
3.1 main函數
使用電影評分數據集,訓練模型並統計RMSE。
@run_time defmain(): print("Tesing the accuracy of ALS...") X = load_movie_ratings() model = ALS() model.fit(X, k=3, max_iter=5) print() print("Showing the predictions of users...") user_ids = range(1,5) predictions = model.predict(user_ids, n_items=2) foruser_id, predictioninzip(user_ids, predictions): _prediction = [format_prediction(item_id, score) foritem_id, scoreinprediction] print("User id:%d recommedation: %s"% (user_id, _prediction))
3.2 效果展現
設置k=3,迭代5次,並展現了前4個用戶的推薦內容,最終RMSE爲0.370,運行時間46.5秒,效果還算不錯~
3.3 工具函數
本人自定義了一些工具函數,能夠在github上查看
1.run_time - 測試函數運行時間
2.load_movie_ratings - 加載電影評分數據
總結
ALS的原理:雞生蛋、蛋生雞
ALS的實現:基本上就是矩陣乘法