hadoop hello
hdfs world
tom cat
cat dog
hello world
hello hdfs
上傳文件
2. 寫代碼
mapper
package com.bjsxt.mr; import java.io.IOException; import java.io.InterruptedIOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class Test2Mapper extends Mapper<LongWritable, Text, Text, Text>{ // 23 9:31 視頻 protected void map(LongWritable key ,Text value,Context context) throws IOException, InterruptedException { // throws IOException , InterruptedIOException String line =value.toString(); String[] ss=line.split("\t"); context.write(new Text(ss[0]), new Text(ss[1])); context.write(new Text(ss[1]), new Text(ss[0])); } }
reduce
package com.bjsxt.mr; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.Set; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class Test2Reducer extends Reducer<Text, Text, Text, Text>{ protected void reduce(Text key ,Iterable<Text> i,Context context) throws IOException, InterruptedException{ Set<String> set=new HashSet<String>(); for(Text t: i){ set.add(t.toString()); } if(set.size()>1){ for(Iterator j=set.iterator();j.hasNext();){ String name=(String) j.next(); for(Iterator k=set.iterator();k.hasNext();){ String other =(String) k.next(); if(!name.equals(other)){ context.write(new Text(name), new Text(other)); } } } } } }
job
package com.bjsxt.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; public class Test2JobRunForWin7 { public static void main(String[] args) { Configuration conf=new Configuration(); //windows 本地測試 hadoop程序 conf.set("fs.default.name", "hdfs://node1:9000"); conf.set("mapred.jar", "D:\\wc.jar"); conf.set("mapred.job.tracker", "node1:9001"); try { Job job = new Job(conf); job.setJobName("QqSendFriend"); job.setJarByClass(Test2JobRunForWin7.class); job.setMapperClass(Test2Mapper.class); job.setReducerClass(Test2Reducer.class); job.setMapOutputKeyClass(Text.class); //設置參數類型 job.setMapOutputValueClass(Text.class); // job.setNumReduceTasks(tasks); // 設置 reduce任務個數的任務 FileInputFormat.addInputPath(job, new Path("/usr/input/qq/")); FileOutputFormat.setOutputPath(job, new Path("/usr/output/qq1")); System.exit(job.waitForCompletion(true) ? 0 : 1); System.out.println("job run end"); } catch (Exception e) { e.printStackTrace(); } } }
運行結果: