AreaPartitioner.java
package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone-number prefix from the key and look it up in the
        // area dictionary; each province maps to its own group number, and
        // unknown prefixes fall into the catch-all group 5.
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }
}
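A quick way to convince yourself of the mapping is to call getPartition directly. The demo class and sample phone numbers below are hypothetical, not part of the original tutorial:

package cn.itcast.hadoop.mr.areapartition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class AreaPartitionerDemo {
    public static void main(String[] args) {
        AreaPartitioner<Text, NullWritable> partitioner = new AreaPartitioner<>();
        // "136" prefix is in the dictionary -> group 1
        System.out.println(partitioner.getPartition(
                new Text("13612345678"), NullWritable.get(), 6)); // prints 1
        // "159" prefix is not -> catch-all group 5
        System.out.println(partitioner.getPartition(
                new Text("15912345678"), NullWritable.get(), 6)); // prints 5
    }
}

Any prefix outside 135–139 lands in group 5, which is why the job below asks for six reduce tasks.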
FlowSumArea.java
package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

/**
 * Aggregates traffic from the raw flow logs and writes the per-user totals
 * for each province to a separate output file.
 * Two mechanisms need to be customized:
 * 1. The partitioning logic, via a custom Partitioner.
 * 2. The number of concurrent reduce tasks.
 *
 * @author duanhaitao@itcast.cn
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input
            String line = value.toString();
            // Split out the individual fields
            String[] fields = StringUtils.split(line, "\t");
            // Grab the fields we need
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);
            // Wrap them into a k/v pair and emit
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long up_flow_counter = 0;
            long d_flow_counter = 0;
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumArea.class);
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of concurrent reduce tasks; it should match the
        // number of partition groups (5 area groups + 1 catch-all)
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
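The job imports FlowBean from cn.itcast.hadoop.mr.flowsum, defined earlier in this tutorial series and not shown here. As a rough guide, here is a minimal sketch of what the bean must provide for this job to run: a Writable with a no-arg constructor (Hadoop instantiates it reflectively), the two getters the reducer calls, and a toString() that becomes the text output. The field names and the derived total are assumptions:

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow; // assumed derived total

    // Required: Hadoop creates Writables via reflection
    public FlowBean() {}

    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public long getUp_flow() { return up_flow; }
    public long getD_flow() { return d_flow; }

    // Serialization order must match deserialization order exactly
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    // What TextOutputFormat writes as the value column
    @Override
    public String toString() {
        return up_flow + "\t" + d_flow + "\t" + s_flow;
    }
}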
Package the classes into a jar and upload it to the cluster:
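The submit command itself did not survive in the original post; below is a hedged reconstruction, where the jar name and HDFS paths are assumptions:

hadoop jar flowarea.jar cn.itcast.hadoop.mr.areapartition.FlowSumArea /flow/srcdata /flow/areaoutput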
When re-running the job, append a 2 to the last argument above (the output path): FileOutputFormat refuses to start if the output directory already exists, so each run needs a fresh path.
If the number of reduce tasks is less than the number of partition groups (but greater than 1), the job fails: the partitioner hands out partition numbers that no reduce task serves, and the map tasks abort with an IOException along the lines of "Illegal partition for ...". Setting it to exactly 1 does not fail, because with a single reducer everything is sent to that one task and the partitioner's result is effectively ignored.
If the number of reduce tasks is greater than the number of partition groups, the extra reduce tasks receive no data; they still run, but simply produce empty part-r-xxxxx output files.
The map side carries no partition-specific business logic: with 10 map tasks, each simply processes roughly one tenth of the input. Map parallelism can therefore scale freely; in practice it is determined by the number of input splits rather than set directly.
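Unlike setNumReduceTasks, there is no direct setter for the number of map tasks; it follows from the input splits. A minimal sketch of nudging it through split-size bounds, where the helper name and the 128/256 MB figures are illustrative assumptions:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuning {
    // Hypothetical driver-side helper: fewer, larger splits -> fewer map
    // tasks; a smaller maximum split size has the opposite effect.
    static void boundSplits(Job job) {
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
    }
}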