分區(partition),屬於Mapper階段的流程。前面提到,線程首先根據數據最終要傳的reducer把數據分紅相應的分區(partition)。java
默認狀況下,採起的分區類是HashPartitionergit
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
顯然,默認分區是根據key的hashCode對reduceTasks個數取模獲得的。用戶無法控制哪一個key存儲到哪一個分區,所以咱們經常須要自定義分區類。github
編寫自定義分區並應用的流程大體分爲3步。apache
爲何須要設置reduce task的數量?app
reduceTask的數量 > getPartition的結果數
,則會多產生幾個空的輸出文件part-r-000xx;1 < reduceTask的數量 < getPartition的結果數
,則有一部分分區數據無處安放,會Exception;reduceTask的數量 = 1
,則無論mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;例如:假設自定義分區數爲5,則ide
接下來咱們完善以前的電話號碼手機流量統計項目。oop
完善需求:將統計結果按照手機歸屬地不一樣省份輸出到不一樣文件。this
根據手機號的前三位能夠區分不一樣歸宿地的電話號碼。線程
一、數據準備:依然是以前的輸入文件phone_data.txt;code
二、分析
Mapreduce中會將map輸出的kv對,按照相同key分組,而後分發給不一樣的reducetask。默認的分發規則爲:根據key的hashcode%reducetask數來分發。
若是要按照咱們本身的需求進行分組,則須要改寫數據分發(分組)組件Partitioner 自定義一個CustomPartitioner繼承抽象類:Partitioner
最後再從job驅動中,設置自定義partitioner(以前咱們使用默認的): job.setPartitionerClass(CustomPartitioner.class)
三、編寫代碼
(1)、在以前的項目中添加分區類
package com.zhaoyi.phoneflow; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class FlowPartition extends Partitioner<Text, FlowBean> { public static final String PARTITION_136 = "136"; public static final String PARTITION_137 = "137"; public static final String PARTITION_138 = "138"; public static final String PARTITION_139 = "139"; @Override public int getPartition(Text text, FlowBean flowBean, int i) { // default partition is 0. int partition = 0; // get the phone left 3 number. String phonePrefix = text.toString().substring(0,3); // get partition. if(PARTITION_136.equals(phonePrefix)){ partition = 1; }else if(PARTITION_137.equals(phonePrefix)){ partition = 2; }else if(PARTITION_138.equals(phonePrefix)) { partition = 3; }else if(PARTITION_139.equals(phonePrefix)) { partition = 4; } return partition; } }
(2)、在驅動類中指定自定義分區類
package com.zhaoyi.phoneflow; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowDriver { public static void main(String[] args) throws Exception { if(args.length != 2){ System.out.println("Please enter the parameter: data input and output paths..."); System.exit(-1); } Job job = Job.getInstance(new Configuration()); job.setJarByClass(FlowDriver.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 設置分區類 job.setPartitionerClass(FlowPartition.class); // 咱們設置了5個分區,對應上。 job.setNumReduceTasks(5); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true)? 1:0); } }
注意分區返回數字是從0開始計數。
以前咱們未指定分區類的時候,生成一個分區結果文件。這一次就會生成5個分區文件了,分別對應不一樣地區的電話號碼記錄。
本案例的具體代碼參見github項目phoneflow模塊。
既然改造了電話號碼案例,咱們也來改造一下wordcount案例,將單詞統計根據首字母大小寫不一樣輸出爲2個文件。即大寫字母開頭的輸出爲一個文件,小寫字母開頭的輸出爲1個文件。
一、分析
只需在原有項目的基礎上,添加分區類:獲取每一個單詞的首字母,直接使用JAVA Character類API判斷是否是小寫字母,若不是,則統一斷定爲大寫字母。
二、編寫代碼,添加分區類
package com.zhaoyi.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class WordCountPartition extends Partitioner<Text, IntWritable> { @Override public int getPartition(Text text, IntWritable intWritable, int i) { char firstLetter = text.toString().charAt(0); if(Character.isLowerCase(firstLetter)){ return 1; } return 0; } }
別忘了在驅動類中設置自定義分區類。
三、查看輸出結果
分區文件有2個,分別表明大小寫字母的統計結果
#### part-r-00000 Alice 2 So 1 `and 1 `without 1 #### part-r-00001 a 3 and 1 bank 1 ...... ...... the 3 this 1 ...... ...... was 3 what 1
注意省略號表明我刪除掉的一些記錄,只是爲了限制篇幅,不屬於文件內容。
本案例見github項目的word-count模塊。