The most important part of designing a MapReduce job is choosing the right key, and then building the data-processing flow around it. In the Map phase, records with the same key are routed to the same reduce call, so a good key captures some factor that the records you want to bring together have in common. The following examples illustrate this.
Given a file of friend lists for a group of people, find every pair of people who share a common friend, and who that common friend is. The test data is as follows:
A B,C,D,E,F,O
B A,C,E,K
C F,A,D,I
D A,E,F,L
E B,C,D,M,L
F A,B,C,D,E,O,M
G A,C,D,E,F
H A,C,D,E,O
I A,O
J B,O
K A,C,D
L D,E,F
M E,F,G
O A,H,I,H
Problem analysis: the question asks for common friends. If A has friend D and C also has friend D, then the shared factor here is the common friend D. This suggests using the common friend as the key and all of that friend's owners as the values; in the reduce phase, looping over the values and pairing them up two by two yields the answer.
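A minimal sketch of this stage follows. The class names, the Text-based value type, and the sorting step are my own illustrative choices, not from the original post; it assumes the same input format as the test data above (an owner, a space, then a comma-separated friend list).

// CommonFriendMapper.java (illustrative)
package commonfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CommonFriendMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text friendKey = new Text();
    private final Text owner = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line: "A B,C,D,E,F,O" -> owner A, friends B,C,D,E,F,O.
        String[] tmp = value.toString().split(" ");
        owner.set(tmp[0]);
        for (String friend : tmp[1].split(",")) {
            friendKey.set(friend);
            context.write(friendKey, owner); // emit (friend, owner)
        }
    }
}

// CommonFriendReducer.java (illustrative)
package commonfriend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CommonFriendReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // key is one common friend; values are everyone who lists that friend.
        List<String> owners = new ArrayList<>();
        for (Text v : values) {
            owners.add(v.toString()); // copy: Hadoop reuses the Text object
        }
        Collections.sort(owners); // normalize pair order, e.g. always A-C, never C-A
        for (int i = 0; i < owners.size(); i++) {
            for (int j = i + 1; j < owners.size(); j++) {
                // e.g. ("A-C", "D"): A and C have the common friend D
                context.write(new Text(owners.get(i) + "-" + owners.get(j)), key);
            }
        }
    }
}

Each output line names one pair and one common friend; a pair with several common friends appears once per friend, so a second aggregation job keyed on the pair could fold those lines into a single list.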
Problem analysis: a related question is which pairs follow each other (mutual follows), i.e. A's friend list contains B and B's friend list contains A. Here the invariant shared factor is the relationship between the two people: we can treat A and B as a single pair and use it as the key, such as "A-B", with the number of occurrences of that pair as the value. If a pair's count reaches 2, then A is a friend of B and B is a friend of A, which is exactly the mutual-follow case. (The code below tests sum >= 2 rather than sum == 2, so a duplicate entry in a friend list, like the repeated H in O's row of the test data, does not break the check.) The code is as follows.
// MutualFriendMapper.java
package mutualfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MutualFriendMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    Text out = new Text();
    LongWritable times = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] tmp = line.split(" ");
        String owner = tmp[0];
        String[] friends = tmp[1].split(",");
        for (String friend : friends) {
            // Order the two names so that "A-B" and "B-A" become the same key.
            if (owner.compareTo(friend) <= 0) {
                out.set(owner + "-" + friend);
            } else {
                out.set(friend + "-" + owner);
            }
            context.write(out, times);
        }
    }
}

// MutualFriendReducer.java
package mutualfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MutualFriendReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        // The pair is emitted once from each side's friend list, so a count
        // of at least 2 means the relationship is mutual.
        if (sum >= 2) {
            context.write(key, NullWritable.get());
        }
    }
}

// MutualFriendDriver.java
package mutualfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MutualFriendDriver extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        // Use the configuration injected by ToolRunner rather than creating a new one.
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf, "MutualFriend");
        job.setJarByClass(getClass());
        job.setMapperClass(MutualFriendMapper.class);
        job.setReducerClass(MutualFriendReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("mutualfriend/input"));
        FileOutputFormat.setOutputPath(job, new Path("mutualfriend/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MutualFriendDriver(), args);
        System.exit(exitCode);
    }
}
Running the code produces the output below (for example, A-B appears because A's friend list contains B and B's friend list contains A):
A-B
A-C
A-D
A-F
A-O
B-E
C-F
D-E
D-F
D-L
E-L
E-M
F-M
H-O
I-O