hadoop1-構建電影推薦系統

問題導讀:
1. 推薦系統概述;
2. 推薦系統指標設計;
3. Hadoop並行算法;
4. 推薦系統架構;
5. MapReduce程序實現。
 
前言
Netflix電影推薦的百萬美金比賽,把「推薦」變成了時下最熱門的數據挖掘算法之一。也正是因爲Netflix的比賽,讓企業界和學科界有了更深層次的技術碰撞。引起了各類網站「推薦」熱,個性時代已經到來。

1、 推薦系統概述
電子商務網站是個性化推薦系統重要地應用的領域之一,亞馬遜就是個性化推薦系統的積極應用者和推廣者,亞馬遜的推薦系統深刻到網站的各種商品,爲亞馬遜帶來了至少30%的銷售額。
不光是電商類,推薦系統無處不在。QQ,人人網的好友推薦;新浪微博的你可能感受興趣的人;優酷,土豆的電影推薦;豆瓣的圖書推薦;大從點評的餐飲推薦;世紀佳緣的相親推薦;天際網的職業推薦等。

推薦算法分類:

按數據使用劃分:
協同過濾算法:UserCF, ItemCF, ModelCF
基於內容的推薦: 用戶內容屬性和物品內容屬性
社會化過濾:基於用戶的社會網絡關係

按模型劃分:
最近鄰模型:基於距離的協同過濾算法
Latent Factor Mode(SVD):基於矩陣分解的模型
Graph:圖模型,社會網絡圖模型

基於用戶的協同過濾算法UserCF
基於用戶的協同過濾,經過不一樣用戶對物品的評分來評測用戶之間的類似性,基於用戶之間的類似性作出推薦。簡單來說就是:給用戶推薦和他興趣類似的其餘用戶喜歡的物品。
用例說明:

算法實現及使用介紹,請參考文章:Mahout推薦算法API詳解

基於物品的協同過濾算法ItemCF
基於item的協同過濾,經過用戶對不一樣item的評分來評測item之間的類似性,基於item之間的類似性作出推薦。簡單來說就是:給用戶推薦和他以前喜歡的物品類似的物品。
用例說明:

算法實現及使用介紹,請參考文章:Mahout推薦算法API詳解
注:基於物品的協同過濾算法,是目前商用最普遍的推薦算法。

協同過濾算法實現,分爲2個步驟
  • 1. 計算物品之間的類似度
  • 2. 根據物品的類似度和用戶的歷史行爲給用戶生成推薦列表
有關協同過濾的另外一篇文章,請參考:RHadoop實踐系列之三 R實現MapReduce的協同過濾算法


2、 需求分析:推薦系統指標設計

下面咱們將從一個公司案例出發來全面的解釋,如何進行推薦系統指標設計。

案例介紹
Netflix電影推薦百萬獎金比賽,http://www.netflixprize.com/
Netflix官方網站:www.netflix.com

Netflix,2006年組織比賽是的時候,是一家以在線電影租賃爲生的公司。他們根據網友對電影的打分來判斷用戶有可能喜歡什麼電影,並結合會員看過的電影以及口味偏好設置作出判斷,混搭出各類電影風格的需求。
收集會員的一些信息,爲他們指定個性化的電影推薦後,有許多冷門電影居然進入了候租榜單。從公司的電影資源成本方面考量,熱門電影的成本通常較高,若是Netflix公司可以在電影租賃中增長冷門電影的比例,天然可以提高自身盈利能力。
Netflix 公司曾宣稱60%左右的會員根據推薦名單定製租賃順序,若是推薦系統不能準確地猜想會員喜歡的電影類型,容易形成屢次租借冷門電影而並不符合我的口味的會 員流失。爲了更高效地爲會員推薦電影,Netflix一直致力於不斷改進和完善個性化推薦服務,在2006年推出百萬美圓大獎,不管是誰能最好地優化 Netflix推薦算法就可獲獎勵100萬美圓。到2009年,獎金被一個7人開發小組奪得,Netflix隨後又當即推出第二個百萬美金懸賞。這充分說 明一套好的推薦算法系統是多麼重要,同時又是多麼困難。

上圖爲比賽的各支隊伍的排名!

補充說明:
1. Netflix的比賽是基於靜態數據的,就是給定「訓練級」,匹配「結果集」,「結果集」也是提早就作好的,因此這與咱們天天運營的系統,實際上是不同的。
2. Netflix用於比賽的數據集是小量的,整個全集才666MB,而實際的推薦系統都要基於大量歷史數據的,動不動就會上GB,TB等

