As we know, Hadoop supports many compression formats. To save storage space, the result data of a MapReduce job is often stored as compressed files, and Snappy is one of the more commonly used compression formats for storage. This article explains how to have a MapReduce job produce a Snappy-compressed result set and then load that result set into a Hive table with LOAD DATA.
To illustrate, suppose we have an order log file with three columns: consumer, product, and purchase quantity. We run a MapReduce job to produce an aggregate result and then load it into a Hive table. The output format is consumer,product followed by the total purchase quantity.
1. Data preparation. The MapReduce input file input1.txt has three colon-separated fields: consumer id, product id, and purchase quantity:
[root@salver158 ~]# hadoop fs -text /tmp/input/input1.txt
consumer1:product1:1
consumer2:product2:2
consumer2:product3:3
consumer1:product1:3
consumer1:product3:2
consumer1:product1:1
consumer1:product2:1
consumer2:product2:2
consumer1:product1:3
consumer2:product3:3
consumer1:product1:2
consumer1:product2:1
2. Custom Mapper class:
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] lines = value.toString().split(":");
        /* purchase quantity */
        int count = Integer.valueOf(lines[2]);
        System.out.println("consumer: " + lines[0]);
        System.out.println("product: " + lines[1]);
        System.out.println("quantity: " + count);
        // build the map key as "consumerId,productId"; the value is the purchase quantity
        String mapKey = lines[0] + "," + lines[1];
        context.write(new Text(mapKey), new IntWritable(count));
    }
}
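The map method above assumes every line is well formed. If the input might contain empty or malformed lines, a slightly more defensive drop-in variant of the same method inside MyMapper could skip them and count how many were dropped. This is only a sketch, and the counter group and names below are made up for illustration:

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    String[] fields = value.toString().split(":");
    if (fields.length != 3) {
        // count and skip malformed records instead of failing the task
        context.getCounter("TradeCount", "MALFORMED_LINES").increment(1);
        return;
    }
    try {
        int count = Integer.parseInt(fields[2].trim());
        context.write(new Text(fields[0] + "," + fields[1]), new IntWritable(count));
    } catch (NumberFormatException e) {
        // quantity column was not a number
        context.getCounter("TradeCount", "BAD_QUANTITY").increment(1);
    }
}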
3. Custom Reducer class. This is the WordCount reducer, used here without modification:
// The type parameters, as with the Mapper, are input key type, input value type, output key type, output value type.
// The input here comes from the map phase, so the types must match the Mapper's output types.
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // accumulate the values to get the total purchase quantity for this key
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        // write the result to the context; the final output form is "key" + separator + "result"
        context.write(key, result);
    }
}
4. The complete driver class is as follows:
package com.hadoop.ljs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-10 10:54
 * @version: v1.0
 * @description: com.hadoop.ljs
 */
public class TradeCount {

    // IntWritable is roughly the Hadoop equivalent of int, Text of String.
    // map(keyIn, valueIn, context): results written to the context are passed on to the reducer.
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] lines = value.toString().split(":");
            /* purchase quantity */
            int count = Integer.valueOf(lines[2]);
            System.out.println("consumer: " + lines[0]);
            System.out.println("product: " + lines[1]);
            System.out.println("quantity: " + count);
            // build the map key as "consumerId,productId"; the value is the purchase quantity
            String mapKey = lines[0] + "," + lines[1];
            context.write(new Text(mapKey), new IntWritable(count));
        }
    }

    // The type parameters, as with the Mapper, are input key type, input value type, output key type, output value type.
    // The input here comes from the map phase, so the types must match the Mapper's output types.
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // accumulate the values to get the total purchase quantity for this key
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            // write the result to the context; the final output form is "key" + separator + "result"
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        /* The settings below are commented out here and passed with -D on the command line instead. */
        /* map-side Snappy compression */
        // configuration.set("mapreduce.map.output.compress", "true");
        // configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        /* reduce-side (final output) Snappy compression */
        // configuration.set("mapreduce.output.fileoutputformat.compress", "true");
        // configuration.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
        // configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        /* reduce-side output field separator */
        // configuration.set("mapreduce.output.textoutputformat.separator", "@@@");
        /* or via the older property name */
        // configuration.set("mapred.textoutputformat.separator", ",");

        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "TradeCount");
        job.setJarByClass(TradeCount.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
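Snappy support in Hadoop depends on native libraries, so before relying on these settings it is worth confirming that the cluster actually has them (hadoop checknative -a reports the same from the shell). A tiny sketch, assuming Hadoop 2.7.x, where SnappyCodec exposes the static isNativeCodeLoaded() helper:

import org.apache.hadoop.io.compress.SnappyCodec;

public class SnappyCheck {
    public static void main(String[] args) {
        // assumption: Hadoop 2.7.x API; returns true only if the native snappy library loaded successfully
        System.out.println("snappy native available: " + SnappyCodec.isNativeCodeLoaded());
    }
}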
5. Package the job into a jar with mvn and submit it with hadoop jar, using the following command:
hadoop jar /root/hadoop273-1.0-SNAPSHOT-jar-with-dependencies.jar \
  -Dmapreduce.map.output.compress=true \
  -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
  -Dmapreduce.output.fileoutputformat.compress=true \
  -Dmapreduce.output.fileoutputformat.compress.type=RECORD \
  -Dmapreduce.output.textoutputformat.separator="," \
  -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec \
  /tmp/input /tmp/output2
I pass quite a few parameters here just to show the individual options; the map-side compression settings are optional, and setting only the reduce side is enough. You can also set all of these directly in code (a typed alternative using the Job API is sketched after this list):
1). Map-side Snappy compression:
configuration.set("mapreduce.map.output.compress","true");configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
2). Reduce-side (final output) Snappy compression:
configuration.set("mapreduce.output.fileoutputformat.compress","true");configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
3). Reduce-side output field separator (the default is a tab character):
/* set the reduce-side output separator */
configuration.set("mapreduce.output.textoutputformat.separator", ",");
/* or via the older property name */
// configuration.set("mapred.textoutputformat.separator", ",");
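As an aside, the compression settings can also be expressed with the typed helpers of the new MapReduce API instead of raw property strings. The fragment below is only a sketch of an equivalent driver snippet, not part of the original job; mapreduce.output.fileoutputformat.compress.type only affects SequenceFile output, so it is left out here:

// requires: import org.apache.hadoop.io.compress.CompressionCodec;
// requires: import org.apache.hadoop.io.compress.SnappyCodec;

// map-side Snappy compression via typed Configuration setters
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);

// final (reduce-side) output compression via the FileOutputFormat helpers
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);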
6. The generated output is shown below. Because the command above includes the -D compression options, the result set is compressed and written as a .snappy file:
[root@salver158 ~]# hadoop fs -ls /tmp/output2
Found 2 items
-rw-r--r--   3 hbase hdfs          0 2020-03-10 12:32 /tmp/output2/_SUCCESS
-rw-r--r--   3 hbase hdfs         70 2020-03-10 12:32 /tmp/output2/part-r-00000.snappy
[root@salver158 ~]# hadoop fs -text /tmp/output2/part-r-00000.snappy
20/03/10 14:06:11 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
consumer1,product1,10
consumer1,product2,2
consumer1,product3,2
consumer2,product2,4
consumer2,product3,6
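hadoop fs -text picks the right decompressor based on the .snappy extension. The same result set can also be read from Java with CompressionCodecFactory; the class below is just an illustrative sketch (the path is the one from this example) and assumes it runs with the cluster configuration and native Snappy libraries available:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadSnappyOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/tmp/output2/part-r-00000.snappy");
        // pick the codec from the file extension, just like hadoop fs -text does
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
        try (InputStream in = codec.createInputStream(fs.open(path));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}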
7. Create a Hive table:
hive> create table table8(name string, product string, count int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
8. Load the data into the table:
hive> load data inpath '/tmp/output2/part-r-00000.snappy' into table table8;
Loading data to table lujs.table8
Table lujs.table8 stats: [numFiles=1, numRows=0, totalSize=70, rawDataSize=0]
OK
Time taken: 1.018 seconds
hive> select * from table8;
OK
consumer1 product1 10
consumer1 product2 2
consumer1 product3 2
consumer2 product2 4
consumer2 product3 6
Time taken: 0.295 seconds, Fetched: 5 row(s)
That concludes the walkthrough of loading a Snappy-compressed MapReduce result set into Hive. The example here is just a simple word-count-style aggregation; your business logic will certainly be more complex, but the approach is the same: put your own logic in the map and reduce methods.