A Simple MapReduce Requirements Analysis: Common Friends and Mutual Follows

The most important part of designing a MapReduce job is choosing the right key and then building the data-processing flow around it. During the shuffle, records with the same key are sent to the same reducer, so the key should capture the factor that the records share. The following examples illustrate this.

Finding Common Friends

Given a list of several people's friends, find which pairs of people have common friends, and who those common friends are. The test data is as follows:

A B,C,D,E,F,O
B A,C,E,K
C F,A,D,I
D A,E,F,L
E B,C,D,M,L
F A,B,C,D,E,O,M
G A,C,D,E,F
H A,C,D,E,O
I A,O
J B,O
K A,C,D
L D,E,F
M E,F,G
O A,H,I,H

Analysis: the problem asks for common friends. If A has friend D and C also has friend D, the shared factor is the common friend D. This suggests using the common friend as the key and all of its owners as the values; in the reducer, iterating over the values and pairing them two by two yields the answer.
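Before moving to Hadoop code, the idea can be sketched without a cluster. Below is a minimal plain-Java simulation of the two passes just described; the class name and the in-memory maps standing in for the shuffle are illustrative assumptions, not code from a real job:

```java
import java.util.*;

public class CommonFriendsSketch {
    public static void main(String[] args) {
        // Test data from the article: "owner friend1,friend2,..."
        String[] lines = {
            "A B,C,D,E,F,O", "B A,C,E,K", "C F,A,D,I", "D A,E,F,L",
            "E B,C,D,M,L", "F A,B,C,D,E,O,M", "G A,C,D,E,F", "H A,C,D,E,O",
            "I A,O", "J B,O", "K A,C,D", "L D,E,F", "M E,F,G", "O A,H,I,H"
        };

        // Map: emit (friend, owner) -- the shared friend is the key.
        Map<String, Set<String>> friendToOwners = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            for (String friend : parts[1].split(",")) {
                friendToOwners.computeIfAbsent(friend, k -> new TreeSet<>()).add(parts[0]);
            }
        }

        // Reduce: pair the owners of each friend two by two, then group by pair.
        Map<String, Set<String>> pairToCommon = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : friendToOwners.entrySet()) {
            List<String> owners = new ArrayList<>(e.getValue());
            for (int i = 0; i < owners.size(); i++) {
                for (int j = i + 1; j < owners.size(); j++) {
                    String pair = owners.get(i) + "-" + owners.get(j);
                    pairToCommon.computeIfAbsent(pair, k -> new TreeSet<>()).add(e.getKey());
                }
            }
        }

        for (Map.Entry<String, Set<String>> e : pairToCommon.entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

In a real Hadoop implementation this would be two chained jobs: the first emits (friend, owner) and pairs the owners in its reducer; the second groups the pair keys and concatenates the shared friends.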

Finding Mutual Follows

Analysis: if A's friend list contains B and B's friend list contains A, the two follow each other. Here the invariant shared factor is the relationship between the two people: we can combine A and B into an ordered pair such as "A-B" and use it as the key, with the value being the number of times that pair is seen. If a pair's count reaches 2, then A is B's friend and B is A's friend, i.e. they follow each other. The code is as follows.

package mutualfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MutualFriendMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    Text out = new Text();
    LongWritable times = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line has the form "owner friend1,friend2,..."
        String line = value.toString();
        String[] tmp = line.split(" ");
        String owner = tmp[0];
        String[] friends = tmp[1].split(",");
        for (String friend : friends) {
            // Normalize the (owner, friend) edge into an ordered pair key so
            // that "A B" and "B A" both emit the same key "A-B".
            if (owner.compareTo(friend) <= 0) {
                out.set(owner + "-" + friend);
            } else {
                out.set(friend + "-" + owner);
            }
            context.write(out, times);
        }
    }
}

package mutualfriend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MutualFriendReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // A pair key emitted from both directions sums to at least 2,
        // meaning the two users appear in each other's friend lists.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        if (sum >= 2) {
            context.write(key, NullWritable.get());
        }
    }
}

package mutualfriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MutualFriendDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration supplied by ToolRunner rather than creating a
        // new one, so generic options passed on the command line are honored.
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf, "MutualFriend");
        job.setJarByClass(getClass());
        job.setMapperClass(MutualFriendMapper.class);
        job.setReducerClass(MutualFriendReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("mutualfriend/input"));
        FileOutputFormat.setOutputPath(job, new Path("mutualfriend/output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MutualFriendDriver(), args);
        System.exit(exitCode);
    }
}

The job output is as follows:

A-B
A-C
A-D
A-F
A-O
B-E
C-F
D-E
D-F
D-L
E-L
E-M
F-M
H-O
I-O
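As a sanity check, the mapper's pair normalization and the reducer's count test can be simulated in plain Java on the same test data; the class name and the in-memory map replacing the shuffle are illustrative, not part of the original job:

```java
import java.util.*;

public class MutualFollowSketch {
    public static void main(String[] args) {
        // Test data from the article: "owner friend1,friend2,..."
        String[] lines = {
            "A B,C,D,E,F,O", "B A,C,E,K", "C F,A,D,I", "D A,E,F,L",
            "E B,C,D,M,L", "F A,B,C,D,E,O,M", "G A,C,D,E,F", "H A,C,D,E,O",
            "I A,O", "J B,O", "K A,C,D", "L D,E,F", "M E,F,G", "O A,H,I,H"
        };

        // Map: emit each (owner, friend) edge as an ordered "min-max" pair with count 1.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            String owner = parts[0];
            for (String friend : parts[1].split(",")) {
                String pair = owner.compareTo(friend) <= 0
                        ? owner + "-" + friend : friend + "-" + owner;
                counts.merge(pair, 1, Integer::sum);
            }
        }

        // Reduce: a pair seen from both directions (count >= 2) is a mutual follow.
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() >= 2) {
                System.out.println(e.getKey());
            }
        }
    }
}
```

Run against the sample data, this prints the same fifteen pairs listed above, in the same sorted order that the real job's shuffle produces.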