package org.slp; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; /** * Created by sanglp on 2017/7/17. */ public class Test2Mapper extends Mapper<LongWritable ,Text,Text,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //super.map(key, value, context); String line = value.toString();//一行數據表明一組好友關係 String[] ss = line.split("\t"); context.write(new Text(ss[0]),new Text(ss[1]));//主從分紅兩行輸出 context.write(new Text(ss[1]),new Text(ss[0])); } }
package org.slp; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.Set; /** * Created by sanglp on 2017/7/17. */ public class Test2Reduce extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //super.reduce(key, values, context); Set<String> set = new HashSet<String>(); for(Text t :values ){ set.add(t.toString()); } if (set.size()>1){ for(Iterator j = set.iterator();j.hasNext();){ String name = (String)j.next(); for(Iterator k = set.iterator();k.hasNext();){ String other = (String)k.next(); if(!name.equals(other)){ context.write(new Text(name),new Text(other)); } } } } } }
package org.slp; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * Created by sanglp on 2017/7/17. */ public class JobRun2 { public static void main(String[] args){ Configuration conf = new Configuration(); conf.set("mapred.job.tracker","node1:9001"); conf.set("mapred.job.tracker","node1:9001"); conf.set("mapred.jar","C:\\Users\\sanglp\\qq.jar"); try { Job job = new Job(conf); job.setJobName("qq"); job.setJarByClass(JobRun2.class); job.setMapperClass(Test2Mapper.class); job.setReducerClass(Test2Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(1);//設置reduce任務的個數 //mapreduce輸入數據所在目錄或文件 FileInputFormat.addInputPath(job,new Path("/usr/input/qq")); //mr執行以後的輸出數據的目錄 FileOutputFormat.setOutputPath(job,new Path("/usr/out/qq")); try { System.exit(job.waitForCompletion(true)?0:1); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } } }
文件內容例如:java
小明 小李node
小花 小白apache