mr 程序自定義分組的實現

 

 

AreaPartitionerjava

package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

	private static HashMap<String, Integer> areaMap = new HashMap<>();

	static {
		areaMap.put("135", 0);
		areaMap.put("136", 1);
		areaMap.put("137", 2);
		areaMap.put("138", 3);
		areaMap.put("139", 4);
	}

	@Override
	public int getPartition(KEY key, VALUE value, int numPartitions) {
		// 從 key 中拿到手機號,查詢手機歸屬字典,不一樣省份返回不一樣的組號

		int areaCoder = areaMap.get(key.toString().substring(0, 3)) == null ? 5
				: areaMap.get(key.toString().substring(0, 3));

		return areaCoder;
	}

}

FlowSumAreaapache

package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.hadoop.mr.flowsum.FlowBean;


/**
 * 對流量原始日誌進行流量統計,將不一樣省份的用戶統計結果輸出到不一樣文件
 * 須要自定義改造兩個機制:
 * 1.改造分區的邏輯,自定義一個parttioner
 * 2.自定義 reduce task 的併發任務數量
 * 
 * @author duanhaitao@itcast.cn
 *
 */
public class FlowSumArea {

	public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {

			//拿一行數據
			String line = value.toString();
			//切分各個字段
			String[] fields = StringUtils.split(line, "\t");
			
			//拿到咱們須要的字段
			String phoneNB = fields[1];
			long u_flow = Long.parseLong(fields[7]);
			long d_flow = Long.parseLong(fields[8]);
			
			//封裝成 kv 並輸出
			context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));

		}
		
		
	}
	
	
	public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
		
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values,Context context)
				throws IOException, InterruptedException {

			long up_flow_counter = 0;
			long d_flow_counter = 0;
			
			for(FlowBean bean: values){
				
				up_flow_counter += bean.getUp_flow();
				d_flow_counter += bean.getD_flow();
				
				
			}
			
			context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
			
			
			
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowSumArea.class);
		
		job.setMapperClass(FlowSumAreaMapper.class);
		job.setReducerClass(FlowSumAreaReducer.class);
		
		//設置咱們自定義的邏輯定義
		job.setPartitionerClass(AreaPartitioner.class);
		
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		//設置reduce的任務併發數,應該跟分組的數量保持一致
		job.setNumReduceTasks(6);
		
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		
		System.exit(job.waitForCompletion(true)?0:1);
		
		
	}
	
	
}

打包 jar 包,上傳:併發

上面最後一個加個 2 。。app

 

 

 

reduce 併發數量若是 < 分組數,會報錯;可是改爲 1 不會報錯。。。ide

reduce 併發數量若是 < 分組數,多的分組沒有數據oop

map 不會涉及到業務邏輯,,若是有 10 個map ,每一個就處理 1/10 的數據,map 的併發量是能夠任意去設置的。3d

相關文章
相關標籤/搜索