MapReduce——求兩兩共同好友

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K
 
以上是數據:
A:B,C,D,F,E,O
表示:B,C,D,E,F,O是A用戶的好友。
 
一、求全部兩兩用戶之間的共同好友(Java 實現)

/**
 * @author: lpj   
 * @date: 2018年3月16日 下午7:16:47
 * @Description:
 */
package lpj.reduceWork;
 
import java.io.IOException;
import java.util.Arrays;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
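/**
 * Two-step MapReduce driver that finds the common friends of every pair of users.
 * Step 1 inverts each "user:friend,friend,..." line into "friend -> users who list that friend";
 * step 2 turns each such user list into sorted user pairs and collects, per pair,
 * the friends they share.
 */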
public class SameFriendsMR {
    
    /**
     * Configures the two MapReduce steps, chains them with {@link JobControl} so that
     * step 2 only starts after step 1 has finished, and waits for both to complete.
     *
     * <p>Stale output directories from a previous run are removed up front, because
     * Hadoop refuses to start a job whose output directory already exists.
     *
     * @param args command-line arguments (unused; all paths are hard-coded)
     * @throws Exception if the file system cannot be accessed, a job cannot be
     *                   configured, or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // To run against a cluster instead of the local file system, load the site
        // configuration (conf.addResource("hdfs-site.xml")) and set the
        // HADOOP_USER_NAME system property before creating the FileSystem.
        FileSystem fs = FileSystem.get(conf); // local file system by default

        /////////////////// Step 1 ////////////////////////
        Job job = Job.getInstance(conf);
        job.setJobName("same-friends-step1");
        job.setJarByClass(SameFriendsMR.class);
        job.setMapperClass(SameFriendsMR_Mapper.class);
        job.setReducerClass(SameFriendsMR_Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = new Path("d:/a/homework4.txt");
        Path outputPath = new Path("d:/a/homework4");
        // Check the OUTPUT path: it is the leftover output that must be cleared.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        //////////////////// Step 2 //////////////////////////////////
        Job job2 = Job.getInstance(conf);
        job2.setJobName("same-friends-step2");
        job2.setJarByClass(SameFriendsMR.class);
        job2.setMapperClass(SameFriends2MR_Mapper.class);
        job2.setReducerClass(SameFriends2MR_Reducer.class);

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        // Step 2 reads what step 1 writes.
        Path inputPath2 = new Path("d:/a/homework4");
        Path outputPath2 = new Path("d:/a/homework4_1");
        // Step 1's output was just removed above, so guarding on the input path here
        // would never clean up a stale step-2 output; test the output path itself.
        if (fs.exists(outputPath2)) {
            fs.delete(outputPath2, true);
        }

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Use JobControl to run the two MapReduce jobs one after the other.
        ControlledJob aJob = new ControlledJob(job.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());

        aJob.setJob(job);
        bJob.setJob(job2);
        bJob.addDependingJob(aJob); // step 2 depends on step 1

        JobControl jc = new JobControl("jcF");
        jc.addJob(aJob);
        jc.addJob(bJob);

        Thread thread = new Thread(jc);
        thread.start();
        boolean failed;
        try {
            while (!jc.allFinished()) {
                Thread.sleep(1000); // sleep is static: it pauses the current (main) thread
            }
            failed = !jc.getFailedJobList().isEmpty();
            for (ControlledJob failedJob : jc.getFailedJobList()) {
                System.err.println("Job failed: " + failedJob.getJobName()
                        + " - " + failedJob.getMessage());
            }
        } finally {
            // Always ask the JobControl runner thread to terminate.
            jc.stop();
        }
        if (failed) {
            // Surface the failure to the caller instead of exiting as if successful.
            System.exit(1);
        }
    }
    ///////////////////////////////第一步/////////////////////////////////////////
    /**
     * Step-1 mapper: inverts the friend list.
     *
     * <p>For an input line {@code A:B,C,D} it emits {@code (B, A)}, {@code (C, A)},
     * {@code (D, A)}: the friend is the key and the owning user is the value, so the
     * reducer receives, per friend, every user that lists that friend.
     *
     * <p>Blank lines and lines without a user or without any friend are skipped
     * instead of failing the task.
     */
    public static class SameFriendsMR_Mapper extends Mapper<LongWritable, Text, Text, Text>{
        // Reused across map() calls to avoid allocating a Text per record.
        Text kout = new Text();
        Text valueout = new Text();

        /**
         * Emits one {@code (friend, user)} pair per friend on the line.
         *
         * @param key     input key supplied by the framework (unused)
         * @param value   one input line in the form {@code user:friend1,friend2,...}
         * @param context sink for the emitted pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
            // Example line: A:B,C,D,F,E,O
            String[] reads = value.toString().trim().split(":");
            if (reads.length < 2) {
                // Blank line, or a line with no ':' / nothing after it: nothing to emit.
                return;
            }
            String user = reads[0].trim();
            if (user.isEmpty()) {
                return;
            }
            // The user is the value and each friend is a key.
            valueout.set(user);
            for (String friend : reads[1].split(",")) {
                String kk = friend.trim();
                if (kk.isEmpty()) {
                    // Tolerate stray commas such as "B,,C".
                    continue;
                }
                kout.set(kk);
                context.write(kout, valueout);
            }
        }
    }
    /**
     * Step-1 reducer: for one friend (the key), joins all users that list that friend
     * into a single comma-separated value.
     */
    public static class SameFriendsMR_Reducer extends Reducer<Text, Text, Text, Text>{
        Text kout = new Text();
        Text valueout = new Text();

        /**
         * Writes {@code key -> "user1,user2,..."} for the users grouped under the key.
         *
         * @param key     the friend shared by the grouped users
         * @param values  the users grouped under this key
         * @param context sink for the joined record
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text user : values) {
                joined.append(user.toString());
                joined.append(',');
            }
            // Drop the trailing comma left by the loop.
            joined.setLength(joined.length() - 1);
            valueout.set(joined.toString());
            context.write(key, valueout);
        }

    }
    //////////////////////////////////////第二步///////////////////////////////////////////////////////////
    /**
     * Step-2 mapper: expands each step-1 record into user pairs.
     *
     * <p>A record {@code A<TAB>F,I,O} means F, I and O all have A as a friend. The
     * mapper emits every pair of those users as the key (for example {@code F-I})
     * with A as the value.
     */
    public static class SameFriends2MR_Mapper extends Mapper<LongWritable, Text, Text, Text>{
        Text kout = new Text();
        Text valueout = new Text();

        /**
         * Emits {@code (userX-userY, friend)} for every pair of users on the line.
         *
         * @param key     input key supplied by the framework (unused)
         * @param value   one line in the form {@code friend<TAB>user1,user2,...}
         * @param context sink for the emitted pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
            String line = value.toString().trim();
            String[] fields = line.split("\t");
            // The shared friend becomes the value of every emitted pair.
            valueout.set(fields[0]);

            // Sort the users so a pair always appears in one canonical order
            // (A-B, never B-A) and is not counted twice.
            String[] users = fields[1].split(",");
            Arrays.sort(users);

            final int count = users.length;
            for (int first = 0; first + 1 < count; first++) {
                final String prefix = users[first] + "-";
                for (int second = first + 1; second < count; second++) {
                    kout.set(prefix + users[second]);
                    context.write(kout, valueout);
                }
            }
        }
    }
    /**
     * Step-2 reducer: for one user pair (the key), joins all friends emitted for that
     * pair into a single comma-separated value.
     */
    public static class SameFriends2MR_Reducer extends Reducer<Text, Text, Text, Text>{
        Text kout = new Text();
        Text valueout = new Text();

        /**
         * Writes {@code pair -> "friend1,friend2,..."} for the friends grouped under the key.
         *
         * @param key     the user pair, e.g. {@code A-B}
         * @param values  the friends grouped under this pair
         * @param context sink for the joined record
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
            StringBuilder commonFriends = new StringBuilder();
            for (Text friend : values) {
                commonFriends.append(friend.toString());
                commonFriends.append(',');
            }
            // Remove the separator appended after the last friend.
            commonFriends.setLength(commonFriends.length() - 1);
            valueout.set(commonFriends.toString());
            context.write(key, valueout);
        }

    }
}
相關文章
相關標籤/搜索