Problem:
Suppose we have the following data file city.txt (id, city, value):
cat city.txt
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
We need to group by city and then, within each group, take the two records with the largest value.
1. This is the group TopK problem that comes up often in real business scenarios. Let's first see how Pig solves it:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {
    c1 = order a by value desc;
    c2 = limit c1 2;
    generate group, c2.value;
};
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:
(bj,600,300)
(sh,900,400)
(wh,500,200)
These few lines also happen to implement what MySQL's group_concat function does:
a = load '/data/city.txt' using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {
    c1 = order a by value desc;
    generate group, c1.value;
};
d = stream c through `sed 's/[(){}]//g'`;
dump d;
Result:
(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)
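For comparison, in MySQL itself this is a single GROUP_CONCAT query. A minimal sketch, assuming the same data has been loaded into a MySQL table city(id, cname, value) shaped like the Hive table used later:

select cname,
       group_concat(value order by value desc) as vals
from city
group by cname;

GROUP_CONCAT accepts an ORDER BY inside the call, which is what produces the descending order seen in the Pig output above.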
2. Now let's look at how Hive handles the group TopK problem:
HQL is essentially quite similar to SQL, but it is still missing many features; it is at least not as powerful as native SQL, and it also lags a bit behind Pig. So how would this kind of group TopK problem be solved in plain SQL?
select * from city a
where 2 > (select count(1) from city where cname = a.cname and value > a.value)
distribute by a.cname sort by a.cname, a.value desc;
http://my.oschina.net/leejun2005/blog/78904
This query keeps a row only when fewer than two rows in the same city have a larger value, i.e. the row is in that city's top 2. But written this way it fails with a syntax error in HQL, so we have to fall back on a Hive UDF approach instead:
sort the rows by city and value, number the rows within each city, and finally keep only the rows whose per-city counter is below k in the WHERE clause (the counter starts at 0).
OK, on to the code:
(1) Define the UDF:
package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Rank extends UDF {
    private int counter;
    private String last_key;

    // Returns a 0-based running count for each consecutive run of identical keys.
    // This only works if the rows reach the UDF already distributed and sorted by
    // the key (see the distribute by / sort by in the query below).
    public int evaluate(final String key) {
        if (!key.equalsIgnoreCase(this.last_key)) {
            this.counter = 0;
            this.last_key = key;
        }
        return this.counter++;
    }
}
(2) Register the jar, create the table, load the data, and query:
add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';

create table city(id int, cname string, value int)
row format delimited fields terminated by ' ';

LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;

select cname, value
from (
    select cname, rank(cname) csum, value
    from (
        select id, cname, value
        from city
        distribute by cname sort by cname, value desc
    ) a
) b
where csum < 2;
(3) Result:
bj 600
bj 300
sh 900
sh 400
wh 500
wh 200
As you can see, Hive is a bit more involved than Pig here, but as Hive keeps maturing it may well end up more concise than Pig.
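In fact, Hive 0.11 and later ship with window functions, so the custom rank UDF above can be replaced by the built-in row_number(). A sketch against the same city table:

select cname, value
from (
    select cname, value,
           row_number() over (partition by cname order by value desc) as rn
    from city
) t
where rn <= 2;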
REF: implementing top-N per group in Hive
http://baiyunl.iteye.com/blog/1466343
3. Finally, let's look at plain MapReduce:
import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GroupTopK {
    // This MR job takes, for each age group, the 3 largest ids.
    // Test data generated by the script at: http://my.oschina.net/leejun2005/blog/76631

    public static class GroupTopKMapper extends
            Mapper<LongWritable, Text, IntWritable, LongWritable> {
        IntWritable outKey = new IntWritable();
        LongWritable outValue = new LongWritable();
        String[] valArr = null;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            valArr = value.toString().split("\t");
            outKey.set(Integer.parseInt(valArr[2]));  // age, int
            outValue.set(Long.parseLong(valArr[0]));  // id, long
            context.write(outKey, outValue);
        }
    }

    public static class GroupTopKReducer extends
            Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
        LongWritable outValue = new LongWritable();

        public void reduce(IntWritable key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            // Keep at most 3 ids per key; TreeSet sorts ascending, so evicting
            // first() always drops the smallest id and the top 3 remain.
            TreeSet<Long> idTreeSet = new TreeSet<Long>();
            for (LongWritable val : values) {
                idTreeSet.add(val.get());
                if (idTreeSet.size() > 3) {
                    idTreeSet.remove(idTreeSet.first());
                }
            }
            for (Long id : idTreeSet) {
                outValue.set(id);
                context.write(key, outValue);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        // Invoked as: hadoop jar GroupTopK.jar GroupTopK <in> <out>
        // so otherArgs[0] is the class name and the paths are otherArgs[1] and otherArgs[2].
        if (otherArgs.length != 3) {
            System.err.println("Usage: hadoop jar GroupTopK.jar GroupTopK <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "GroupTopK");
        job.setJarByClass(GroupTopK.class);
        job.setMapperClass(GroupTopKMapper.class);
        job.setReducerClass(GroupTopKReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1
Result:
hadoop fs -cat /tmp/1/part-r-00000
0 12869695
0 12869971
0 12869976
1 12869813
1 12869870
1 12869951
......
Verify against the raw data:
awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695
As you can see, the results are correct.
Note: the test data was generated by the following script:
http://my.oschina.net/leejun2005/blog/76631
PS:
If Hive is roughly analogous to SQL, then Pig is more like a PL/SQL stored procedure: you have more freedom in how you structure the program, and it can express more complex logic.
Pig can also call methods of static Java classes directly via reflection; see the earlier Pig posts for details.
A few Hive UDAF links are attached below for anyone interested:
Using Hive UDAF and UDTF to get top values after group by http://blog.csdn.net/liuzhoulong/article/details/7789183
A custom Hive function (UDAF) to concatenate multiple rows of strings into one http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
Writing a Hive UDAF http://www.fuzhijie.me/?p=118
Hive UDAF development http://richiehu.blog.51cto.com/2093113/386113
Solving some classic MapReduce problems with Spark https://segmentfault.com/a/1190000007649903