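In the previous post we covered user-based collaborative filtering. The basic procedure is to find users whose interests resemble the target user's and to recommend items according to how much those users like them, with each similar user's rating weighted by that user's similarity to the target user. User-based collaborative filtering has drawbacks, however. Computing the user similarity matrix becomes harder and harder, because its time and space complexity grow roughly with the square of the number of users, and the resulting recommendations are difficult to explain. In this post we move on to item-based collaborative filtering.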
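Item-based collaborative filtering (ItemCF) is the basis of the recommendation algorithms that most websites use. ItemCF does not compute item similarity from the items' content attributes. Instead, it derives the similarity between items from users' behavior records. ItemCF has two main steps: first compute the similarity between items, then generate a recommendation list for each user from those similarities and the user's past behavior.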
We can define the similarity between items with the following formula:

\[ w_{ij} = \frac{|N(i) \cap N(j)|}{|N(i)|} \]
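\(|N(i)|\) is the number of users who like item \(i\), and the numerator is the number of users who like both item \(i\) and item \(j\). The formula as a whole gives the fraction of the users who like item \(i\) that also like item \(j\). The normalized result can be used as the item similarity. However, if item \(j\) is very popular and nearly everyone likes it, the similarity approaches 1 regardless of \(i\). That is bad for a recommender system that is meant to surface less popular items, so we revise the similarity formula.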
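The revised formula is:

\[ w_{ij} = \frac{|N(i) \cap N(j)|}{\sqrt{|N(i)|\,|N(j)|}} \]

This formula penalizes the weight of the popular item \(j\), which eases the problem to some extent. As with UserCF, when computing item similarity we can first build a user-to-item inverted table. The pseudocode is as follows: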
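```python
import math
from collections import defaultdict


def ItemSimilarity(train):
    # train maps each user to the collection of items that user likes
    # C[i][j]: number of users who like both item i and item j
    # N[i]: number of users who like item i
    C = defaultdict(lambda: defaultdict(int))
    N = defaultdict(int)
    for u, items in train.items():
        for i in items:
            N[i] += 1
            for j in items:
                if i == j:
                    continue
                C[i][j] += 1
    # final similarity matrix W
    W = defaultdict(dict)
    for i, related_items in C.items():
        for j, cij in related_items.items():
            W[i][j] = cij / math.sqrt(N[i] * N[j])
    return W
```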
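To see why the penalty matters, here is a minimal sketch that compares the two similarity formulas on a tiny made-up dataset. The data, the item names (`"hit"`, `"a"`, `"b"`), and the helper functions are all hypothetical and exist only for illustration.

```python
import math

# Hypothetical toy data: user -> set of liked items.
# "hit" is liked by everyone, "a" and "b" by two users each.
train = {
    "u1": {"hit", "a"},
    "u2": {"hit", "a"},
    "u3": {"hit", "b"},
    "u4": {"hit", "b"},
    "u5": {"hit"},
    "u6": {"hit"},
}


def users_by_item(train):
    """Invert user -> items into item -> set of users who like it."""
    inv = {}
    for user, items in train.items():
        for item in items:
            inv.setdefault(item, set()).add(user)
    return inv


def naive_similarity(inv, i, j):
    """|N(i) & N(j)| / |N(i)|"""
    return len(inv[i] & inv[j]) / len(inv[i])


def penalized_similarity(inv, i, j):
    """|N(i) & N(j)| / sqrt(|N(i)| * |N(j)|)"""
    return len(inv[i] & inv[j]) / math.sqrt(len(inv[i]) * len(inv[j]))


inv = users_by_item(train)
naive = naive_similarity(inv, "a", "hit")          # 2 / 2
penalized = penalized_similarity(inv, "a", "hit")  # 2 / sqrt(2 * 6)
print(naive, penalized)
```

With the first formula, the item everyone likes gets similarity 1 to `"a"`. The square-root denominator pulls that value down because `"hit"` has many more users than `"a"`.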
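Although ItemCF does not use content attributes to compute similarity, the items it ends up grouping together still tend to be similar in content in some way, such as films with the same lead actor or in the same genre. Once we have the item similarities, we compute a user's interest in an item with the following formula:

\[ p_{uj} = \sum_{i \in N(u) \cap S(j,K)} w_{ji}\, r_{ui} \]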
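Here \(N(u)\) is the set of items that user \(u\) likes, \(S(j,K)\) is the set of the \(K\) items most similar to item \(j\), \(w_{ji}\) is the similarity between items \(j\) and \(i\), and \(r_{ui}\) is user \(u\)'s interest in item \(i\).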
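```python
from operator import itemgetter


def Recommendation(train, user_id, W, K):
    rank = dict()
    # ru maps each item the user likes to the user's interest in it (r_ui)
    ru = train[user_id]
    for i, pi in ru.items():
        # walk through the K items most similar to item i
        for j, wj in sorted(W[i].items(), key=itemgetter(1), reverse=True)[0:K]:
            # skip items the user has already interacted with
            if j in ru:
                continue
            rank[j] = rank.get(j, 0) + pi * wj
    return rank
```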
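The scoring step can be traced by hand on a small example. The sketch below uses a made-up similarity table and a made-up user. The function `score_items` and all values are assumptions for illustration, and it follows the same loop shape as the pseudocode: for each liked item, take its `K` nearest neighbours and accumulate similarity times interest.

```python
from operator import itemgetter

# Hypothetical similarity table W[i][j] for the two items the user likes
W = {
    "a": {"b": 0.6, "c": 0.4, "d": 0.1},
    "b": {"a": 0.6, "c": 0.5, "d": 0.3},
}
# Hypothetical user: item -> interest r_ui (1.0 for implicit feedback)
liked = {"a": 1.0, "b": 1.0}


def score_items(liked, W, K):
    """Accumulate r_ui * w_ij over the K nearest neighbours of each liked item."""
    rank = {}
    for i, r_ui in liked.items():
        top_k = sorted(W[i].items(), key=itemgetter(1), reverse=True)[:K]
        for j, w_ij in top_k:
            if j in liked:  # never recommend what the user already has
                continue
            rank[j] = rank.get(j, 0.0) + r_ui * w_ij
    return rank


rank = score_items(liked, W, K=2)
print(rank)
```

With `K=2`, item `"c"` collects 0.4 from `"a"` and 0.5 from `"b"`, while `"d"` never enters either top-2 list and so receives no score. Raising `K` lets weaker neighbours such as `"d"` contribute.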
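Another factor is the effect of user activity on item similarity. An active user has a large number of items of interest, and every pair of those items adds to the co-occurrence counts, which produces a large, dense item similarity matrix. Active users should therefore contribute less to item similarity than inactive users. The formula is revised to:

\[ w_{ij} = \frac{\sum_{u \in N(i) \cap N(j)} \frac{1}{\log(1 + |N(u)|)}}{\sqrt{|N(i)|\,|N(j)|}} \]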
It looks a lot like the corrected formula from user-based collaborative filtering.
```python
import math
from collections import defaultdict


def ItemSimilarity(train):
    # calculate co-rated users between items,
    # down-weighting the contribution of active users
    C = defaultdict(lambda: defaultdict(float))
    N = defaultdict(int)
    for u, items in train.items():
        for i in items:
            N[i] += 1
            for j in items:
                if i == j:
                    continue
                C[i][j] += 1 / math.log(1 + len(items) * 1.0)
    # calculate final similarity matrix W
    W = defaultdict(dict)
    for i, related_items in C.items():
        for j, cij in related_items.items():
            W[i][j] = cij / math.sqrt(N[i] * N[j])
    return W
```
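A quick way to get a feel for the correction is to look at the per-user weight on its own. This is a small sketch. The helper name `iuf_weight` and the two activity levels are made up for illustration.

```python
import math


def iuf_weight(num_items_liked):
    """Contribution of one user to each co-occurring item pair: 1 / log(1 + |N(u)|)."""
    return 1 / math.log(1 + num_items_liked)


casual = iuf_weight(5)    # user with 5 liked items, about 0.56
heavy = iuf_weight(500)   # user with 500 liked items, about 0.16
print(casual, heavy)
```

The heavy user still contributes to far more item pairs in total, but each pair now receives a much smaller increment than it does from the casual user.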
Thanks again to @Magic-Bubble for sharing clear, readable code on GitHub, which saved me from reinventing the wheel. Here is the experiment code on the MovieLens dataset:
```python
# Imports
import random
import math
import time
from tqdm import tqdm


# Decorator that reports how long a function takes to run
def timmer(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        res = func(*args, **kwargs)
        stop_time = time.time()
        print('Func %s, run time: %s' % (func.__name__, stop_time - start_time))
        return res
    return wrapper


class Dataset():

    def __init__(self, fp):
        # fp: data file path
        self.data = self.loadData(fp)

    @timmer
    def loadData(self, fp):
        data = []
        with open(fp) as f:
            for l in f:
                # keep only the first two fields: user and item
                data.append(tuple(map(int, l.strip().split('::')[:2])))
        return data

    @timmer
    def splitData(self, M, k, seed=1):
        '''
        :params: data, all loaded (user, item) records
        :params: M, number of splits; the final result averages over the M folds
        :params: k, index of the current split, k in [0, M)
        :params: seed, random seed; must be the same for every k
        :return: train, test
        '''
        train, test = [], []
        random.seed(seed)
        for user, item in self.data:
            # This differs from the book: M - 1 seems more reasonable here,
            # because randint includes both endpoints
            if random.randint(0, M - 1) == k:
                test.append((user, item))
            else:
                train.append((user, item))

        # Convert to dictionary form, user -> list of items
        def convert_dict(data):
            data_dict = {}
            for user, item in data:
                if user not in data_dict:
                    data_dict[user] = set()
                data_dict[user].add(item)
            data_dict = {k: list(data_dict[k]) for k in data_dict}
            return data_dict

        return convert_dict(train), convert_dict(test)


class Metric():

    def __init__(self, train, test, GetRecommendation):
        '''
        :params: train, training data
        :params: test, test data
        :params: GetRecommendation, function that returns the recommended items for a user
        '''
        self.train = train
        self.test = test
        self.GetRecommendation = GetRecommendation
        self.recs = self.getRec()

    # Generate recommendations for every user in test
    def getRec(self):
        recs = {}
        for user in self.test:
            rank = self.GetRecommendation(user)
            recs[user] = rank
        return recs

    # Precision
    def precision(self):
        total, hit = 0, 0
        for user in self.test:
            test_items = set(self.test[user])
            rank = self.recs[user]
            for item, score in rank:
                if item in test_items:
                    hit += 1
            total += len(rank)
        return round(hit / total * 100, 2)

    # Recall
    def recall(self):
        total, hit = 0, 0
        for user in self.test:
            test_items = set(self.test[user])
            rank = self.recs[user]
            for item, score in rank:
                if item in test_items:
                    hit += 1
            total += len(test_items)
        return round(hit / total * 100, 2)

    # Coverage
    def coverage(self):
        all_item, recom_item = set(), set()
        for user in self.test:
            for item in self.train[user]:
                all_item.add(item)
            rank = self.recs[user]
            for item, score in rank:
                recom_item.add(item)
        return round(len(recom_item) / len(all_item) * 100, 2)

    # Novelty, measured as the average popularity of recommended items
    def popularity(self):
        # Compute the popularity of each item
        item_pop = {}
        for user in self.train:
            for item in self.train[user]:
                if item not in item_pop:
                    item_pop[item] = 0
                item_pop[item] += 1

        num, pop = 0, 0
        for user in self.test:
            rank = self.recs[user]
            for item, score in rank:
                # Take the log so that, given the long tail,
                # popular items do not dominate the result
                pop += math.log(1 + item_pop[item])
                num += 1
        return round(pop / num, 6)

    def eval(self):
        metric = {
            'Precision': self.precision(),
            'Recall': self.recall(),
            'Coverage': self.coverage(),
            'Popularity': self.popularity()
        }
        print('Metric:', metric)
        return metric


# 1. Recommendation based on item cosine similarity
def ItemCF(train, K, N):
    '''
    :params: train, training set
    :params: K, hyperparameter, number of most similar items to use
    :params: N, hyperparameter, number of items to recommend
    :return: GetRecommendation, the recommendation function
    '''
    # Compute the item similarity matrix
    sim = {}
    num = {}
    for user in train:
        items = train[user]
        for i in range(len(items)):
            u = items[i]
            if u not in num:
                num[u] = 0
            num[u] += 1
            if u not in sim:
                sim[u] = {}
            for j in range(len(items)):
                if j == i:
                    continue
                v = items[j]
                if v not in sim[u]:
                    sim[u][v] = 0
                sim[u][v] += 1
    for u in sim:
        for v in sim[u]:
            sim[u][v] /= math.sqrt(num[u] * num[v])

    # Sort each item's neighbours by similarity
    sorted_item_sim = {k: sorted(v.items(), key=lambda x: x[1], reverse=True)
                       for k, v in sim.items()}

    # The recommendation function
    def GetRecommendation(user):
        items = {}
        seen_items = set(train[user])
        for item in train[user]:
            for u, _ in sorted_item_sim[item][:K]:
                if u not in seen_items:
                    if u not in items:
                        items[u] = 0
                    items[u] += sim[item][u]
        recs = sorted(items.items(), key=lambda x: x[1], reverse=True)[:N]
        return recs

    return GetRecommendation


# 2. Recommendation based on the improved item cosine similarity
def ItemIUF(train, K, N):
    '''
    :params: train, training set
    :params: K, hyperparameter, number of most similar items to use
    :params: N, hyperparameter, number of items to recommend
    :return: GetRecommendation, the recommendation function
    '''
    # Compute the item similarity matrix
    sim = {}
    num = {}
    for user in train:
        items = train[user]
        for i in range(len(items)):
            u = items[i]
            if u not in num:
                num[u] = 0
            num[u] += 1
            if u not in sim:
                sim[u] = {}
            for j in range(len(items)):
                if j == i:
                    continue
                v = items[j]
                if v not in sim[u]:
                    sim[u][v] = 0
                # This is the main change compared with ItemCF
                sim[u][v] += 1 / math.log(1 + len(items))
    for u in sim:
        for v in sim[u]:
            sim[u][v] /= math.sqrt(num[u] * num[v])

    # Sort each item's neighbours by similarity
    sorted_item_sim = {k: sorted(v.items(), key=lambda x: x[1], reverse=True)
                       for k, v in sim.items()}

    # The recommendation function
    def GetRecommendation(user):
        items = {}
        seen_items = set(train[user])
        for item in train[user]:
            for u, _ in sorted_item_sim[item][:K]:
                # Drop items the user has already seen
                if u not in seen_items:
                    if u not in items:
                        items[u] = 0
                    items[u] += sim[item][u]
        recs = sorted(items.items(), key=lambda x: x[1], reverse=True)[:N]
        return recs

    return GetRecommendation


# 3. Recommendation based on normalized item cosine similarity
def ItemCF_Norm(train, K, N):
    '''
    :params: train, training set
    :params: K, hyperparameter, number of most similar items to use
    :params: N, hyperparameter, number of items to recommend
    :return: GetRecommendation, the recommendation function
    '''
    # Compute the item similarity matrix
    sim = {}
    num = {}
    for user in train:
        items = train[user]
        for i in range(len(items)):
            u = items[i]
            if u not in num:
                num[u] = 0
            num[u] += 1
            if u not in sim:
                sim[u] = {}
            for j in range(len(items)):
                if j == i:
                    continue
                v = items[j]
                if v not in sim[u]:
                    sim[u][v] = 0
                sim[u][v] += 1
    for u in sim:
        for v in sim[u]:
            sim[u][v] /= math.sqrt(num[u] * num[v])

    # Normalize the similarity matrix row by row
    for u in sim:
        s = 0
        for v in sim[u]:
            s += sim[u][v]
        if s > 0:
            for v in sim[u]:
                sim[u][v] /= s

    # Sort each item's neighbours by similarity
    sorted_item_sim = {k: sorted(v.items(), key=lambda x: x[1], reverse=True)
                       for k, v in sim.items()}

    # The recommendation function
    def GetRecommendation(user):
        items = {}
        seen_items = set(train[user])
        for item in train[user]:
            for u, _ in sorted_item_sim[item][:K]:
                if u not in seen_items:
                    if u not in items:
                        items[u] = 0
                    items[u] += sim[item][u]
        recs = sorted(items.items(), key=lambda x: x[1], reverse=True)[:N]
        return recs

    return GetRecommendation


class Experiment():

    def __init__(self, M, K, N, fp='../dataset/ml-1m/ratings.dat', rt='ItemCF'):
        '''
        :params: M, number of experiment runs
        :params: K, number of most similar items to use
        :params: N, number of items to recommend
        :params: fp, data file path
        :params: rt, recommendation algorithm type
        '''
        self.M = M
        self.K = K
        self.N = N
        self.fp = fp
        self.rt = rt
        self.alg = {
            'ItemCF': ItemCF,
            'ItemIUF': ItemIUF,
            'ItemCF-Norm': ItemCF_Norm
        }

    # A single experiment run
    @timmer
    def worker(self, train, test):
        '''
        :params: train, training set
        :params: test, test set
        :return: the value of each metric
        '''
        getRecommendation = self.alg[self.rt](train, self.K, self.N)
        metric = Metric(train, test, getRecommendation)
        return metric.eval()

    # Average over multiple runs
    @timmer
    def run(self):
        metrics = {'Precision': 0, 'Recall': 0,
                   'Coverage': 0, 'Popularity': 0}
        dataset = Dataset(self.fp)
        for ii in range(self.M):
            train, test = dataset.splitData(self.M, ii)
            print('Experiment {}:'.format(ii))
            metric = self.worker(train, test)
            metrics = {k: metrics[k] + metric[k] for k in metrics}
        metrics = {k: metrics[k] / self.M for k in metrics}
        print('Average Result (M={}, K={}, N={}): {}'.format(
            self.M, self.K, self.N, metrics))


# 1. ItemCF experiment
M, N = 8, 10
for K in [5, 10, 20, 40, 80, 160]:
    cf_exp = Experiment(M, K, N, rt='ItemCF')
    cf_exp.run()

# 2. ItemIUF experiment
M, N = 8, 10
K = 10  # same as in the book
iuf_exp = Experiment(M, K, N, rt='ItemIUF')
iuf_exp.run()

# 3. ItemCF-Norm experiment
M, N = 8, 10
K = 10  # same as in the book
norm_exp = Experiment(M, K, N, rt='ItemCF-Norm')
norm_exp.run()
```
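To make the precision and recall definitions used by the `Metric` class concrete, here is a minimal sketch on made-up data. The function `precision_recall`, the user ids, and the item ids are hypothetical, and unlike `Metric` it returns plain fractions rather than rounded percentages.

```python
# Hypothetical recommendation lists and held-out test items per user
recs = {"u1": ["a", "b"], "u2": ["e", "f"]}
test = {"u1": ["a", "x"], "u2": ["e", "f", "y", "z"]}


def precision_recall(recs, test):
    """Pool hits over all test users, then divide by list sizes."""
    hit = n_rec = n_test = 0
    for user, held_out in test.items():
        held_out = set(held_out)
        rank = recs[user]
        hit += sum(1 for item in rank if item in held_out)
        n_rec += len(rank)       # denominator for precision
        n_test += len(held_out)  # denominator for recall
    return hit / n_rec, hit / n_test


precision, recall = precision_recall(recs, test)
print(precision, recall)
```

Three of the four recommended items appear in the held-out sets, and those three hits cover half of the six held-out items. Both metrics are pooled across users rather than averaged per user, which matches how `Metric` accumulates its counters.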