MapReduce自帶的分區器是HashPartitioner
原理:先對map輸出的key求hash值,再模上reduce task個數,根據結果,決定此輸出kv對,被匹配的reduce任務取走。
自定義分分區須要繼承Partitioner
,複寫getpariton()
方法
自定義分區類:
注意:map的輸出是<K,V>鍵值對
其中int partitionIndex = dict.get(text.toString())
,partitionIndex
是獲取K的值java
附:被計算的的文本apache
Dear Dear Bear Bear River Car Dear Dear Bear Rive Dear Dear Bear Bear River Car Dear Dear Bear Rive
須要在main函數中設置,指定自定義分區類
自定義分區類:app
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import java.util.HashMap; public class CustomPartitioner extends Partitioner<Text, IntWritable> { public static HashMap<String, Integer> dict = new HashMap<String, Integer>(); //Text表明着map階段輸出的key,IntWritable表明着輸出的值 static{ dict.put("Dear", 0); dict.put("Bear", 1); dict.put("River", 2); dict.put("Car", 3); } public int getPartition(Text text, IntWritable intWritable, int i) { // int partitionIndex = dict.get(text.toString()); return partitionIndex; } }
注意:map的輸出結果是鍵值對<K,V>,int partitionIndex = dict.get(text.toString());
中的partitionIndex
是map輸出鍵值對中的鍵的值,也就是K的值。
Maper類:ide
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); for (String word : words) { // 每一個單詞出現1次,做爲中間結果輸出 context.write(new Text(word), new IntWritable(1)); } } }
Reducer類:函數
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); for (String word : words) { // 每一個單詞出現1次,做爲中間結果輸出 context.write(new Text(word), new IntWritable(1)); } } }
main函數:oop
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args.length != 2 || args == null) { System.out.println("please input Path!"); System.exit(0); } Configuration configuration = new Configuration(); configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar"); Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName()); // 打jar包 job.setJarByClass(WordCountMain.class); // 經過job設置輸入/輸出格式 //job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); // 設置輸入/輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 設置處理Map/Reduce階段的類 job.setMapperClass(WordCountMap.class); //map combine //job.setCombinerClass(WordCountReduce.class); job.setReducerClass(WordCountReduce.class); //若是map、reduce的輸出的kv對類型一致,直接設置reduce的輸出的kv對就行;若是不同,須要分別設置map, reduce的輸出的kv類型 //job.setMapOutputKeyClass(.class) // 設置最終輸出key/value的類型m job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setPartitionerClass(CustomPartitioner.class); job.setNumReduceTasks(4); // 提交做業 job.waitForCompletion(true); } }
main函數參數設置:code