1介紹
Hive自定義函數包括三種UDF、UDAF、UDTFjava
UDF(User-Defined-Function) 一進一出apache
UDAF(User- Defined Aggregation Funcation) 彙集函數,多進一出。Count/max/minapi
UDTF(User-Defined Table-Generating Functions) 一進多出,如lateral view explore)數組
使用方式 :在HIVE會話中add 自定義函數的jar文件,而後建立function繼而使用函數緩存
1、編寫自定義函數數據結構
2、打包上傳到集羣機器中app
3、進入hive客戶端,添加jar包:hive> add jar /root/hive_udf.jaride
4、建立臨時函數:hive> create temporary function getLen as 'com.raphael.len.GetLength';函數
5、銷燬臨時函數:hive> DROP TEMPORARY FUNCTION getLen;oop
hive自定義函數中有幾類數據類型:
PrimitiveObjectInspector 經常使用(基本數據類):
PrimitiveObjectInspector.PrimitiveCategory.STRING
ListObjectInspector
StructObjectInspector
MapObjectInspector
等類型
三種自定義函數都會首先進行參數個數和參數類型檢查
參數類型檢查(是那一種大類型(primitive...),而後具體是什麼類型)
注意:在全部方法有返回時要注意返回的類型,最重要的是最後返回出去的結果,通常結果的數據類型會在初始化時就定義了,那麼在最後返回結果是應該要轉化成那種類型
初始化時init:
PrimitiveObjectInspector inputOI;
inputOI = (PrimitiveObjectInspector) parameters[0];
都會定義這種設置輸入數據的 ObjectInspector(類型吧)
有多個參數就會設置多個。
在真正處理數據須要獲取數據:
long distinctId = PrimitiveObjectInspectorUtils.getLong(objects[0], distinctIdOI);
通常都這樣獲取,objects是方法的參數,從外界傳遞進來的,包含不少值,獲取第幾個值,同時類型是什麼(distinctIdOI)
2 UDF
2.1介紹
hive的udf有兩種實現方式或者實現的API,一種是udf比較簡單,一種是GenericUDF比較複雜。
若是所操做的數據類型都是基礎數據類型,如(Hadoop&Hive 基本writable類型,如Text,IntWritable,LongWriable,DoubleWritable等等)。那麼簡單的org.apache.hadoop.hive.ql.exec.UDF就能夠作到。
若是所操做的數據類型是內嵌數據結構,如Map,List和Set,那麼要採用org.apache.hadoop.hive.ql.udf.generic.GenericUDF
2.2繼承UDF實現
須要繼承org.apache.hadoop.hive.ql.UDF,或者
org.apache.hadoop.hive.ql.udf.generic.GenericUDF,前者比較簡單,只須要實現evaluate函數,evaluate函數支持重載。
UDF代碼以下:
import org.apache.hadoop.hive.ql.exec.UDF;
public class GetLength extends UDF{
public int evaluate(String str) {
try{
return str.length();
}catch(Exception e){
return -1;
}
}
}
2.3繼承GenericUDF實現
繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDF須要實現三個方法:
1)initialize:只調用一次,在任何evaluate()調用以前能夠接收到一個能夠表示函數輸入參數類型的object inspectors數組。initalize用來驗證該函數是否接收正確的參數類型和參數個數,最後提供最後結果對應的數據類型。
2)evaluate:真正的邏輯,讀取輸入數據,處理數據,返回結果。
3)getDisplayString:返回描述該方法的字符串,沒有太多做用。
繼承GenericUDF實現UDF,完成url解碼功能代碼以下:
public class UrlDecodeUDF2 extends GenericUDF {
private transient PrimitiveObjectInspector inputOI;
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
// 檢查參數數量
if (objectInspectors.length != 1) {
throw new UDFArgumentException("urlDecode() takes only one argument");
}
// 檢查參數類型
if (objectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
if (((PrimitiveObjectInspector) objectInspectors[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
throw new UDFArgumentTypeException(0, "Only String type argument are accepted, but "
+ objectInspectors[0].getTypeName() + " was pass as parameter 1");
}
}
// 設置輸入數據的 ObjectInspector
inputOI = (PrimitiveObjectInspector) objectInspectors[0];
// 輸出數據的 ObjectInspector
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
if (deferredObjects == null || deferredObjects[0] == null) {
return new Text("");
}
// 提取數據
String component = PrimitiveObjectInspectorUtils.getString(deferredObjects[0].get(), inputOI);
if (component == null || component.length() <= 0) {
return "";
}
String result = "";
//使用%25替換字符串中的%號
component = component.replaceAll("%(?![0-9a-fA-F]{2})", "%25");
try {
result = URLDecoder.decode(component, "UTF-8");
// result = URLDecoder.decode(result, "UTF-8");
} catch (UnsupportedEncodingException e) {
result = component;
}
System.out.println(result);
return result;
}
public String getDisplayString(String[] strings) {
StringBuilder sb = new StringBuilder();
sb.append("url_decode");
sb.append("(");
if (strings.length > 0) {
sb.append(strings[0]);
for (int index = 1; index < strings.length - 1; index++) {
sb.append(",");
sb.append(strings[index]);
}
}
sb.append(")");
return sb.toString();
}
}
3 UDAF
3.1介紹
多行進一行出,如sum()、min(),用在group by時。開發通用UDAF有兩個步驟
一、第一個是編寫resolver類(繼承AbstractGenericUDAFResolver),
二、第二個是編寫evaluator類(繼承GenericUDAFEvaluator)在resolver類內部。
resolver負責類型檢查,操做符重載。evaluator真正實現UDAF的邏輯。一般來講,頂層UDAF類繼承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver,裏面編寫嵌套類evaluator (繼承GenericUDAFEvaluator)實現UDAF的邏輯。
實現evaluator全部evaluators必須繼承抽象類
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子類必須實現它的一些抽象方法,實現UDAF的邏輯。
同時須要一個繼承了AggregationBuffer的類,來存儲中間過程當中記錄的數據
3.2 Mode
GenericUDAFEvaluator有一個嵌套類Mode,這個類很重要,它表示了udaf在mapreduce的各個階段,理解Mode的含義,就能夠理解了hive的UDAF的運行流程。
PARTIAL1, //從原始數據到部分聚合數據的過程(map階段),將調用iterate()和terminatePartial()方法。
PARTIAL2, //從部分聚合數據到部分聚合數據的過程(map端的combiner階段),將調用merge() 和terminatePartial()方法。
FINAL, //從部分聚合數據到所有聚合的過程(reduce階段),將調用merge()和 terminate()方法。
COMPLETE //從原始數據直接到所有聚合的過程(表示只有map,沒有reduce,map端直接出結果),將調用merge() 和 terminate()方法。
public static enum Mode {
/**
* PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合
* 將會調用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合併map的數據::從部分數據聚合到部分數據聚合:
* 將會調用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce階段:從部分數據的聚合到徹底聚合
* 將會調用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 若是出現了這個階段,表示mapreduce只有map,沒有reduce,因此map端就直接出結果了:從原始數據直接到徹底聚合
* 將會調用 iterate()和terminate()
*/
COMPLETE
};
通常狀況下,完整的UDAF邏輯是一個mapreduce過程,若是有mapper和reducer,就會經歷PARTIAL1(mapper),FINAL(reducer),若是還有combiner,那就會經歷PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
而有一些狀況下的mapreduce,只有mapper,而沒有reducer,因此就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。
3.3實現代碼分析
UDAF的實現代碼主幹以下:
//最外層繼承AbstractGenericUDAFResolver
public class GenericUDAFSum extends AbstractGenericUDAFResolver {
static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());
//實現getEvaluator方法
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
// Type-checking goes here!
return new GenericUDAFSumLong();
}
//編寫一個類,繼承GenericUDAFEvaluator,實現全部方法
public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
// UDAF logic goes here!
}
}
說明:
一、getEvaluator:繼承AbstractGenericUDAFResolver 所須要實現的方法,也只須要重寫這一個方法,做用是檢查參數個數,參數類型等,而後返回GenericUDAFEvaluator對象。
二、GenericUDAFEvaluator:getEvaluator最後返回類型就是這個類,真正返回(return)的是繼承實現GenericUDAFEvaluator的類。
例如GenericUDAFSumLong 繼承GenericUDAFEvaluator須要實現以下方法:
一、init(初始化):肯定返回類型並返回UDAF的返回類型,
二、getNewAggregationBuffer:建立新的聚合計算的須要的內存,用來存儲mapper,combiner,reducer運算過程當中的相加總和,獲取聚合中間結果緩存
三、reset:重置中間結果緩存
四、iterate: 迭代每一行的數據,等同於 Map 階段。傳進來的行數據由 HiveSQL 決定,計算的中間結果緩存到 AggregationBuffer ,@param aggregationBuffer 中間結果緩存,@param objects 行中每一列的數據
五、terminatePartial(終止部分):對部分中間結果數據進行合併,等同於 Map 階段的 combine,返回mapper結果,combine後的結果
六、merge(合併):在最終進行 terminate() 前對全部傳入的中間結果進行合併,等同於 Reduce 階段的 merge。 各個 Map 傳來的中間結果 partial 合併到 aggregationBuffer
七、terminate(終止):計算合併後的數據得出最終結果,等同於 Reduce 階段的邏輯。reducer返回結果,或者是隻有mapper,沒有reducer時,在mapper端返回結果。
3.4代碼示例
例子以下:
public class UDAFDemo extends AbstractGenericUDAFResolver {
static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
// 檢查參數個數
if (parameters.length != 1) {
throw new UDFArgumentTypeException(parameters.length - 1,
"Exactly one argument is expected.");
}
// 檢查參數類型
if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0,
"Only primitive type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed.");
}
// // 檢查參數類型
// if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
// throw new UDFArgumentTypeException(0, "Only primitive type argument are accepted but "
// + parameters[0].getTypeName() + " was passed as parameter 1");
// }
// if (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG) {
// throw new UDFArgumentTypeException(0, "Only Long Type type argument are accepted but "
// + parameters[0].getTypeName() + " was passed as parameter 1");
// }
// 檢查參數類型
switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
case BYTE:
case SHORT:
case INT:
case LONG:
case TIMESTAMP:
return new GenericUDAFSumLong();
case FLOAT:
case DOUBLE:
// case STRING:
// return new GenericUDAFSumDouble();
case BOOLEAN:
default:
throw new UDFArgumentTypeException(0,
"Only numeric or string type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed.");
}
}
public static class GenericUDAFSumLong extends GenericUDAFEvaluator {
private PrimitiveObjectInspector inputOI;
private LongWritable result;
/** 存儲sum的值的類 */
static class SumLongAgg implements AggregationBuffer {
boolean empty;
long sum;
}
//這個方法返回了UDAF的返回類型,這裏肯定了sum自定義函數的返回類型是Long類型
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
assert (parameters.length == 1);
super.init(m, parameters);
result = new LongWritable(0);
inputOI = (PrimitiveObjectInspector) parameters[0];
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
//建立新的聚合計算的須要的內存,用來存儲mapper,combiner,reducer運算過程當中的相加總和
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
SumLongAgg result = new SumLongAgg();
reset(result);
return result;
}
//mapreduce支持mapper和reducer的重用,因此爲了兼容,也須要作內存的重用。
public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
SumLongAgg myagg = (SumLongAgg) aggregationBuffer;
myagg.empty = true;
myagg.sum = 0;
}
private boolean warned = false;
//map階段調用,只要把保存當前和的對象agg,再加上輸入的參數,就能夠了。
public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
assert (objects.length == 1);
try {
merge(aggregationBuffer, objects[0]);
}catch (NumberFormatException e) {
if (!warned) {
warned = true;
LOG.warn(getClass().getSimpleName() + " "
+ StringUtils.stringifyException(e));
}
}
}
//mapper結束要返回的結果,還有combiner結束返回的結果
public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
return terminate(aggregationBuffer);
}
//combiner合併map返回的結果,還有reducer合併mapper或combiner返回的結果。
public void merge(AggregationBuffer aggregationBuffer, Object o) throws HiveException {
if (o != null) {
SumLongAgg agg = (SumLongAgg) aggregationBuffer;
agg.sum += PrimitiveObjectInspectorUtils.getLong(o, inputOI);
agg.empty = false;
}
}
//reducer返回結果,或者是隻有mapper,沒有reducer時,在mapper端返回結果。
public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
SumLongAgg ragg = (SumLongAgg) aggregationBuffer;
if (ragg.empty) {
return null;
}
result.set(ragg.sum);
return result;
}
}
}
4 UDTF
4.1介紹
udtf用來解決輸入一行輸出多行(On-to-many maping) 的需求,須要繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF來實現三個方法。
一、initialize:返回UDTF的返回行的信息(返回個數,類型)。
二、process:真正的處理過程在process函數中,在process中,每一次forward()調用產生一行;若是產生多列能夠將多個列的值放在一個數組中,而後將該數組傳入到forward()函數。
三、close:對須要清理的方法進行清理。
forward()傳入的就是最後的結果,裏面通常是數組,數組有多少個元素就代碼最後一行輸出的結果有多少列
4.2代碼示例
下面有一個切分」key:value;key:value;」這種字符串,返回結果爲key, value兩個字段。
public class UDTFDemo extends GenericUDTF {
@Override
public void close() throws HiveException {
// TODO Auto-generated method stub
}
/**
* 此方法返回UDTF的返回行的信息(返回個數,類型)
*/
@Override
public StructObjectInspector initialize(ObjectInspector[] args)
throws UDFArgumentException {
if (args.length != 1) {
throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
}
if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException("ExplodeMap takes string as a parameter");
}
//多出的列名
ArrayList<String> fieldNames = new ArrayList<String>();
//每一列的類型
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("c1");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("c2");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
/**
* 初始化完成以後會調用process方法,真正的處理過程在process函數中,在process中,每一次forward()調用產生一行;
* 若是產生多列能夠將多個列的值放在一個數組中,而後將該數組傳入到forward()函數。
*/
@Override
public void process(Object[] args) throws HiveException {
//每一行數 key:value,key:value,key:value
String input = args[0].toString();
String[] test = input.split(";");
for(int i=0; i<test.length; i++) {
try {
String[] result = test[i].split(":");
forward(result);
} catch (Exception e) {
continue;
}
}
}
}
5注意點(坑點)
一、使用String進行數據傳遞時,有時會出現String強轉Hadoop的Text類型異常,或者Integer強轉類型Text異常。
示例:自定義UDAF :
實現這個功能時:udaf的輸出的list,類型是Object類型,最後統計時使用map存放計數map<object,integer>,注意最後填充到list中時要把map的key和value所有變成Text類型才行,否則可能會出現類型轉換錯誤。我在iterate方法裏面須要獲取value時(此時類型是String),獲取以後立刻轉換成Text以後存放的。
二、注意在一些稍微複雜的udaf中會使用list或者其餘類型,在中間進行聚合等其餘操做時,不一樣操做的傳入的參數類型可能不一樣,最開始可能傳遞進來的數據是字符串類型或者Double類型的,可是通過一次迭代或者聚合以後變成了list,那麼下一次或者下一個流程的傳入數據的類型就變成了list,這時須要在初始化參數類型的時候定義好,如:
根據mode的流程判斷處於哪個階段,參數類型就哪種,就切換成哪種,這裏的返回值是定義的最終的返回結果。