package cn.itcastcat.bigdata.secondarysort; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 利用reduce端的GroupingComparator來實現將一組bean當作相同的key * * */ public class ItemidGroupingComparator extends WritableComparator { //傳入做爲key的bean的class類型,以及制定須要讓框架作反射獲取實例對象 protected ItemidGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; //比較兩個bean時,指定只比較bean中的orderid return abean.getItemid().compareTo(bbean.getItemid()); } }
package cn.itcastcat.bigdata.secondarysort; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) { //相同id的訂單bean,會發往相同的partition //並且,產生的分區數,是會跟用戶設置的reduce task數保持一致 return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
package cn.itcastcat.bigdata.secondarysort; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.sun.xml.bind.v2.schemagen.xmlschema.List; /** * * @author * */ public class SecondarySort { static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean bean = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, ","); bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2]))); context.write(bean, NullWritable.get()); } } static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{ //到達reduce時,相同id的全部bean已經被當作一組,且金額最大的那個一排在第一位 @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SecondarySort.class); job.setMapperClass(SecondarySortMapper.class); job.setReducerClass(SecondarySortReducer.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("D:\\srcdata\\d10\\orders.txt")); FileOutputFormat.setOutputPath(job, new Path("D:/srcdata/d10/gpoutput")); //在此設置自定義的Groupingcomparator類 job.setGroupingComparatorClass(ItemidGroupingComparator.class); //在此設置自定義的partitioner類 job.setPartitionerClass(ItemIdPartitioner.class); job.setNumReduceTasks(2); job.waitForCompletion(true); } }