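Unlike user-based and item-based collaborative filtering, SlopeOne estimates a user's rating for an item with a simple linear model. In the figure below, we estimate UserB's preference for ItemJ.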
Figure (1)
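With a single reference user, the model in Figure (1) is f(x) = x + b. Take UserB's rating of the other item (called ItemI here), then add the offset b between ItemJ and ItemI that UserA shows. Below is a minimal Java sketch. The ratings in it are hypothetical, since the figure is not reproduced here, and BasicSlopeOne is an illustrative name.

```java
/** Minimal single-pair Slope One sketch (hypothetical ratings; not Mahout code). */
final class BasicSlopeOne {

    /**
     * Predicts the target user's rating of ItemJ from one reference user.
     * The model is f(x) = x + b with b = refRatingJ - refRatingI.
     */
    static double predict(double targetRatingI, double refRatingI, double refRatingJ) {
        double b = refRatingJ - refRatingI; // offset observed for the reference user
        return targetRatingI + b;
    }

    public static void main(String[] args) {
        // Hypothetical: UserA rated ItemI = 1.0 and ItemJ = 1.5; UserB rated ItemI = 2.0.
        System.out.println(predict(2.0, 1.0, 1.5)); // prints 2.5
    }
}
```

The whole "model" is one additive offset per item pair. That simplicity is what makes the scheme cheap to update, and also why the choice of reference user matters so much.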
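In practice, this approach raises the following problems:
1. Why should UserA be the user the computation relies on?
2. How should heavy sparsity be handled? Sparse data is the most common case in practice.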
Figure (2)
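Average difference between Item1 and Item2: ((5-3)+(3-4))/2 = 0.5
Average difference between Item1 and Item3: (5-2)/1 = 3
Lucy's estimated rating for Item1: ((2+0.5)*2 + (5+3)*1)/(2+1) = 4.333
Average difference between Item3 and Item1: (2-5)/1 = -3
Average difference between Item3 and Item2: ((2-3)+(5-2))/2 = 1
Make's estimated rating for Item3: ((3-3)*1 + (4+1)*2)/(1+2) = 3.333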
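The example shows what has to be computed for each pair of items. One quantity is the pair's average difference. The other is the number of users behind that difference, which serves as the weight of the pair's term in the estimate.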
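The same computation can be written as formulas for the weighted Slope One scheme. The symbols are introduced here for illustration. Let S_{i,j} be the set of users who rated both items i and j, and let c_{i,j} = |S_{i,j}|. For a user u and an item i that u has not rated, the sums run over the items j that u rated with c_{i,j} > 0:

```latex
\[
\mathrm{dev}_{i,j} = \frac{1}{c_{i,j}} \sum_{v \in S_{i,j}} \left( r_{v,i} - r_{v,j} \right),
\qquad
\hat{r}_{u,i} = \frac{\sum_{j} \left( r_{u,j} + \mathrm{dev}_{i,j} \right) c_{i,j}}{\sum_{j} c_{i,j}}
\]
```

For Lucy and Item1 this gives ((2 + 0.5)·2 + (5 + 3)·1)/(2 + 1), the same computation as in the example.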
The training pseudocode given by Mahout:
    for every item i
      for every other item j
        for every user u expressing preference for both i and j
          add the difference in u's preference for i and j to an average
Recommendation pseudocode:
    for every item i the user u expresses no preference for
      for every item j that user u expresses a preference for
        find the average preference difference between j and i
        add this diff to u's preference value for j
        add this to a running average
    return the top items, ranked by these averages
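To make the two loops concrete, here is a small in-memory Java sketch of both pieces of pseudocode, run on the Figure (2) example. Some details are assumptions. The ratings are reconstructed from the arithmetic above, since the figure is not reproduced. "John" is a made-up name for the user who rated all three items. SlopeOneSketch and its methods are hypothetical names, not Mahout API. The training loop visits users first; it accumulates the same pairwise differences as the item-first order of the pseudocode.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of the Slope One training and recommendation pseudocode (not Mahout code). */
final class SlopeOneSketch {

    // diffSum[i][j] accumulates r(u,i) - r(u,j) over users who rated both; diffCount[i][j] counts them.
    private final Map<String, Map<String, Double>> diffSum = new HashMap<>();
    private final Map<String, Map<String, Integer>> diffCount = new HashMap<>();

    /** Training: for every user and every ordered pair (i, j) the user rated, add r(u,i) - r(u,j). */
    void train(Map<String, Map<String, Double>> ratingsByUser) {
        for (Map<String, Double> prefs : ratingsByUser.values()) {
            for (Map.Entry<String, Double> ei : prefs.entrySet()) {
                for (Map.Entry<String, Double> ej : prefs.entrySet()) {
                    if (ei.getKey().equals(ej.getKey())) {
                        continue; // skip the item paired with itself
                    }
                    diffSum.computeIfAbsent(ei.getKey(), k -> new HashMap<>())
                           .merge(ej.getKey(), ei.getValue() - ej.getValue(), Double::sum);
                    diffCount.computeIfAbsent(ei.getKey(), k -> new HashMap<>())
                             .merge(ej.getKey(), 1, Integer::sum);
                }
            }
        }
    }

    /** Weighted estimate: sum over rated items j of (r(u,j) + dev(i,j)) * count(i,j), over the total count. */
    double estimate(Map<String, Double> prefs, String item) {
        Map<String, Double> sums = diffSum.getOrDefault(item, Collections.emptyMap());
        Map<String, Integer> counts = diffCount.getOrDefault(item, Collections.emptyMap());
        double weighted = 0.0;
        int total = 0;
        for (Map.Entry<String, Double> e : prefs.entrySet()) {
            Integer c = counts.get(e.getKey());
            if (c == null) {
                continue; // nobody rated both items, so this pair contributes nothing
            }
            double dev = sums.get(e.getKey()) / c;
            weighted += (e.getValue() + dev) * c;
            total += c;
        }
        return total == 0 ? Double.NaN : weighted / total;
    }

    /** Recommendation: estimate every item the user has not rated and return the best ones first. */
    List<String> recommend(Map<String, Double> prefs, int howMany) {
        Map<String, Double> scores = new HashMap<>();
        for (String item : diffSum.keySet()) {
            if (prefs.containsKey(item)) {
                continue; // only items without a preference are candidates
            }
            double score = estimate(prefs, item);
            if (!Double.isNaN(score)) {
                scores.put(item, score);
            }
        }
        List<String> ranked = new ArrayList<>(scores.keySet());
        ranked.sort((a, b) -> Double.compare(scores.get(b), scores.get(a))); // highest estimate first
        return ranked.subList(0, Math.min(howMany, ranked.size()));
    }

    /** Figure (2) ratings as reconstructed from the worked arithmetic; "John" is a made-up name. */
    static Map<String, Map<String, Double>> exampleRatings() {
        Map<String, Double> john = new HashMap<>();
        john.put("Item1", 5.0);
        john.put("Item2", 3.0);
        john.put("Item3", 2.0);
        Map<String, Double> make = new HashMap<>();
        make.put("Item1", 3.0);
        make.put("Item2", 4.0);
        Map<String, Double> lucy = new HashMap<>();
        lucy.put("Item2", 2.0);
        lucy.put("Item3", 5.0);
        Map<String, Map<String, Double>> ratings = new HashMap<>();
        ratings.put("John", john);
        ratings.put("Make", make);
        ratings.put("Lucy", lucy);
        return ratings;
    }
}
```

Trained on exampleRatings(), estimate(Lucy's ratings, "Item1") returns 13/3 ≈ 4.333, matching the hand calculation above. recommend(Lucy's ratings, 1) returns [Item1].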
(1) Building the differences
1. Standalone (in-memory) construction (MemoryDiffStorage)
    private void buildAverageDiffs() throws TasteException {
      log.info("Building average diffs...");
      try {
        buildAverageDiffsLock.writeLock().lock();
        averageDiffs.clear();
        long averageCount = 0L;
        LongPrimitiveIterator it = dataModel.getUserIDs();
        while (it.hasNext()) {
          averageCount = processOneUser(averageCount, it.nextLong());
        }
        pruneInconsequentialDiffs();
        updateAllRecommendableItems();
      } finally {
        buildAverageDiffsLock.writeLock().unlock();
      }
    }

    private void pruneInconsequentialDiffs() {
      // Go back and prune inconsequential diffs. "Inconsequential" means, here, only represented by one
      // data point, so possibly unreliable
      Iterator<Map.Entry<Long,FastByIDMap<RunningAverage>>> it1 = averageDiffs.entrySet().iterator();
      while (it1.hasNext()) {
        FastByIDMap<RunningAverage> map = it1.next().getValue();
        Iterator<Map.Entry<Long,RunningAverage>> it2 = map.entrySet().iterator();
        while (it2.hasNext()) {
          RunningAverage average = it2.next().getValue();
          if (average.getCount() <= 1) {
            it2.remove();
          }
        }
        if (map.isEmpty()) {
          it1.remove();
        } else {
          map.rehash();
        }
      }
      averageDiffs.rehash();
    }

    private void updateAllRecommendableItems() throws TasteException {
      FastIDSet ids = new FastIDSet(dataModel.getNumItems());
      for (Map.Entry<Long,FastByIDMap<RunningAverage>> entry : averageDiffs.entrySet()) {
        ids.add(entry.getKey());
        LongPrimitiveIterator it = entry.getValue().keySetIterator();
        while (it.hasNext()) {
          ids.add(it.next());
        }
      }
      allRecommendableItemIDs.clear();
      allRecommendableItemIDs.addAll(ids);
      allRecommendableItemIDs.rehash();
    }

    private long processOneUser(long averageCount, long userID) throws TasteException {
      log.debug("Processing prefs for user {}", userID);
      // Save off prefs for the life of this loop iteration
      PreferenceArray userPreferences = dataModel.getPreferencesFromUser(userID);
      int length = userPreferences.length();
      for (int i = 0; i < length; i++) { // Loop to length-1, not length-2, not for diffs but average item pref
        float prefAValue = userPreferences.getValue(i);
        long itemIDA = userPreferences.getItemID(i);
        FastByIDMap<RunningAverage> aMap = averageDiffs.get(itemIDA);
        if (aMap == null) {
          aMap = new FastByIDMap<RunningAverage>();
          averageDiffs.put(itemIDA, aMap);
        }
        for (int j = i + 1; j < length; j++) {
          // This is a performance-critical block
          long itemIDB = userPreferences.getItemID(j);
          RunningAverage average = aMap.get(itemIDB);
          if (average == null && averageCount < maxEntries) {
            average = buildRunningAverage();
            aMap.put(itemIDB, average);
            averageCount++;
          }
          if (average != null) {
            average.addDatum(userPreferences.getValue(j) - prefAValue);
          }
        }
        RunningAverage itemAverage = averageItemPref.get(itemIDA);
        if (itemAverage == null) {
          itemAverage = buildRunningAverage();
          averageItemPref.put(itemIDA, itemAverage);
        }
        itemAverage.addDatum(prefAValue);
      }
      return averageCount;
    }

    private RunningAverage buildRunningAverage() {
      return stdDevWeighted ? new FullRunningAverageAndStdDev() : new FullRunningAverage();
    }
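Two details of MemoryDiffStorage are easy to miss. First, each item pair is stored only once: the average of prefB - prefA lives under averageDiffs[itemA][itemB]. Second, pruneInconsequentialDiffs() discards pairs supported by a single user. The sketch below reproduces both ideas in plain Java, with a reverse lookup that simply negates the stored average. PairDiffStore and its methods are hypothetical names. Unlike processOneUser, the sketch orders each pair by item ID, as FileDiffStorage's processLine does, rather than by position in the user's preference array.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch: one running average per unordered item pair (not Mahout's MemoryDiffStorage). */
final class PairDiffStore {

    /** Minimal running average: count plus incrementally updated mean. */
    static final class Avg {
        int count;
        double mean;

        void add(double x) {
            count++;
            mean += (x - mean) / count;
        }
    }

    // diffs[lo][hi] holds the running average of pref(hi) - pref(lo), with lo < hi.
    private final Map<Long, Map<Long, Avg>> diffs = new HashMap<>();

    /** Records one user's ratings of two items under the (smaller ID, larger ID) key. */
    void addUserPair(long itemA, double prefA, long itemB, double prefB) {
        long lo = Math.min(itemA, itemB);
        long hi = Math.max(itemA, itemB);
        double diff = itemA < itemB ? prefB - prefA : prefA - prefB; // always pref(hi) - pref(lo)
        diffs.computeIfAbsent(lo, k -> new HashMap<>()).computeIfAbsent(hi, k -> new Avg()).add(diff);
    }

    /** Average of pref(to) - pref(from), or null if unknown; the reverse direction is the negated value. */
    Double averageDiff(long from, long to) {
        long lo = Math.min(from, to);
        long hi = Math.max(from, to);
        Avg a = diffs.getOrDefault(lo, Collections.emptyMap()).get(hi);
        if (a == null) {
            return null;
        }
        return from == lo ? a.mean : -a.mean;
    }

    /** Same rule as pruneInconsequentialDiffs(): drop pairs backed by a single data point. */
    void prune() {
        Iterator<Map<Long, Avg>> outer = diffs.values().iterator();
        while (outer.hasNext()) {
            Map<Long, Avg> inner = outer.next();
            inner.values().removeIf(a -> a.count <= 1);
            if (inner.isEmpty()) {
                outer.remove();
            }
        }
    }
}
```

Feed it the Figure (2) ratings, using IDs 1 to 3 for Item1 to Item3. After pruning, averageDiff(2, 1) returns 0.5, the Item1/Item2 difference computed above. The Item1/Item3 pair is pruned because only one user rated both items, so a store that prunes this way would not use the Item3 term in Lucy's estimate.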
2. MapReduce construction (FileDiffStorage)
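The MapReduce computation of the differences is described below. It runs offline, so it cannot be updated in real time, and it suits large datasets. The MapReduce job computes the differences for every item pair instead of stopping once the in-memory store reaches maxEntries. Its result can therefore be more complete than the standalone build, although FileDiffStorage itself still loads at most maxEntries pairs. After the job finishes, copy its output to the local machine and load it with new FileDiffStorage(new File("diff"), 500). FileDiffStorage does not support adding or removing preferences, and in practice this could not be done anyway, since the diffs are precomputed.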
    private void buildDiffs() {
      if (buildAverageDiffsLock.writeLock().tryLock()) {
        try {
          averageDiffs.clear();
          allRecommendableItemIDs.clear();
          FileLineIterator iterator = new FileLineIterator(dataFile, false);
          String firstLine = iterator.peek();
          while (firstLine.isEmpty() || firstLine.charAt(0) == COMMENT_CHAR) {
            iterator.next();
            firstLine = iterator.peek();
          }
          long averageCount = 0L;
          while (iterator.hasNext()) {
            averageCount = processLine(iterator.next(), averageCount);
          }
          pruneInconsequentialDiffs();
          updateAllRecommendableItems();
        } catch (IOException ioe) {
          log.warn("Exception while reloading", ioe);
        } finally {
          buildAverageDiffsLock.writeLock().unlock();
        }
      }
    }

    private long processLine(String line, long averageCount) {
      if (line.isEmpty() || line.charAt(0) == COMMENT_CHAR) {
        return averageCount;
      }
      String[] tokens = SEPARATOR.split(line);
      Preconditions.checkArgument(tokens.length >= 3 && tokens.length != 5, "Bad line: %s", line);
      long itemID1 = Long.parseLong(tokens[0]);
      long itemID2 = Long.parseLong(tokens[1]);
      double diff = Double.parseDouble(tokens[2]);
      int count = tokens.length >= 4 ? Integer.parseInt(tokens[3]) : 1;
      boolean hasMkSk = tokens.length >= 5;
      if (itemID1 > itemID2) {
        long temp = itemID1;
        itemID1 = itemID2;
        itemID2 = temp;
      }
      FastByIDMap<RunningAverage> level1Map = averageDiffs.get(itemID1);
      if (level1Map == null) {
        level1Map = new FastByIDMap<RunningAverage>();
        averageDiffs.put(itemID1, level1Map);
      }
      RunningAverage average = level1Map.get(itemID2);
      if (average != null) {
        throw new IllegalArgumentException("Duplicated line for item-item pair " + itemID1 + " / " + itemID2);
      }
      if (averageCount < maxEntries) {
        if (hasMkSk) {
          double mk = Double.parseDouble(tokens[4]);
          double sk = Double.parseDouble(tokens[5]);
          average = new FullRunningAverageAndStdDev(count, diff, mk, sk);
        } else {
          average = new FullRunningAverage(count, diff);
        }
        level1Map.put(itemID2, average);
        averageCount++;
      }
      allRecommendableItemIDs.add(itemID1);
      allRecommendableItemIDs.add(itemID2);
      return averageCount;
    }

    private void pruneInconsequentialDiffs() {
      // Go back and prune inconsequential diffs. "Inconsequential" means, here, only represented by one
      // data point, so possibly unreliable
      Iterator<Map.Entry<Long,FastByIDMap<RunningAverage>>> it1 = averageDiffs.entrySet().iterator();
      while (it1.hasNext()) {
        FastByIDMap<RunningAverage> map = it1.next().getValue();
        Iterator<Map.Entry<Long,RunningAverage>> it2 = map.entrySet().iterator();
        while (it2.hasNext()) {
          RunningAverage average = it2.next().getValue();
          if (average.getCount() <= 1) {
            it2.remove();
          }
        }
        if (map.isEmpty()) {
          it1.remove();
        } else {
          map.rehash();
        }
      }
      averageDiffs.rehash();
    }

    private void updateAllRecommendableItems() {
      for (Map.Entry<Long,FastByIDMap<RunningAverage>> entry : averageDiffs.entrySet()) {
        allRecommendableItemIDs.add(entry.getKey());
        LongPrimitiveIterator it = entry.getValue().keySetIterator();
        while (it.hasNext()) {
          allRecommendableItemIDs.add(it.next());
        }
      }
      allRecommendableItemIDs.rehash();
    }
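processLine accepts lines of 3, 4, or at least 6 tokens. These are item1, item2, the average diff, an optional count (default 1), and optional mk and sk values for the running standard deviation. Below is a standalone sketch of the same validation and ID ordering. It ignores mk and sk. DiffLineParser is a hypothetical name, and the tab-or-comma separator is an assumption, so check SEPARATOR in the Mahout version you use. One difference is deliberate. The quoted processLine swaps the IDs without touching diff, which is only consistent when lines already list the smaller ID first. The sketch therefore negates diff on a swap, assuming diff means pref(item2) - pref(item1) as in MemoryDiffStorage.

```java
import java.util.regex.Pattern;

/** Hypothetical parser for FileDiffStorage-style lines: item1, item2, diff[, count[, mk, sk]]. */
final class DiffLineParser {

    private static final Pattern SEPARATOR = Pattern.compile("[\t,]"); // assumed: tab or comma

    /** One parsed line; itemID1 is always the smaller ID and diff means pref(itemID2) - pref(itemID1). */
    static final class DiffLine {
        final long itemID1;
        final long itemID2;
        final double diff;
        final int count;

        DiffLine(long itemID1, long itemID2, double diff, int count) {
            this.itemID1 = itemID1;
            this.itemID2 = itemID2;
            this.diff = diff;
            this.count = count;
        }
    }

    static DiffLine parse(String line) {
        String[] tokens = SEPARATOR.split(line);
        // Same check as processLine(): at least 3 tokens, and never exactly 5 (mk without sk).
        if (tokens.length < 3 || tokens.length == 5) {
            throw new IllegalArgumentException("Bad line: " + line);
        }
        long id1 = Long.parseLong(tokens[0]);
        long id2 = Long.parseLong(tokens[1]);
        double diff = Double.parseDouble(tokens[2]);
        int count = tokens.length >= 4 ? Integer.parseInt(tokens[3]) : 1; // default count is 1
        // mk and sk (tokens[4], tokens[5]) are ignored in this sketch.
        if (id1 > id2) {
            // Keep the smaller ID first and flip the sign so diff keeps its meaning.
            return new DiffLine(id2, id1, -diff, count);
        }
        return new DiffLine(id1, id2, diff, count);
    }
}
```

Keep in mind that FileDiffStorage prunes pairs with a count of 1 after loading. Lines without an explicit count therefore do not survive into the store.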
(2) Estimation
    private float doEstimatePreference(long userID, long itemID) throws TasteException {
      double count = 0.0;
      double totalPreference = 0.0;
      PreferenceArray prefs = getDataModel().getPreferencesFromUser(userID);
      RunningAverage[] averages = diffStorage.getDiffs(userID, itemID, prefs);
      int size = prefs.length();
      for (int i = 0; i < size; i++) {
        RunningAverage averageDiff = averages[i];
        if (averageDiff != null) {
          double averageDiffValue = averageDiff.getAverage();
          if (weighted) {
            double weight = averageDiff.getCount();
            if (stdDevWeighted) {
              double stdev = ((RunningAverageAndStdDev) averageDiff).getStandardDeviation();
              if (!Double.isNaN(stdev)) {
                weight /= 1.0 + stdev;
              }
              // If stdev is NaN, then it is because count is 1. Because we're weighting by count,
              // the weight is already relatively low. We effectively assume stdev is 0.0 here and
              // that is reasonable enough. Otherwise, dividing by NaN would yield a weight of NaN
              // and disqualify this pref entirely
            }
            totalPreference += weight * (prefs.getValue(i) + averageDiffValue);
            count += weight;
          } else {
            totalPreference += prefs.getValue(i) + averageDiffValue;
            count += 1.0;
          }
        }
      }
      if (count <= 0.0) {
        RunningAverage itemAverage = diffStorage.getAverageItemPref(itemID);
        return itemAverage == null ? Float.NaN : (float) itemAverage.getAverage();
      } else {
        return (float) (totalPreference / count);
      }
    }
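doEstimatePreference combines three ingredients. Each related item contributes pref + averageDiff. When weighted is on, that term is weighted by the diff's count. With stdDevWeighted also on, the weight is further divided by 1 + the standard deviation. Below is a stripped-down sketch of that arithmetic on plain arrays. WeightedSlopeOneEstimate and its parameters are hypothetical, and the standard deviation in the usage below is a made-up value.

```java
/** Hypothetical sketch of the weighting arithmetic in doEstimatePreference (plain arrays, not Mahout types). */
final class WeightedSlopeOneEstimate {

    /**
     * prefs[k] is the user's rating of the k-th related item; avgDiff[k], count[k] and stdDev[k]
     * describe that item's diff to the target item. Only items with a known diff are passed in.
     */
    static double estimate(double[] prefs, double[] avgDiff, int[] count, double[] stdDev,
                           boolean weighted, boolean stdDevWeighted, double fallbackItemAverage) {
        double total = 0.0;
        double weightSum = 0.0;
        for (int k = 0; k < prefs.length; k++) {
            double weight = weighted ? count[k] : 1.0; // more co-rating users, more weight
            if (weighted && stdDevWeighted && !Double.isNaN(stdDev[k])) {
                // Noisier diffs count for less; a NaN stdDev (count of 1) leaves the weight unchanged.
                weight /= 1.0 + stdDev[k];
            }
            total += weight * (prefs[k] + avgDiff[k]);
            weightSum += weight;
        }
        // No usable diffs: fall back to the item's average rating (pass NaN if there is none).
        return weightSum <= 0.0 ? fallbackItemAverage : total / weightSum;
    }
}
```

Take the Lucy/Item1 numbers from the example with stdDevWeighted off: the result is 13/3 ≈ 4.333. Now give the Item1/Item2 diff a hypothetical standard deviation of 1.0. That halves its weight and moves the estimate to 5.25.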
(3) Recommendation
In an online recommender system, only one SlopeOneRecommender instance should be kept.
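| Method signature | Description | Notes |
| --- | --- | --- |
| public void setPreference(long userID, long itemID, float value) | Adds a preference; frequently needed in online systems. | Adds the preference dynamically, then updates the diffs between itemID and the other items. |
| public void removePreference(long userID, long itemID) | Removes a preference; rarely used. | After removal, the diffs between itemID and the other items are updated. |
| public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer) | Produces recommendations. IDRescorer applies business rules that adjust item scores. | 1. Take the items userID has not rated as candidates. 2. Estimate each candidate's score and return the top k. |
| public float estimatePreference(long userID, long itemID) | Estimates userID's rating of itemID. | If userID has an actual rating for itemID, it is returned; otherwise it is estimated. |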
1. Recommendation interface
    public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer)
        throws TasteException {
      Preconditions.checkArgument(howMany >= 1, "howMany must be at least 1");
      log.debug("Recommending items for user ID '{}'", userID);
      FastIDSet possibleItemIDs = diffStorage.getRecommendableItemIDs(userID);
      TopItems.Estimator<Long> estimator = new Estimator(userID);
      List<RecommendedItem> topItems =
          TopItems.getTopItems(howMany, possibleItemIDs.iterator(), rescorer, estimator);
      log.debug("Recommendations are: {}", topItems);
      return topItems;
    }
2. Getting the recommendation candidates
    public FastIDSet getRecommendableItemIDs(long userID) throws TasteException {
      FastIDSet result;
      try {
        buildAverageDiffsLock.readLock().lock();
        result = allRecommendableItemIDs.clone();
      } finally {
        buildAverageDiffsLock.readLock().unlock();
      }
      Iterator<Long> it = result.iterator();
      while (it.hasNext()) {
        if (dataModel.getPreferenceValue(userID, it.next()) != null) {
          it.remove();
        }
      }
      return result;
    }
3. Estimating each candidate's score and returning the top-K list
    public static List<RecommendedItem> getTopItems(int howMany,
                                                    LongPrimitiveIterator possibleItemIDs,
                                                    IDRescorer rescorer,
                                                    Estimator<Long> estimator) throws TasteException {
      Preconditions.checkArgument(possibleItemIDs != null, "argument is null");
      Preconditions.checkArgument(estimator != null, "argument is null");
      Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(howMany + 1,
          Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()));
      boolean full = false;
      double lowestTopValue = Double.NEGATIVE_INFINITY;
      while (possibleItemIDs.hasNext()) {
        long itemID = possibleItemIDs.next();
        if (rescorer == null || !rescorer.isFiltered(itemID)) {
          double preference;
          try {
            preference = estimator.estimate(itemID);
          } catch (NoSuchItemException nsie) {
            continue;
          }
          double rescoredPref = rescorer == null ? preference : rescorer.rescore(itemID, preference);
          if (!Double.isNaN(rescoredPref) && (!full || rescoredPref > lowestTopValue)) {
            topItems.add(new GenericRecommendedItem(itemID, (float) rescoredPref));
            if (full) {
              topItems.poll();
            } else if (topItems.size() > howMany) {
              full = true;
              topItems.poll();
            }
            lowestTopValue = topItems.peek().getValue();
          }
        }
      }
      int size = topItems.size();
      if (size == 0) {
        return Collections.emptyList();
      }
      List<RecommendedItem> result = Lists.newArrayListWithCapacity(size);
      result.addAll(topItems);
      Collections.sort(result, ByValueRecommendedItemComparator.getInstance());
      return result;
    }
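getTopItems orders its priority queue so that the head is the lowest of the current top items. A new candidate enters only if it beats that head, after which the head is evicted. The same bounded min-heap pattern is shown below in plain Java. TopK is a hypothetical helper, and it omits the rescorer and the NoSuchItemException handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Bounded min-heap top-k selection, the pattern used by getTopItems (hypothetical helper, not Mahout API). */
final class TopK {

    /** Returns up to howMany (id, score) entries with the highest scores, best first; NaN scores are skipped. */
    static List<Map.Entry<Long, Double>> topK(Map<Long, Double> scores, int howMany) {
        if (howMany < 1) {
            throw new IllegalArgumentException("howMany must be at least 1");
        }
        Comparator<Map.Entry<Long, Double>> byScore = Map.Entry.comparingByValue();
        // Min-heap by score: the head is always the weakest of the current top entries.
        PriorityQueue<Map.Entry<Long, Double>> heap = new PriorityQueue<>(howMany + 1, byScore);
        for (Map.Entry<Long, Double> e : scores.entrySet()) {
            double score = e.getValue();
            if (Double.isNaN(score)) {
                continue;
            }
            if (heap.size() < howMany) {
                heap.add(e);
            } else if (score > heap.peek().getValue()) {
                heap.add(e);
                heap.poll(); // evict the previous minimum, keeping the heap at howMany entries
            }
        }
        List<Map.Entry<Long, Double>> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder(byScore)); // highest score first
        return result;
    }
}
```

The heap never holds more than howMany + 1 entries. Selecting the top k of n candidates therefore costs O(n log k) rather than sorting all n estimates.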
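1. For each user, compute the differences between every pair of items the user rated (first step of the MapReduce computation)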
Map: the input is a text file with lines of the form userId\t itemId\t val; the output has key userId and value itemId\t val.
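Reduce (called once per userId, with that user's itemId\t val values):
    items = the user's (itemId, val) pairs
    for (int i = 0; i < items.size(); i++) {
      itemA = items[i];
      for (int j = i + 1; j < items.size(); j++) {
        itemB = items[j];
        itemABdiff = itemB.val - itemA.val;
        out.write(itemA.id + "\t" + itemB.id, itemABdiff);
      }
    }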
2. Compute the global average for each item pair
Map: passes the records through unchanged, so that all records for the same item pair reach the same reducer.
Reduce: the input has key itemA\t itemB and values itemABdiff. The reducer computes the average of this group (FullRunningAverageAndStdDev). The output has key EntityEntityWritable and value FullRunningAverageAndStdDevWritable.
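The two steps can be simulated in plain Java to see what each stage emits. This is only a sketch. It replaces Hadoop's Mapper/Reducer classes and the shuffle with ordinary maps, and it returns {average, count} instead of FullRunningAverageAndStdDevWritable. SlopeOneTwoStepSimulation and its methods are hypothetical names. Within each user, items are sorted by ID so that every pair is emitted once, with the smaller ID first.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of the two MapReduce steps above; no Hadoop, names are hypothetical. */
final class SlopeOneTwoStepSimulation {

    /** Step 1 map + shuffle: parse "userId\titemId\tval" lines and group each user's prefs by item ID. */
    static Map<Long, TreeMap<Long, Double>> groupByUser(List<String> lines) {
        Map<Long, TreeMap<Long, Double>> byUser = new TreeMap<>();
        for (String line : lines) {
            String[] t = line.split("\t");
            byUser.computeIfAbsent(Long.parseLong(t[0].trim()), k -> new TreeMap<>())
                  .put(Long.parseLong(t[1].trim()), Double.parseDouble(t[2].trim()));
        }
        return byUser;
    }

    /** Step 1 reduce: for one user, emit ("itemA\titemB", val(itemB) - val(itemA)) for each pair, itemA < itemB. */
    static List<Map.Entry<String, Double>> userDiffs(TreeMap<Long, Double> prefs) {
        List<Map.Entry<Long, Double>> items = new ArrayList<>(prefs.entrySet()); // sorted by item ID
        List<Map.Entry<String, Double>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            for (int j = i + 1; j < items.size(); j++) {
                String key = items.get(i).getKey() + "\t" + items.get(j).getKey();
                double diff = items.get(j).getValue() - items.get(i).getValue();
                out.add(new AbstractMap.SimpleEntry<>(key, diff));
            }
        }
        return out;
    }

    /** Step 2: group the emitted diffs by item pair and average each group; value = {average, count}. */
    static Map<String, double[]> averageByPair(List<Map.Entry<String, Double>> diffs) {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (Map.Entry<String, Double> e : diffs) {
            double[] acc = sumAndCount.computeIfAbsent(e.getKey(), k -> new double[2]);
            acc[0] += e.getValue();
            acc[1] += 1;
        }
        Map<String, double[]> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            double[] acc = e.getValue();
            result.put(e.getKey(), new double[] {acc[0] / acc[1], acc[1]});
        }
        return result;
    }
}
```

Encode the Figure (2) ratings as users 1 to 3 and items 1 to 3. The pair 1\t2 then comes out with average -0.5 over 2 users, that is, pref(Item2) - pref(Item1). Its negation is the 0.5 computed in the example.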
(1) Usage in standalone mode
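    import java.io.File;
    import org.apache.mahout.cf.taste.common.Weighting;
    import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
    import org.apache.mahout.cf.taste.impl.recommender.slopeone.MemoryDiffStorage;
    import org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender;
    import org.apache.mahout.cf.taste.recommender.IDRescorer;

    public class SlopeOneStandaloneDemo {
      public static void main(String[] args) throws Exception {
        // One DataModel shared by the diff storage and the recommender
        FileDataModel model = new FileDataModel(new File("pereference"));
        MemoryDiffStorage mds = new MemoryDiffStorage(model, Weighting.WEIGHTED, 1000);
        SlopeOneRecommender sr = new SlopeOneRecommender(model, Weighting.WEIGHTED, Weighting.WEIGHTED, mds);
        System.out.println(sr.recommend(1, 10, new IDRescorer() {
          @Override
          public double rescore(long id, double originalScore) {
            int clickCount = 10; // click count of item id (placeholder value)
            return originalScore * clickCount;
          }
          @Override
          public boolean isFiltered(long id) {
            // return false if id is in the same category as the items to recommend, otherwise true
            return false;
          }
        }));
      }
    }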
(2) Usage in MapReduce mode
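    import java.io.File;
    import org.apache.mahout.cf.taste.common.Weighting;
    import org.apache.mahout.cf.taste.hadoop.slopeone.SlopeOneAverageDiffsJob;
    import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
    import org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender;
    import org.apache.mahout.cf.taste.impl.recommender.slopeone.file.FileDiffStorage;
    import org.apache.mahout.cf.taste.recommender.IDRescorer;

    public class SlopeOneMapReduceDemo {
      public static void main(String[] args) throws Exception {
        // Offline step: compute the item-pair diffs from input "p" into output "diff"
        SlopeOneAverageDiffsJob.main(new String[] {"-i", "p", "-o", "diff"});
        // Assumes the job output has been copied to a local file named "diff"
        FileDiffStorage ds = new FileDiffStorage(new File("diff"), 1000);
        SlopeOneRecommender sr = new SlopeOneRecommender(new FileDataModel(new File("pereference")),
            Weighting.WEIGHTED, Weighting.WEIGHTED, ds);
        System.out.println(sr.recommend(1, 10, new IDRescorer() {
          @Override
          public double rescore(long id, double originalScore) {
            int clickCount = 10; // click count of item id (placeholder value)
            return originalScore * clickCount;
          }
          @Override
          public boolean isFiltered(long id) {
            // return false if id is in the same category as the items to recommend, otherwise true
            return false;
          }
        }));
      }
    }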