Solving the Group Top-K Problem with Pig, Hive, and MapReduce

Problem:

We have the following data file, city.txt (id, city, value):

cat city.txt 
1 wh 500
2 bj 600
3 wh 100
4 sh 400
5 wh 200
6 bj 100
7 sh 200
8 bj 300
9 sh 900
We need to group by city and then take, from each group, the two records with the largest value.

1. This is a group top-K problem that comes up all the time in real business scenarios. Let's first see how Pig solves it:

a = load '/data/city.txt'  using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {c1=order a by value desc; c2=limit c1 2; generate group,c2.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;

Result:

(bj,600,300)
(sh,900,400)
(wh,500,200)
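
For readers who think in imperative code, the same group / order-desc / limit-2 pipeline can be written in plain Java. This is an illustration only, not part of the original post; the GroupTop2 class and its hard-coded input are invented here to mirror city.txt (Java 8+):

import java.util.*;
import java.util.stream.*;

// Illustration only: the same group / order-desc / limit-2 semantics in plain Java.
public class GroupTop2 {
	public static void main(String[] args) {
		List<String> lines = Arrays.asList(
				"1 wh 500", "2 bj 600", "3 wh 100", "4 sh 400", "5 wh 200",
				"6 bj 100", "7 sh 200", "8 bj 300", "9 sh 900");

		// group by city (field 1), then sort each group's values (field 2)
		// descending and keep only the first two
		Map<String, List<Integer>> top2 = lines.stream()
				.map(l -> l.split(" "))
				.collect(Collectors.groupingBy(f -> f[1], TreeMap::new,
						Collectors.mapping(f -> Integer.parseInt(f[2]),
								Collectors.collectingAndThen(Collectors.toList(), vals -> {
									vals.sort(Comparator.reverseOrder());
									return vals.subList(0, Math.min(2, vals.size()));
								}))));

		// prints (bj,[600, 300]) (sh,[900, 400]) (wh,[500, 200])
		top2.forEach((city, vals) -> System.out.println("(" + city + "," + vals + ")"));
	}
}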

By dropping the limit, these few lines also reproduce the behavior of MySQL's group_concat function:

a = load '/data/city.txt'  using PigStorage(' ') as (id:chararray, city:chararray, value:int);
b = group a by city;
c = foreach b {c1=order a by value desc;  generate group,c1.value;};
d = stream c through `sed 's/[(){}]//g'`;
dump d;

Result:

(bj,600,300,100)
(sh,900,400,200)
(wh,500,200,100)

2. Now let's see how Hive handles the group top-K problem:

Essentially HQL and SQL have much in common, but HQL is still missing many features; it is at least not as powerful as native SQL, and it also falls somewhat short of Pig. So how is this kind of group top-K problem solved in plain SQL? The query below keeps a row only when fewer than two rows of the same city have a larger value:

select * from city a where 
2>(select count(1) from city where cname=a.cname and value>a.value) 
distribute by a.cname sort by a.cname,a.value desc;

http://my.oschina.net/leejun2005/blog/78904

But this query fails with a syntax error in HQL, so we have to take the Hive UDF route instead:

Sort by city and value, keep a running count per city, then use a WHERE clause to drop every row whose (0-based) counter for the city column has reached k.

OK, on to the code:

(1) Define the UDF:

package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
	 
public final class Rank extends UDF {
	private int counter;
	private String last_key;

	// Returns a 0-based running count per key, resetting whenever the key
	// changes. This only works if rows arrive sorted by the key column,
	// which the "distribute by ... sort by ..." query below guarantees.
	public int evaluate(final String key) {
		if (!key.equalsIgnoreCase(this.last_key)) {
			this.counter = 0;
			this.last_key = key;
		}
		return this.counter++;
	}
}
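
To see the counter's behavior concretely, here is a hypothetical local harness (RankDemo is invented for this post, placed alongside Rank in com.example.hive.udf and assuming hive-exec is on the classpath) that feeds Rank keys already sorted the way the query below arranges them:

package com.example.hive.udf;

// Hypothetical local check of Rank's reset-on-new-key behavior.
public class RankDemo {
	public static void main(String[] args) {
		Rank rank = new Rank();
		String[] sortedCities = {"bj", "bj", "bj", "sh", "sh", "wh"};
		for (String city : sortedCities) {
			// prints bj -> 0, bj -> 1, bj -> 2, sh -> 0, sh -> 1, wh -> 0
			System.out.println(city + " -> " + rank.evaluate(city));
		}
	}
}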

(2) Register the jar, create the table, load the data, and query:

add jar Rank.jar;
create temporary function rank as 'com.example.hive.udf.Rank';
create table city(id int,cname string,value int) row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'city.txt' OVERWRITE INTO TABLE city;
-- inner query: route each city's rows to one reducer, sorted by value desc;
-- middle query: number the sorted rows of each city with rank() (0-based);
-- outer query: keep only the first two rows (csum 0 and 1) per city
select cname, value from (
	select cname,rank(cname) csum,value from (
		select id, cname, value from city distribute by cname sort by cname,value desc
	)a
)b where csum < 2;

(3) Result:

bj	600
bj	300
sh	900
sh	400
wh	500
wh	200

As you can see, Hive is a bit more involved here than Pig, but as Hive keeps maturing, it may well end up even more concise than Pig.

REF: Implementing group-wise top-N in Hive
http://baiyunl.iteye.com/blog/1466343

3. Finally, let's look at the raw MapReduce version:

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GroupTopK {
	// This MR job extracts the 3 largest ids within each age group.
	// Test data was generated by the script at: http://my.oschina.net/leejun2005/blog/76631
	public static class GroupTopKMapper extends
			Mapper<LongWritable, Text, IntWritable, LongWritable> {
		IntWritable outKey = new IntWritable();
		LongWritable outValue = new LongWritable();
		String[] valArr = null;

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			valArr = value.toString().split("\t");
			outKey.set(Integer.parseInt(valArr[2]));// age int
			outValue.set(Long.parseLong(valArr[0]));// id long
			context.write(outKey, outValue);
		}
	}

	public static class GroupTopKReducer extends
			Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {

		LongWritable outValue = new LongWritable();

		public void reduce(IntWritable key, Iterable<LongWritable> values,
				Context context) throws IOException, InterruptedException {
			// A TreeSet keeps the ids in ascending order; once it exceeds 3
			// elements, evict the smallest so only the 3 largest survive.
			TreeSet<Long> idTreeSet = new TreeSet<Long>();
			for (LongWritable val : values) {
				idTreeSet.add(val.get());
				if (idTreeSet.size() > 3) {
					idTreeSet.remove(idTreeSet.first());
				}
			}
			for (Long id : idTreeSet) {
				outValue.set(id);
				context.write(key, outValue);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		// Validate the argument count before indexing into the array; as
		// invoked below, the three remaining args are <name> <in> <out>.
		if (otherArgs.length != 3) {
			System.err.println("Usage: GroupTopK <name> <in> <out>");
			System.exit(2);
		}
		System.out.println(otherArgs.length);
		System.out.println(otherArgs[0]);
		System.out.println(otherArgs[1]);
		Job job = new Job(conf, "GroupTopK");
		job.setJarByClass(GroupTopK.class);
		job.setMapperClass(GroupTopKMapper.class);
		job.setReducerClass(GroupTopKReducer.class);
		job.setNumReduceTasks(1);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
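
The heart of the reducer is the bounded TreeSet: add every id, and whenever the set grows past 3 elements, drop the smallest one. Here is that pattern in isolation, as a standalone sketch (TopKSketch and its input array are invented for illustration; the ids echo the expected output below):

import java.util.TreeSet;

// Standalone sketch of the bounded-TreeSet top-K pattern used in the reducer.
public class TopKSketch {
	public static void main(String[] args) {
		final int K = 3;
		long[] ids = {12869695L, 5L, 12869971L, 42L, 12869976L, 7L};
		TreeSet<Long> topK = new TreeSet<Long>();
		for (long id : ids) {
			topK.add(id);
			if (topK.size() > K) {
				topK.remove(topK.first()); // evict the current minimum
			}
		}
		System.out.println(topK); // [12869695, 12869971, 12869976]
	}
}

Compile the job into GroupTopK.jar and run it: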

hadoop jar GroupTopK.jar GroupTopK /tmp/decli/record_new.txt /tmp/1

Result:

hadoop fs -cat /tmp/1/part-r-00000
0       12869695
0       12869971
0       12869976
1       12869813
1       12869870
1       12869951

......

Verify against the raw data:

awk '$3==0{print $1}' record_new.txt|sort -nr|head -3
12869976
12869971
12869695

As you can see, the results are correct.

Note: the test data was generated by the following script:

http://my.oschina.net/leejun2005/blog/76631

PS:

If Hive is analogous to SQL, then Pig is more like a PL/SQL stored procedure: you write programs more freely, and the logic it can express is more powerful.

Pig can even call methods of static Java classes directly via reflection; see the earlier Pig posts for details on that.

Appended below are a few Hive UDAF links for interested readers:

Hive UDAF and UDTF: getting top values after a group by  http://blog.csdn.net/liuzhoulong/article/details/7789183
A custom Hive UDAF that concatenates multiple rows of strings into one  http://blog.sina.com.cn/s/blog_6ff05a2c0100tjw4.html
Writing a Hive UDAF  http://www.fuzhijie.me/?p=118
Hive UDAF development  http://richiehu.blog.51cto.com/2093113/386113

Solving some classic MapReduce problems with Spark  https://segmentfault.com/a/1190000007649903
