這篇文章是來自Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions:的不嚴格翻譯,由於翻譯的文章示例寫得比較通俗易懂,此外,我把本身對於Hive的UDAF理解穿插到文章裏面。html
udfa是Hive中用戶自定義的彙集函數,hive內置UDAF函數包括有sum()與count(),UDAF實現有簡單與通用兩種方式,簡單UDAF由於使用Java反射致使性能損失,並且有些特性不能使用,已經被棄用了;在這篇博文中咱們將關注Hive中自定義聚類函數-GenericUDAF,UDAF開發主要涉及到如下兩個抽象類:java
[java] view plain copygit
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver github
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator sql
博文中的全部的代碼和數據能夠在如下連接找到:hive examples
shell
首先先建立一張包含示例數據的表:people,該表只有name一列,該列中包含了一個或多個名字,該表數據保存在people.txt文件中。apache
[plain] view plain copyapi
~$ cat ./people.txt 跨域
John Smith app
John and Ann White
Ted Green
Dorothy
把該文件上載到hdfs目錄/user/matthew/people中:
[plain] view plain copy
hadoop fs -mkdir people
hadoop fs -put ./people.txt people
下面要建立hive外部表,在hive shell中執行
[sql] view plain copy
CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS
TERMINATED BY '\t'
ESCAPED BY ''
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
建立一個GenericUDAF必須先了解如下兩個抽象類:
[java] view plain copy
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
[java] view plain copy
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
爲了更好理解上述抽象類的API,要記住hive只是mapreduce函數,只不過hive已經幫助咱們寫好並隱藏mapreduce,向上提供簡潔的sql函數,因此咱們要結合Mapper、Combiner與Reducer來幫助咱們理解這個函數。要記住在Hadoop集羣中有若干臺機器,在不一樣的機器上Mapper與Reducer任務獨立運行。
因此大致上來講,這個UDAF函數讀取數據(mapper),彙集一堆mapper輸出到部分彙集結果(combiner),而且最終建立一個最終的彙集結果(reducer)。由於咱們跨域多個combiner進行彙集,因此咱們須要保存部分彙集結果。
AbstractGenericUDAFResolver
Resolver很簡單,要覆蓋實現下面方法,該方法會根據sql傳人的參數數據格式指定調用哪一個Evaluator進行處理。
[java] view plain copy
<span style="background-color: rgb(255, 255, 255);"><span style="font-size:14px;">public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;</span></span>
GenericUDAFEvaluator
UDAF邏輯處理主要發生在Evaluator中,要實現該抽象類的幾個方法。
在理解Evaluator以前,必須先理解objectInspector接口與GenericUDAFEvaluator中的內部類Model。
做用主要是解耦數據使用與數據格式,使得數據流在輸入輸出端切換不一樣的輸入輸出格式,不一樣的Operator上使用不一樣的格式。能夠參考這兩篇文章:first post on Hive UDFs、Hive中ObjectInspector的做用,裏面有關於objectinspector的介紹。
Model表明了UDAF在mapreduce的各個階段。
[java] view plain copy
public static enum Mode {
/**
* PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合
* 將會調用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合併map的數據::從部分數據聚合到部分數據聚合:
* 將會調用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce階段:從部分數據的聚合到徹底聚合
* 將會調用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 若是出現了這個階段,表示mapreduce只有map,沒有reduce,因此map端就直接出結果了:從原始數據直接到徹底聚合
* 將會調用 iterate()和terminate()
*/
COMPLETE
};
通常狀況下,完整的UDAF邏輯是一個mapreduce過程,若是有mapper和reducer,就會經歷PARTIAL1(mapper),FINAL(reducer),若是還有combiner,那就會經歷PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
而有一些狀況下的mapreduce,只有mapper,而沒有reducer,因此就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。
[java] view plain copy
// 肯定各個階段輸入輸出參數的數據格式ObjectInspectors
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
// 保存數據彙集結果的類
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
// 重置彙集結果
public void reset(AggregationBuffer agg) throws HiveException;
// map階段,迭代處理輸入sql傳過來的列數據
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
// map與combiner結束返回結果,獲得部分數據彙集結果
public Object terminatePartial(AggregationBuffer agg) throws HiveException;
// combiner合併map返回的結果,還有reducer合併mapper或combiner返回的結果。
public void merge(AggregationBuffer agg, Object partial) throws HiveException;
// reducer階段,輸出最終結果
public Object terminate(AggregationBuffer agg) throws HiveException;
Model各階段對應Evaluator方法調用
Evaluator各個階段下處理mapreduce流程
下面將講述一個彙集函數UDAF的實例,咱們將計算people這張表中的name列字母的個數。
下面的函數代碼是計算指定列中字符的總數(包括空格)
[java] view plain copy
@Description(name = "letters", value = "_FUNC_(expr) - 返回該列中全部字符串的字符總數")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
if (parameters.length != 1) {
throw new UDFArgumentTypeException(parameters.length - 1,
"Exactly one argument is expected.");
}
ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){
throw new UDFArgumentTypeException(0,
"Argument must be PRIMITIVE, but "
+ oi.getCategory().name()
+ " was passed.");
}
PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;
if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
throw new UDFArgumentTypeException(0,
"Argument must be String, but "
+ inputOI.getPrimitiveCategory().name()
+ " was passed.");
}
return new TotalNumOfLettersEvaluator();
}
public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {
PrimitiveObjectInspector inputOI;
ObjectInspector outputOI;
PrimitiveObjectInspector integerOI;
int total = 0;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
assert (parameters.length == 1);
super.init(m, parameters);
//map階段讀取sql列,輸入爲String基礎數據格式
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
//其他階段,輸入爲Integer基礎數據格式
integerOI = (PrimitiveObjectInspector) parameters[0];
}
// 指定各個階段輸出數據格式都爲Integer類型
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
ObjectInspectorOptions.JAVA);
return outputOI;
}
/**
* 存儲當前字符總數的類
*/
static class LetterSumAgg implements AggregationBuffer {
int sum = 0;
void add(int num){
sum += num;
}
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
LetterSumAgg result = new LetterSumAgg();
return result;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
LetterSumAgg myagg = new LetterSumAgg();
}
private boolean warned = false;
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
assert (parameters.length == 1);
if (parameters[0] != null) {
LetterSumAgg myagg = (LetterSumAgg) agg;
Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
myagg.add(String.valueOf(p1).length());
}
}
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
LetterSumAgg myagg = (LetterSumAgg) agg;
total += myagg.sum;
return total;
}
@Override
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
LetterSumAgg myagg1 = (LetterSumAgg) agg;
Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
LetterSumAgg myagg2 = new LetterSumAgg();
myagg2.add(partialSum);
myagg1.add(myagg2.sum);
}
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
LetterSumAgg myagg = (LetterSumAgg) agg;
total = myagg.sum;
return myagg.sum;
}
}
}
這裏有一些關於combiner的資源,Philippe Adjiman 講得不錯。
AggregationBuffer
容許咱們保存中間結果,經過定義咱們的buffer,咱們能夠處理任何格式的數據,在代碼例子中字符總數保存在AggregationBuffer
。
[java] view plain copy
/**
* 保存當前字符總數的類
*/
static class LetterSumAgg implements AggregationBuffer {
int sum = 0;
void add(int num){
sum += num;
}
}
這意味着UDAF在不一樣的mapreduce階段會接收到不一樣的輸入。Iterate讀取咱們表中的一行(或者準確來講是表),而後輸出其餘數據格式的彙集結果。
artialAggregation
合併這些彙集結果到另外相同格式的新的彙集結果,而後最終的reducer取得這些彙集結果真後輸出最終結果(該結果或許與接收數據的格式不一致)。
在init()方法中咱們指定輸入爲string,結果輸出格式爲integer,還有,部分彙集結果輸出格式爲integer(保存在aggregation buffer中);terminate()
與
terminatePartial()
二者輸出一個
integer
。
[java] view plain copy
// init方法中根據不一樣的mode指定輸出數據的格式objectinspector
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
integerOI = (PrimitiveObjectInspector) parameters[0];
}
// 不一樣model階段的輸出數據格式
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
ObjectInspectorOptions.JAVA);
iterate()
函數讀取到每行中列的字符串,計算與保存該字符串的長度
[java] view plain copy
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
...
Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
myagg.add(String.valueOf(p1).length());
}
}
Merge函數增長部分彙集總數到AggregationBuffer
[java] view plain copy
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
LetterSumAgg myagg1 = (LetterSumAgg) agg;
Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
LetterSumAgg myagg2 = new LetterSumAgg();
myagg2.add(partialSum);
myagg1.add(myagg2.sum);
}
}
Terminate()函數返回AggregationBuffer中的內容,這裏產生了最終結果。
[java] view plain copy
public Object terminate(AggregationBuffer agg) throws HiveException {
LetterSumAgg myagg = (LetterSumAgg) agg;
total = myagg.sum;
return myagg.sum;
}
[plain] view plain copy
ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';
SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds