Lesson 4. QQ Friend Recommendation

1. Import the data, tab-delimited (each line is an existing friend pair):

hadoop    hello
hdfs    world
tom    cat
cat    dog
hello    world
hello    hdfs

Upload the file to HDFS (e.g. with hadoop fs -put, into the /usr/input/qq/ directory that the job below reads from).

2. Write the code

mapper

package com.bjsxt.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Test2Mapper extends Mapper<LongWritable, Text, Text, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Each input line is one friend pair, tab-delimited.
		String line = value.toString();
		String[] ss = line.split("\t");
		// Emit the pair in both directions so that each person's
		// complete friend list is grouped under one reduce key.
		context.write(new Text(ss[0]), new Text(ss[1]));
		context.write(new Text(ss[1]), new Text(ss[0]));
	}

}
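For the first sample line, hadoop	hello, the mapper emits both (hadoop, hello) and (hello, hadoop); after the shuffle, every person therefore arrives at the reducer together with all of their direct friends.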

reducer

package com.bjsxt.mr;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Test2Reducer extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {

		// Collect the key person's distinct friends. Hadoop reuses the
		// Text instance across iterations, so copy each value out as a String.
		Set<String> set = new HashSet<String>();
		for (Text t : values) {
			set.add(t.toString());
		}
		// Any two people who share this common friend are recommended
		// to each other; emit each such pair in both directions.
		if (set.size() > 1) {
			for (String name : set) {
				for (String other : set) {
					if (!name.equals(other)) {
						context.write(new Text(name), new Text(other));
					}
				}
			}
		}

	}

}
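Since the pairing logic lives entirely in the reducer, it can be sanity-checked without a cluster. Here is a minimal plain-Java sketch (not part of the original lesson; the class name QqRecommendCheck is made up) that replays the map, shuffle, and reduce steps on the sample input:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class QqRecommendCheck {

	public static void main(String[] args) {
		String[] lines = { "hadoop\thello", "hdfs\tworld", "tom\tcat",
				"cat\tdog", "hello\tworld", "hello\thdfs" };
		// Simulate map + shuffle: group each person with all direct friends.
		Map<String, Set<String>> grouped = new HashMap<String, Set<String>>();
		for (String line : lines) {
			String[] ss = line.split("\t");
			add(grouped, ss[0], ss[1]);
			add(grouped, ss[1], ss[0]);
		}
		// Simulate reduce: pair up everyone who shares a common friend.
		for (Map.Entry<String, Set<String>> e : grouped.entrySet()) {
			Set<String> set = e.getValue();
			if (set.size() > 1) {
				for (String name : set) {
					for (String other : set) {
						if (!name.equals(other)) {
							System.out.println(name + "\t" + other);
						}
					}
				}
			}
		}
	}

	private static void add(Map<String, Set<String>> m, String k, String v) {
		Set<String> s = m.get(k);
		if (s == null) {
			s = new HashSet<String>();
			m.put(k, s);
		}
		s.add(v);
	}

}

Running it prints the same pairs the MapReduce job produces, modulo HashSet iteration order.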

job

package com.bjsxt.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Test2JobRunForWin7 {

	public static void main(String[] args) {

		Configuration conf = new Configuration();
		// Test the Hadoop job locally from Windows: point at the remote
		// HDFS and JobTracker, and ship the packaged job jar.
		conf.set("fs.default.name", "hdfs://node1:9000");
		conf.set("mapred.jar", "D:\\wc.jar");
		conf.set("mapred.job.tracker", "node1:9001");

		try {
			Job job = new Job(conf);
			job.setJobName("QqSendFriend");
			job.setJarByClass(Test2JobRunForWin7.class);

			job.setMapperClass(Test2Mapper.class);
			job.setReducerClass(Test2Reducer.class);

			// Set the map output key/value types (both Text here).
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			// job.setNumReduceTasks(tasks); // optionally set the number of reduce tasks

			FileInputFormat.addInputPath(job, new Path("/usr/input/qq/"));
			FileOutputFormat.setOutputPath(job, new Path("/usr/output/qq1"));

			// Print before exiting; calling System.exit() first would make
			// the message unreachable.
			boolean success = job.waitForCompletion(true);
			System.out.println("job run end");
			System.exit(success ? 0 : 1);

		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
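Note that the output directory /usr/output/qq1 must not already exist on HDFS; FileOutputFormat fails the job if it does. After a successful run, the recommended pairs land in part files under that directory (e.g. viewable with hadoop fs -cat /usr/output/qq1/part-r-00000).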


Run result:
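For the sample data, everyone who shares a common friend is paired, in both directions: tom and dog share cat; hadoop, world, and hdfs all share hello. So the output contains pairs such as:

tom    dog
dog    tom
hadoop    world
hadoop    hdfs
hdfs    world

The exact ordering depends on HashSet iteration, and note that this naive version also re-emits people who are already direct friends (for example hello and world, who share the common friend hdfs).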