因此,咱們在真實的環境中設計推薦的時候,要全面考量數據量,算法性能,結果準確度等的指標。

推薦算法選型:基於物品的協同過濾算法ItemCF,並行實現
數據量:基於Hadoop架構,支持GB,TB,PB級數據量
算法檢驗:能夠經過 準確率,召回率,覆蓋率,流行度 等指標評判。
結果解讀:經過ItemCF的定義,合理給出結果解釋

3、 算法模型:Hadoop並行算法
這裏我使用」Mahout In Action」書裏,第一章第六節介紹的分步式基於物品的協同過濾算法進行實現。Chapter 6: Distributing recommendation computations
測試數據集:small.csv

  1. 1,101,5.0
  2. 1,102,3.0
  3. 1,103,2.5
  4. 2,101,2.0
  5. 2,102,2.5
  6. 2,103,5.0
  7. 2,104,2.0
  8. 3,101,2.0
  9. 3,104,4.0
  10. 3,105,4.5
  11. 3,107,5.0
  12. 4,101,5.0
  13. 4,103,3.0
  14. 4,104,4.5
  15. 4,106,4.0
  16. 5,101,4.0
  17. 5,102,3.0
  18. 5,103,2.0
  19. 5,104,4.0
  20. 5,105,3.5
  21. 5,106,4.0
複製代碼


每行3個字段,依次是用戶ID,電影ID,用戶對電影的評分(0-5分,每0.5爲一個評分點!)
算法的思想:
1. 創建物品的同現矩陣
2. 創建用戶對物品的評分矩陣
3. 矩陣計算推薦結果

1). 創建物品的同現矩陣
按用戶分組,找到每一個用戶所選的物品,單獨出現計數及兩兩一組計數。

  1.         [101] [102] [103] [104] [105] [106] [107]
  2. [101]    5      3      4      4       2       2      1
  3. [102]    3      3      3      2       1       1      0
  4. [103]    4      3      4      3       1       2      0
  5. [104]    4      2      3      4       2       2      1
  6. [105]    2      1      1      2       2       1      1
  7. [106]    2      1      2      2       1       2      0
  8. [107]    1      0      0      1       1       0      1
複製代碼



2). 創建用戶對物品的評分矩陣
按用戶分組,找到每一個用戶所選的物品及評分
     
  1.         U3
  2. [101] 2.0
  3. [102] 0.0
  4. [103] 0.0
  5. [104] 4.0
  6. [105] 4.5
  7. [106] 0.0
  8. [107] 5.0
複製代碼



3). 矩陣計算推薦結果
同現矩陣*評分矩陣=推薦結果(在這裏由於須要排除掉用戶已看過的電影,能夠在MR中判斷用戶評分矩陣,將得分是0的過濾出來,不是0的不參與計算。

圖片摘自」Mahout In Action」

MapReduce任務設計

圖片摘自」Mahout In Action」

解讀MapRduce任務:
步驟1: 按用戶分組,計算全部物品出現的組合列表,獲得用戶對物品的評分矩陣
步驟2: 對物品組合列表進行計數,創建物品的同現矩陣
步驟3: 合併同現矩陣和評分矩陣
步驟4: 計算推薦結果列表

4、 架構設計:推薦系統架構







上圖中,左邊是Application業務系統,右邊是Hadoop的HDFS, MapReduce。

  • 業務系統記錄了用戶的行爲和對物品的打分
  • 設置系統定時器CRON,每xx小時,增量向HDFS導入數據(userid,itemid,value,time)。
  • 完成導入後,設置系統定時器,啓動MapReduce程序,運行推薦算法。
  • 完成計算後,設置系統定時器,從HDFS導出推薦結果數據到數據庫,方便之後的及時查詢。
5、 程序開發:MapReduce程序實現
  注意:原有參考是在一個job中完成全部步子,在這裏將其分解了,每一步都是一個完整的job,(擴展,可使用Mapreduce鏈實現)
win7的開發環境 和 Hadoop的運行環境 ,請參考文章:用Maven構建Hadoop項目
新建Java類:
Step1.java,按用戶分組,計算全部物品出現的組合列表,獲得用戶對物品的評分矩陣
package recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Step1 {

	public static class Map extends Mapper<Object, Text, IntWritable, Text>{
		private static IntWritable k = new IntWritable();
		private static Text v = new Text();
		
		protected void map(Object key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			String[] splits = value.toString().split(",");
			if(splits.length != 3){
				return;
			}
			int userId = Integer.parseInt(splits[0]);
			String itemId = splits[1];
			String pref = splits[2];
			//解析出用戶ID和關聯的商品ID與打分。並輸出
			k.set(userId);
			v.set(itemId+":"+pref);
			context.write(k, v);
		};
	}
	public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
		private static StringBuilder sub = new StringBuilder(256);
		private static Text v = new Text();
		
		protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
				throws java.io.IOException ,InterruptedException {
			//合併用戶的關聯商品ID,做爲一個組
for (Text v : values) {
				sub.append(v.toString()+",");
			}
			v.set(sub.toString());
			context.write(key, v);
			sub.delete(0, sub.length());
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage:Step1");
			System.exit(2);
		}
		Job job = new Job(conf,"Step1");
		job.setJarByClass(Step1.class);
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 計算結果:html

[root@hadoop ~]# hadoop dfs -cat /step1/*java

1       102:3.0,103:2.5,101:5.0,git

2       101:2.0,102:2.5,103:5.0,104:2.0,github

3       107:5.0,101:2.0,104:4.0,105:4.5,算法

4       101:5.0,103:3.0,104:4.5,106:4.0,數據庫

5       101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0,apache

Step2.java,對物品組合列表進行計數,創建物品的同現矩陣

使用Step1的輸出結果做爲輸入的數據文件編程

程序代碼:
  
package recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

//使用Step1的輸出結果做爲輸入的數據文件
public class Step2 {

	public static class Map extends Mapper<Object, Text, Text, IntWritable>{
		private static Text k = new Text();
		private static IntWritable v = new IntWritable(1);
		
		protected void map(Object key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			String[] tokens = value.toString().split("\t")[1].split(",");
			
			//商品的相關是相互的,固然也包含本身。
			String item1Id = null;
			String item2Id = null;
			for (int i = 0; i < tokens.length; i++) {
				item1Id = tokens[i].split(":")[0];
				for (int j = 0; j < tokens.length; j++) {
					item2Id = tokens[j].split(":")[0];
					k.set(item1Id+":"+item2Id);
					context.write(k, v);
				}
			}
		};
	}
	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
		private static IntWritable v = new IntWritable();
		
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
				throws java.io.IOException ,InterruptedException {
			int count = 0;
			//計算總的次數
			for (IntWritable temp : values) {
				count++;
			}
			v.set(count);
			context.write(key, v);
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage:Step2");
			System.exit(2);
		}
		Job job = new Job(conf,"Step2");
		job.setJarByClass(Step2.class);
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 計算結果:api

[root@hadoop ~]# hadoop dfs -cat /step2/*網絡

101:101 5

101:102 3

101:103 4

101:104 4

101:105 2

101:106 2

101:107 1

102:101 3

102:102 3

102:103 3

102:104 2

102:105 1

102:106 1

103:101 4

103:102 3

103:103 4

103:104 3

103:105 1

103:106 2

104:101 4

104:102 2

104:103 3

104:104 4

104:105 2

104:106 2

104:107 1

105:101 2

105:102 1

105:103 1

105:104 2

105:105 2

105:106 1

105:107 1

106:101 2

106:102 1

106:103 2

106:104 2

106:105 1

106:106 2

107:101 1

107:104 1

107:105 1

107:107 1

Step3.java,合併同現矩陣和評分矩陣

(忽略了原有參考的step3_2這一步,由於他的輸出是和step2的輸出是同樣的。)

package recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Step3 {

	public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{
		private static IntWritable k = new IntWritable();
		private static Text v = new Text();
		
		protected void map(LongWritable key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			String[] tokens = value.toString().split("\t");
			String[] vector = tokens[1].split(",");
			for (String s : vector) {
				String[] t = s.split(":");
				//設置商品ID
				k.set(Integer.parseInt(t[0]));
				//設置用戶ID:評分
				v.set(tokens[0]+":"+t[1]);
				context.write(k, v);
			}
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage:Step3");
			System.exit(2);
		}
		Job job = new Job(conf,"Step3");
		job.setJarByClass(Step3.class);
		
		job.setMapperClass(Map.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

計算結果:

[root@hadoop ~]# hadoop dfs -cat /step3_1/*

101     5:4.0

101     1:5.0

101     2:2.0

101     3:2.0

101     4:5.0

102     1:3.0

102     5:3.0

102     2:2.5

103     2:5.0

103     5:2.0

103     1:2.5

103     4:3.0

104     2:2.0

104     5:4.0

104     3:4.0

104     4:4.5

105     3:4.5

105     5:3.5

106     5:4.0

106     4:4.0

107     3:5.0

Step4.java,計算推薦結果列表(將step2和step3_1的輸出做爲map的輸入文件)
程序代碼:
package recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Step4 {

	public static class Map extends Mapper<Object, Text, IntWritable, Text>{
		private static IntWritable k = new IntWritable();
		private static Text v = new Text();
		
		private static java.util.Map<Integer, List<Coocurence>> matrix = new HashMap<Integer, List<Coocurence>>();
		
		protected void map(Object key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			//文件一格式、101		5:4.0   文件二格式、101:101		5
			String[] tokens = value.toString().split("\t");
			String[] v1 = tokens[0].split(":");
			String[] v2 = tokens[1].split(":");
			//文件二 101:101 	   5
			if(v1.length > 1){
				int itemId1 = Integer.parseInt(v1[0]);
				int itemId2 = Integer.parseInt(v1[1]);
				int num = Integer.parseInt(tokens[1]);
				
				List<Coocurence> list = null;
				if(matrix.containsKey(itemId1)){
					list = matrix.get(itemId1);
				}else{
					list = new ArrayList<Coocurence>();
				}
				list.add(new Coocurence(itemId1, itemId2, num));
				matrix.put(itemId1,list);
			}
			//文件一 101		5:4.0
			if(v2.length > 1){
				int itemId = Integer.parseInt(tokens[0]);
				int userId = Integer.parseInt(v2[0]);
				double pref = Double.parseDouble(v2[1]);
				
				k.set(userId);
				for (Coocurence c : matrix.get(itemId)) {
					v.set(c.getItemId2()+","+pref*c.getNum());
					context.write(k, v);
				}
			}
		};
	}
	public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
		private static Text v = new Text();
		
		protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
				throws java.io.IOException ,InterruptedException {
			java.util.Map<String, Double> result = new HashMap<String, Double>();
			for (Text t : values) {
				String[] str = t.toString().split(",");
				if(result.containsKey(str[0])){
					result.put(str[0], result.get(str[0])+Double.parseDouble(str[1]));
				}else {
					result.put(str[0], Double.parseDouble(str[1]));
				}
			}
			Iterator<String> iter = result.keySet().iterator();
			while (iter.hasNext()){
				String itemId = iter.next();
				double score = result.get(itemId);
				v.set(itemId+","+score);
				context.write(key, v);
			}
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 3){
			System.err.println("Usage:Step4");
			System.exit(2);
		}
		Job job = new Job(conf,"Step4");
		job.setJarByClass(Step4.class);
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		//設置step2和step3_1兩個輸入目錄做爲輸入,因此係統須要三個參數配置
		FileInputFormat.addInputPath(job,new Path(args[0]));
		FileInputFormat.addInputPath(job,new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
class Coocurence{
	private int itemId1;
	private int itemId2;
	private int num;
	
	public Coocurence(int itemId1, int itemId2, int num) {
		super();
		this.itemId1 = itemId1;
		this.itemId2 = itemId2;
		this.num = num;
	}
	public int getItemId1() {
		return itemId1;
	}
	public void setItemId1(int itemId1) {
		this.itemId1 = itemId1;
	}
	public int getItemId2() {
		return itemId2;
	}
	public void setItemId2(int itemId2) {
		this.itemId2 = itemId2;
	}
	public int getNum() {
		return num;
	}
	public void setNum(int num) {
		this.num = num;
	}
}

 計算結果:

[root@hadoop ~]# hadoop dfs -cat /output/*

1       107,5.0

1       106,18.0

1       105,15.5

1       104,33.5

1       103,39.0

1       102,31.5

1       101,44.0

2       107,4.0

2       106,20.5

2       105,15.5

2       104,36.0

2       103,41.5

2       102,32.5

2       101,45.5

3       107,15.5

3       106,16.5

3       105,26.0

3       104,38.0

3       103,24.5

3       102,18.5

3       101,40.0

4       107,9.5

4       106,33.0

4       105,26.0

4       104,55.0

4       103,53.5

4       102,37.0

4       101,63.0

5       107,11.5

5       106,34.5

5       105,32.0

5       104,59.0

5       103,56.5

5       102,42.5

5       101,68.0

--------------------------------------------------------------------------------

參考:http://www.aboutyun.com/thread-8155-1-1.html

這樣咱們就本身編程實現了MapReduce化基於物品的協同過濾算法。
RHadoop的實現方案,請參考文章:RHadoop實踐系列之三 R實現MapReduce的協同過濾算法
Mahout的實現方案,請參考文章:Mahout分步式程序開發 基於物品的協同過濾ItemCF
我已經把整個MapReduce的實現都放到了github上面:
https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend
相關文章
相關標籤/搜索