Hive實戰:加載mapreduce生成的snappy壓縮文件到Hive表

咱們知道hadoop支持不少壓縮格式,有時候爲了節省存儲空間咱們常常講mapreduce任務生成的結果數據保存成壓縮文件,snappy 壓縮是比較經常使用的存儲加壓縮格式,這裏講解如何經過mapreduce任務生成snappy壓縮結果集,將snappy結果數據集,經過load方式加載到Hive數據表中。
java


    舉例說明一下,咱們這一份訂單流水文件,裏面有三列:消費者,商品,購買數量,咱們經過mapreduce任務生成一份統計結果,而後load到hive數據表中,統計結果格式:consumer_product,購買數量
apache


    1.數據準備,mapreduce輸入數據input1.txt,三列消費者id,產品id,購買數量:
app

[root@salver158 ~]# hadoop fs -text /tmp/input/input1.txtconsumer1:product1:1consumer2:product2:2consumer2:product3:3consumer1:product1:3consumer1:product3:2consumer1:product1:1consumer1:product2:1consumer2:product2:2consumer1:product1:3consumer2:product3:3consumer1:product1:2consumer1:product2:1

2.自定義map類:ide

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
           String[] lines = value.toString().split(":");
           /*購買數量id*/            int  count=Integer.valueOf(lines[2]);                        System.out.println("消費者: "+lines[0]);            System.out.println("商品: "+lines[1]);            System.out.println("購買數量: "+count);                        String mapKey=lines[0]+","+lines[1];            //拼接key:消費者id_商品id   value爲 購買數量:count,            context.write(new Text(mapKey), new IntWritable(count));        }    }


3.自定義reduce類,這裏直接用的wordcount裏面的reduce沒作改動:
函數

 //參數同Map同樣,依次表示是輸入鍵類型,輸入值類型,輸出鍵類型,輸出值類型。    // 這裏的輸入是來源於map,因此類型要與map的輸出類型對應 。    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {        private IntWritable result = new IntWritable();
       @Override        protected void reduce(Text key, Iterable<IntWritable> values, Context context)                throws IOException, InterruptedException {            int sum = 0;
           //for循環遍歷,將獲得的values值累加,統計購買次數            for (IntWritable value : values) {                sum += value.get();            }            result.set(sum);            context.write(key, result);//將結果保存到context中,最終輸出形式爲"key" + "result"        }    }

4.這裏整個主函數以下:oop

package com.hadoop.ljs;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-03-10 10:54 * @version: v1.0 * @description: com.hadoop.ljs */public class TradeCount {    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {        //這裏的IntWritable至關於Int類型        //Text至關於String類型        // map參數<keyIn key,valueIn value,Context context>,將處理後的數據寫入context並傳給reduce        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] lines = value.toString().split(":");            /*購買數量id*/            int  count=Integer.valueOf(lines[2]);
           System.out.println("消費者: "+lines[0]);            System.out.println("商品: "+lines[1]);            System.out.println("購買數量: "+count);
           String mapKey=lines[0]+","+lines[1];            //拼接key:消費者id_商品id   value爲 購買數量:count,            context.write(new Text(mapKey), new IntWritable(count));        }    }    //參數同Map同樣,依次表示是輸入鍵類型,輸入值類型,輸出鍵類型,輸出值類型。    // 這裏的輸入是來源於map,因此類型要與map的輸出類型對應 。    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {        private IntWritable result = new IntWritable();        @Override        protected void reduce(Text key, Iterable<IntWritable> values, Context context)                throws IOException, InterruptedException {            int sum = 0;
           //for循環遍歷,將獲得的values值累加,統計購買次數            for (IntWritable value : values) {                sum += value.get();            }            result.set(sum);            context.write(key, result);//將結果保存到context中,最終輸出形式爲"key" + "result"        }    }    public static void main(String[] args) throws IOExceptionClassNotFoundExceptionInterruptedException {        Configuration configuration = new Configuration();    /*    *//*設置map端使用snappy壓縮*//*        configuration.set("mapreduce.map.output.compress","true");        configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");        *//*設置reduce端使用snappy壓縮*//*        configuration.set("mapreduce.output.fileoutputformat.compress","true");        configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");        configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");*/      /*  *//*設置Reduce端分隔符*//*        configuration.set("mapreduce.output.textoutputformat.separator","@@@");*/        /*或者經過下面這個屬性設置*/        /*configuration.set("mapred.textoutputformat.separator",",");*/                        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();        if (otherArgs.length < 2) {            System.err.println("Usage: wordcount <in> [<in>...] <out>");            System.exit(2);        }        Job job = Job.getInstance(configuration, "TradeCount");        job.setJarByClass(TradeCount.class);        job.setMapperClass(MyMapper.class);        job.setCombinerClass(MyReducer.class);        job.setReducerClass(MyReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        for (int i = 0; i < otherArgs.length - 1; ++i) {            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));        }        FileOutputFormat.setOutputPath(job,                new Path(otherArgs[otherArgs.length - 1]));        System.exit(job.waitForCompletion(true) ? 0 : 1);    }}

6.mvn打成jar包,經過hadoop jar提交,執行以下命令:
spa

hadoop jar /root/hadoop273-1.0-SNAPSHOT-jar-with-dependencies.jar     \-Dmapreduce.map.output.compress=true   \  -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec  \-Dmapreduce.output.fileoutputformat.compress=true  \-Dmapreduce.output.fileoutputformat.compress.type=RECORD \-Dmapreduce.output.textoutputformat.separator=","   \-Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec  \/tmp/input   /tmp/output2

    我輸入的參數比較多,map端的壓縮能夠不設置,只設置reduce便可,你能夠直接在代碼中進行設置,這裏只是讓你們熟悉下各個參數:
code

    1).map端設置snappy壓縮:
orm

configuration.set("mapreduce.map.output.compress","true");configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

    2).reduce端設置snappy壓縮:ip

configuration.set("mapreduce.output.fileoutputformat.compress","true");configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

    3).指定reduce端分隔符,默認爲製表符:

 /*設置Reduce端分隔符*/configuration.set("mapreduce.output.textoutputformat.separator",",");  /*或者經過下面這個屬性設置*//*configuration.set("mapred.textoutputformat.separator",",");*/


