大數據教程(10.4)訂單中成交金額最大的訂單項分析

    上一篇博客分享了日誌清洗的實現,本篇文章博主將爲小夥伴們分享"訂單中成交金額最大的訂單項分析"的案例。java

    1、需求         apache

           分析出下圖中每一個訂單中成交金額最大的訂單項centos

#訂單號,商品號,成交金額    
Order_0000001,Pdt_01,222.8
Order_0000001,Pdt_05,25.8
Order_0000002,Pdt_05,325.8
Order_0000002,Pdt_03,522.8
Order_0000002,Pdt_04,122.4
Order_0000003,Pdt_01,222.8

    2、代碼實現服務器

          OrderBean(訂單bean類實現)app

package com.empire.hadoop.mr.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * 類 OrderBean.java的實現描述:訂單bean類
 * 
 * @author arron 2018年12月23日 上午12:22:31
 */
public class OrderBean implements WritableComparable<OrderBean> {

    private Text           itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);

    }

    public void set(Text itemid, DoubleWritable amount) {

        this.itemid = itemid;
        this.amount = amount;

    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();

        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {

        return itemid.toString() + "\t" + amount.get();

    }

}

            ItemIdPartitioner(訂單bean分區類實現)框架

package com.empire.hadoop.mr.secondarysort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 類 ItemIdPartitioner.java的實現描述:分區實現類
 * 
 * @author arron 2018年12月23日 上午12:24:02
 */
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
        //相同id的訂單bean,會發往相同的partition
        //並且,產生的分區數,是會跟用戶設置的reduce task數保持一致
        return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;

    }

}

           ItemidGroupingComparator(訂單bean分組實現)ide

 

package com.empire.hadoop.mr.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 類
 * ItemidGroupingComparator.java的實現描述:利用reduce端的GroupingComparator來實現將一組bean當作相同的key
 * 
 * @author arron 2018年12月23日 上午12:23:35
 */
public class ItemidGroupingComparator extends WritableComparator {

    //傳入做爲key的bean的class類型,以及制定須要讓框架作反射獲取實例對象
    protected ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;

        //比較兩個bean時,指定只比較bean中的orderid
        return abean.getItemid().compareTo(bbean.getItemid());

    }

}

            SecondarySort(訂單分析主程序實現)oop

package com.empire.hadoop.mr.secondarysort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 類 SecondarySort.java的實現描述:訂單處理主程序類
 * 
 * @author arron 2018年12月23日 上午12:22:57
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            String[] fields = StringUtils.split(line, ",");

            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));

            context.write(bean, NullWritable.get());

        }

    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        //到達reduce時,相同id的全部bean已經被當作一組,且金額最大的那個一排在第一位
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //在此設置自定義的Groupingcomparator類 
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        //在此設置自定義的partitioner類
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(2);

        job.waitForCompletion(true);

    }

}

    3、運行程序大數據

#上傳jar

Alt+p
lcd d:/
put  ordergp.jar
put  ordergp.txt

#準備hadoop處理的數據文件

cd /home/hadoop
hadoop fs  -mkdir -p /ordergp/gpinput
hdfs dfs -put  ordergp.txt  /ordergp/gpinput


#運行程序

hadoop jar ordergp.jar  com.empire.hadoop.mr.secondarysort.SecondarySort /ordergp/gpinput /ordergp/gpoutput

    4、運行效果this

[hadoop@centos-aaron-h1 ~]$ hadoop jar ordergp.jar  com.empire.hadoop.mr.secondarysort.SecondarySort /ordergp/gpinput /ordergp/gpoutput   18/12/23 08:12:37 INFO client.RMProxy: Connecting to ResourceManager at centos-aaron-h1/192.168.29.144:8032
18/12/23 08:12:38 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/12/23 08:12:38 INFO mapreduce.JobSubmitter: number of splits:1
18/12/23 08:12:38 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
18/12/23 08:12:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1545512861141_0004
18/12/23 08:12:38 INFO impl.YarnClientImpl: Submitted application application_1545512861141_0004
18/12/23 08:12:38 INFO mapreduce.Job: The url to track the job: http://centos-aaron-h1:8088/proxy/application_1545512861141_0004/
18/12/23 08:12:38 INFO mapreduce.Job: Running job: job_1545512861141_0004
18/12/23 08:12:46 INFO mapreduce.Job: Job job_1545512861141_0004 running in uber mode : false
18/12/23 08:12:46 INFO mapreduce.Job:  map 0% reduce 0%
18/12/23 08:12:52 INFO mapreduce.Job:  map 100% reduce 0%
18/12/23 08:12:58 INFO mapreduce.Job:  map 100% reduce 100%
18/12/23 08:12:58 INFO mapreduce.Job: Job job_1545512861141_0004 completed successfully
18/12/23 08:12:58 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=162
                FILE: Number of bytes written=593176
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=285
                HDFS: Number of bytes written=60
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=4
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=2
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=4568
                Total time spent by all reduces in occupied slots (ms)=6712
                Total time spent by all map tasks (ms)=4568
                Total time spent by all reduce tasks (ms)=6712
                Total vcore-milliseconds taken by all map tasks=4568
                Total vcore-milliseconds taken by all reduce tasks=6712
                Total megabyte-milliseconds taken by all map tasks=4677632
                Total megabyte-milliseconds taken by all reduce tasks=6873088
        Map-Reduce Framework
                Map input records=6
                Map output records=6
                Map output bytes=138
                Map output materialized bytes=162
                Input split bytes=120
                Combine input records=0
                Combine output records=0
                Reduce input groups=3
                Reduce shuffle bytes=162
                Reduce input records=6
                Reduce output records=3
                Spilled Records=12
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=269
                CPU time spent (ms)=2620
                Physical memory (bytes) snapshot=454995968
                Virtual memory (bytes) snapshot=2544693248
                Total committed heap usage (bytes)=154140672
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=165
        File Output Format Counters 
                Bytes Written=60
[hadoop@centos-aaron-h1 ~]$

    5、運行結果

[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /ordergp/gpoutput  
Found 3 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-23 08:12 /ordergp/gpoutput/_SUCCESS
-rw-r--r--   2 hadoop supergroup         20 2018-12-23 08:12 /ordergp/gpoutput/part-r-00000
-rw-r--r--   2 hadoop supergroup         40 2018-12-23 08:12 /ordergp/gpoutput/part-r-00001
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat  /ordergp/gpoutput/part-r-00000
Order_0000002   522.8
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat  /ordergp/gpoutput/part-r-00001
Order_0000001   222.8
Order_0000003   222.8
[hadoop@centos-aaron-h1 ~]$

    最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。

相關文章
相關標籤/搜索