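In my previous post I shared an implementation of log cleaning. In this post I'll walk through another case study: finding the order item with the largest transaction amount in each order.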
1. Requirement
For each order in the data below, find the order item with the largest transaction amount.
```text
#order ID,product ID,amount
Order_0000001,Pdt_01,222.8
Order_0000001,Pdt_05,25.8
Order_0000002,Pdt_05,325.8
Order_0000002,Pdt_03,522.8
Order_0000002,Pdt_04,122.4
Order_0000003,Pdt_01,222.8
```
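As a reference for checking the job's output, here is a minimal single-machine sketch in plain Java (no Hadoop), assuming the sample rows fit in memory. The class name `MaxItemPerOrder` is hypothetical. Unlike the MapReduce job, which keeps only the order ID and amount, it also keeps the product ID of the winning item; on a tie it keeps the first row seen.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaxItemPerOrder {

    // The six sample rows from the requirement: order ID, product ID, amount
    public static final List<String> ROWS = Arrays.asList(
            "Order_0000001,Pdt_01,222.8",
            "Order_0000001,Pdt_05,25.8",
            "Order_0000002,Pdt_05,325.8",
            "Order_0000002,Pdt_03,522.8",
            "Order_0000002,Pdt_04,122.4",
            "Order_0000003,Pdt_01,222.8");

    /** Maps each order ID to the CSV fields of its highest-amount item (first-seen order kept). */
    public static Map<String, String[]> maxItemPerOrder(List<String> rows) {
        Map<String, String[]> best = new LinkedHashMap<>();
        for (String row : rows) {
            String[] fields = row.split(",");
            String[] current = best.get(fields[0]);
            // Keep this row if it is the first for the order or has a strictly larger amount
            if (current == null
                    || Double.parseDouble(fields[2]) > Double.parseDouble(current[2])) {
                best.put(fields[0], fields);
            }
        }
        return best;
    }

    public static void main(String[] args) {
        maxItemPerOrder(ROWS).forEach((orderId, fields) ->
                System.out.println(orderId + "\t" + fields[1] + "\t" + fields[2]));
    }
}
```

Running `main` prints `Order_0000001 Pdt_01 222.8`, `Order_0000002 Pdt_03 522.8` and `Order_0000003 Pdt_01 222.8` (tab-separated), which is what the distributed job should reproduce minus the product column.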
2. Code Implementation
OrderBean (the order bean class)
```java
package com.empire.hadoop.mr.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * OrderBean.java: the order bean class, used as the composite map output key.
 *
 * @author arron 2018-12-23 00:22:31
 */
public class OrderBean implements WritableComparable<OrderBean> {

    // Despite its name, itemid holds the order ID (first CSV column)
    private Text itemid;
    // Transaction amount (third CSV column)
    private DoubleWritable amount;

    // No-arg constructor so Hadoop can instantiate the bean via reflection
    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    // Sort by order ID ascending, then by amount descending,
    // so the largest amount comes first within each order
    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    // Serialization: the field order must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    // Output format: order ID <TAB> amount
    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
```
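The core of the secondary sort is `OrderBean.compareTo`: order ID ascending, then amount descending. The sketch below reproduces that ordering in plain Java so it can be run without a cluster. `CompositeKeyOrder` and its `Key` class are hypothetical stand-ins for `OrderBean`; comparing order IDs with `String.compareTo` is assumed equivalent to `Text`'s byte comparison here because the IDs are ASCII.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CompositeKeyOrder {

    /** Hypothetical plain-Java stand-in for OrderBean (no Hadoop types). */
    public static final class Key {
        public final String orderId;
        public final double amount;

        public Key(String orderId, double amount) {
            this.orderId = orderId;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return orderId + "\t" + amount;
        }
    }

    /** Same ordering as OrderBean.compareTo: order ID ascending, then amount descending. */
    public static final Comparator<Key> SORT_ORDER =
            Comparator.comparing((Key k) -> k.orderId)
                    .thenComparing(Comparator.comparingDouble((Key k) -> k.amount).reversed());

    public static List<Key> sample() {
        return Arrays.asList(
                new Key("Order_0000001", 222.8), new Key("Order_0000001", 25.8),
                new Key("Order_0000002", 325.8), new Key("Order_0000002", 522.8),
                new Key("Order_0000002", 122.4), new Key("Order_0000003", 222.8));
    }

    public static List<Key> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(SORT_ORDER);
        return copy;
    }

    public static void main(String[] args) {
        // Prints the keys sorted the way OrderBean.compareTo would sort them
        sorted(sample()).forEach(System.out::println);
    }
}
```

Within each order the amounts come out as 222.8, 25.8 for Order_0000001 and 522.8, 325.8, 122.4 for Order_0000002, so the first key of each order is always its maximum.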
ItemIdPartitioner (the partitioner for order beans)
```java
package com.empire.hadoop.mr.secondarysort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * ItemIdPartitioner.java: the partitioner implementation.
 *
 * @author arron 2018-12-23 00:24:02
 */
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
        // Beans with the same order ID are sent to the same partition.
        // The number of partitions always matches the number of reduce tasks set by the user.
        // "& Integer.MAX_VALUE" clears the sign bit so the result is never negative.
        return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```
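The partitioner masks the hash with `Integer.MAX_VALUE` instead of calling `Math.abs`. The small sketch below (the class `PartitionMask` is hypothetical) shows why: `Math.abs(Integer.MIN_VALUE)` is still negative, so the `abs` variant can return an invalid negative partition, while the mask always lands in `[0, numReduceTasks)`.

```java
public class PartitionMask {

    /** Same expression as ItemIdPartitioner: clear the sign bit, then take the remainder. */
    public static int maskPartition(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** A common but flawed alternative, shown only for contrast. */
    public static int absPartition(int hash, int numReduceTasks) {
        return Math.abs(hash) % numReduceTasks;
    }

    public static void main(String[] args) {
        int[] hashes = {42, -7, Integer.MIN_VALUE};
        for (int h : hashes) {
            System.out.println(h + " -> mask: " + maskPartition(h, 3)
                    + ", abs: " + absPartition(h, 3));
        }
    }
}
```

For `Integer.MIN_VALUE` with 3 reduce tasks the mask yields partition 0, whereas the `abs` version yields -2.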
ItemidGroupingComparator (the grouping comparator for order beans)
```java
package com.empire.hadoop.mr.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * ItemidGroupingComparator.java: uses a reduce-side GroupingComparator so that
 * all beans of one order are treated as the same key.
 *
 * @author arron 2018-12-23 00:23:35
 */
public class ItemidGroupingComparator extends WritableComparator {

    // Pass in the class of the bean used as the key, and ask the framework
    // to create instances of it via reflection (createInstances = true)
    protected ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // Compare only the order ID (stored in itemid), ignoring the amount
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
```
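To see what the grouping comparator changes on the reduce side, here is an in-memory simulation: sort with the full key comparator, then start a new group (one `reduce()` call) whenever the grouping comparator says the key differs from the previous one. `GroupingSketch` and its `Key` class are hypothetical stand-ins; the fallback case assumes Hadoop's documented behavior of grouping with the sort comparator when no grouping comparator is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class GroupingSketch {

    /** Hypothetical plain-Java stand-in for OrderBean. */
    public static final class Key {
        public final String orderId;
        public final double amount;

        public Key(String orderId, double amount) {
            this.orderId = orderId;
            this.amount = amount;
        }
    }

    // Sort comparator: mirrors OrderBean.compareTo (order ID asc, amount desc)
    public static final Comparator<Key> SORT =
            Comparator.comparing((Key k) -> k.orderId)
                    .thenComparing(Comparator.comparingDouble((Key k) -> k.amount).reversed());

    // Grouping comparator: mirrors ItemidGroupingComparator (order ID only)
    public static final Comparator<Key> GROUP = Comparator.comparing((Key k) -> k.orderId);

    /** Returns the first key of each group, which is what SecondarySortReducer writes out. */
    public static List<Key> firstOfEachGroup(List<Key> input, Comparator<Key> grouping) {
        List<Key> sorted = new ArrayList<>(input);
        sorted.sort(SORT);
        List<Key> firsts = new ArrayList<>();
        Key previous = null;
        for (Key k : sorted) {
            if (previous == null || grouping.compare(previous, k) != 0) {
                firsts.add(k); // a new reduce() call would begin here
            }
            previous = k;
        }
        return firsts;
    }

    public static List<Key> sample() {
        return Arrays.asList(
                new Key("Order_0000001", 222.8), new Key("Order_0000001", 25.8),
                new Key("Order_0000002", 325.8), new Key("Order_0000002", 522.8),
                new Key("Order_0000002", 122.4), new Key("Order_0000003", 222.8));
    }

    public static void main(String[] args) {
        // With the grouping comparator: one group per order ID
        System.out.println(firstOfEachGroup(sample(), GROUP).size());
        // Without it, grouping uses the full sort key: one group per distinct (order, amount)
        System.out.println(firstOfEachGroup(sample(), SORT).size());
    }
}
```

With the order-ID comparator the six records collapse into three groups, consistent with `Reduce input groups=3` in the job counters; grouping on the full key would give six groups and every record would be written out.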
SecondarySort (the main program for the order analysis)
```java
package com.empire.hadoop.mr.secondarysort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * SecondarySort.java: main program for the order analysis.
 *
 * @author arron 2018-12-23 00:22:57
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Skip blank lines and "#" header lines such as the one in the sample data
            if (StringUtils.isBlank(line) || line.startsWith("#")) {
                return;
            }
            // Fields: order ID, product ID, amount
            String[] fields = StringUtils.split(line, ",");
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // When data reaches reduce, all beans with the same order ID form one group and the
        // bean with the largest amount is sorted first, so writing the key is enough
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        // Map output types equal the final output types, so these two calls cover both
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Register the custom GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // Register the custom Partitioner
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(2);

        // Exit with 0 on success and 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
3. Running the Program
```shell
# Upload the jar and the data file via SFTP (Alt+p)
lcd d:/
put ordergp.jar
put ordergp.txt

# Prepare the input data for Hadoop
cd /home/hadoop
hadoop fs -mkdir -p /ordergp/gpinput
hdfs dfs -put ordergp.txt /ordergp/gpinput

# Run the program
hadoop jar ordergp.jar com.empire.hadoop.mr.secondarysort.SecondarySort /ordergp/gpinput /ordergp/gpoutput
```
4. Execution Log
```text
[hadoop@centos-aaron-h1 ~]$ hadoop jar ordergp.jar com.empire.hadoop.mr.secondarysort.SecondarySort /ordergp/gpinput /ordergp/gpoutput
18/12/23 08:12:37 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/12/23 08:12:38 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/23 08:12:38 INFO mapreduce.JobSubmitter: number of splits:1
18/12/23 08:12:38 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/23 08:12:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1545512861141_0004
18/12/23 08:12:38 INFO impl.YarnClientImpl: Submitted application application_1545512861141_0004
18/12/23 08:12:38 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1545512861141_0004/
18/12/23 08:12:38 INFO mapreduce.Job: Running job: job_1545512861141_0004
18/12/23 08:12:46 INFO mapreduce.Job: Job job_1545512861141_0004 running in uber mode : false
18/12/23 08:12:46 INFO mapreduce.Job: map 0% reduce 0%
18/12/23 08:12:52 INFO mapreduce.Job: map 100% reduce 0%
18/12/23 08:12:58 INFO mapreduce.Job: map 100% reduce 100%
18/12/23 08:12:58 INFO mapreduce.Job: Job job_1545512861141_0004 completed successfully
18/12/23 08:12:58 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=162
        FILE: Number of bytes written=593176
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=285
        HDFS: Number of bytes written=60
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=2
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4568
        Total time spent by all reduces in occupied slots (ms)=6712
        Total time spent by all map tasks (ms)=4568
        Total time spent by all reduce tasks (ms)=6712
        Total vcore-milliseconds taken by all map tasks=4568
        Total vcore-milliseconds taken by all reduce tasks=6712
        Total megabyte-milliseconds taken by all map tasks=4677632
        Total megabyte-milliseconds taken by all reduce tasks=6873088
    Map-Reduce Framework
        Map input records=6
        Map output records=6
        Map output bytes=138
        Map output materialized bytes=162
        Input split bytes=120
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=162
        Reduce input records=6
        Reduce output records=3
        Spilled Records=12
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=269
        CPU time spent (ms)=2620
        Physical memory (bytes) snapshot=454995968
        Virtual memory (bytes) snapshot=2544693248
        Total committed heap usage (bytes)=154140672
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=165
    File Output Format Counters
        Bytes Written=60
[hadoop@centos-aaron-h1 ~]$
```
5. Results
```text
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /ordergp/gpoutput
Found 3 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-23 08:12 /ordergp/gpoutput/_SUCCESS
-rw-r--r--   2 hadoop supergroup         20 2018-12-23 08:12 /ordergp/gpoutput/part-r-00000
-rw-r--r--   2 hadoop supergroup         40 2018-12-23 08:12 /ordergp/gpoutput/part-r-00001
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /ordergp/gpoutput/part-r-00000
Order_0000002	522.8
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /ordergp/gpoutput/part-r-00001
Order_0000001	222.8
Order_0000003	222.8
[hadoop@centos-aaron-h1 ~]$
```
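Why does Order_0000002 land in part-r-00000 while the other two orders share part-r-00001? The sketch below recomputes `ItemIdPartitioner`'s formula for two reduce tasks. It rests on an assumption worth checking against your Hadoop version: that `Text.hashCode()` is inherited from `BinaryComparable` and delegates to `WritableComparator.hashBytes`, i.e. start at 1 and fold each byte as `31 * hash + b` (the same recipe as `java.util.Arrays.hashCode(byte[])`). The class `TextPartitionSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class TextPartitionSketch {

    /** Assumed to mirror Text.hashCode(): hash = 1, then hash = 31 * hash + byte. */
    public static int textHash(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        int hash = 1;
        for (byte b : bytes) {
            hash = 31 * hash + b;
        }
        return hash;
    }

    /** Same formula as ItemIdPartitioner.getPartition. */
    public static int partition(String orderId, int numReduceTasks) {
        return (textHash(orderId) & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String id : new String[] {"Order_0000001", "Order_0000002", "Order_0000003"}) {
            System.out.println(id + " -> part-r-0000" + partition(id, 2));
        }
    }
}
```

Under that assumption the sketch reproduces the split above: Order_0000002 goes to partition 0 and the other two orders to partition 1. Using `String.hashCode()` instead, which starts from 0 rather than 1, would flip the split for these IDs, so a partitioner must hash the same representation the framework actually uses.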
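Closing remarks: that's all for this post. If you found it useful, please give it a like. If you're interested in my other articles on servers and big data, or in me, please follow my blog, and feel free to get in touch anytime.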