7.生成結果以下,因爲我上面的命令,加了-D後面的幾個參數,對結果集作了壓縮,因此生成的結果集是.snappy文件,以下:

[root@salver158 ~]# hadoop fs -ls /tmp/output2Found 2 items-rw-r--r--   3 hbase hdfs          0 2020-03-10 12:32 /tmp/output2/_SUCCESS-rw-r--r--   3 hbase hdfs         70 2020-03-10 12:32 /tmp/output2/part-r-00000.snappy[root@salver158 ~]# hadoop fs -text /tmp/output2/part-r-00000.snappy20/03/10 14:06:11 INFO compress.CodecPool: Got brand-new decompressor [.snappy]consumer1,product1,10consumer1,product2,2consumer1,product3,2consumer2,product2,4consumer2,product3,6

8.新建hive表,執行命令:

hive> create table table8(name string,product string,count int )ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;

9.加載數據到數據表中,執行命令:

hive> load data inpath '/tmp/output2/part-r-00000.snappy'  into table table8;Loading data to table lujs.table8Table lujs.table8 stats: [numFiles=1, numRows=0, totalSize=70, rawDataSize=0]OKTime taken: 1.018 seconds
hive> select * from table8;OKconsumer1  product1  10consumer1  product2  2consumer1  product3  2consumer2  product2  4consumer2  product3  6Time taken: 0.295 seconds, Fetched: 5 row(s)

 

    至此,Hive處理mapreduce任務生成的snappy結果集講解完畢,我這裏只是簡單的作了一個wordcount,你的業務邏輯確定比這個要複雜不少,只須要在map或者reduce端自定義本身的業務邏輯便可,思路都是一致的。

相關文章
相關標籤/搜索