Hive UDAF開發詳解

說明

這篇文章是來自Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions:的不嚴格翻譯,由於翻譯的文章示例寫得比較通俗易懂,此外,我把本身對於Hive的UDAF理解穿插到文章裏面。html

udfa是Hive中用戶自定義的彙集函數,hive內置UDAF函數包括有sum()與count(),UDAF實現有簡單與通用兩種方式,簡單UDAF由於使用Java反射致使性能損失,並且有些特性不能使用,已經被棄用了;在這篇博文中咱們將關注Hive中自定義聚類函數-GenericUDAF,UDAF開發主要涉及到如下兩個抽象類:java

[java] view plain copygit

  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver  github

  2. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator  sql

源碼連接

博文中的全部的代碼和數據能夠在如下連接找到:hive examples
shell

示例數據準備

首先先建立一張包含示例數據的表:people,該表只有name一列,該列中包含了一個或多個名字,該表數據保存在people.txt文件中。apache

[plain] view plain copyapi

  1. ~$ cat ./people.txt  跨域

  2.   

  3. John Smith  app

  4. John and Ann White  

  5. Ted Green  

  6. Dorothy  

把該文件上載到hdfs目錄/user/matthew/people中:

[plain] view plain copy

  1. hadoop fs -mkdir people  

  2. hadoop fs -put ./people.txt people  

下面要建立hive外部表,在hive shell中執行


[sql] view plain copy

  1. CREATE EXTERNAL TABLE people (name string)  

  2. ROW FORMAT DELIMITED FIELDS   

  3.     TERMINATED BY '\t'   

  4.     ESCAPED BY ''   

  5.     LINES TERMINATED BY '\n'  

  6. STORED AS TEXTFILE   

  7. LOCATION '/user/matthew/people';  


相關抽象類介紹

建立一個GenericUDAF必須先了解如下兩個抽象類:

[java] view plain copy

  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver   

[java] view plain copy

  1. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator  

爲了更好理解上述抽象類的API,要記住hive只是mapreduce函數,只不過hive已經幫助咱們寫好並隱藏mapreduce,向上提供簡潔的sql函數,因此咱們要結合Mapper、Combiner與Reducer來幫助咱們理解這個函數。要記住在Hadoop集羣中有若干臺機器,在不一樣的機器上Mapper與Reducer任務獨立運行。

因此大致上來講,這個UDAF函數讀取數據(mapper),彙集一堆mapper輸出到部分彙集結果(combiner),而且最終建立一個最終的彙集結果(reducer)。由於咱們跨域多個combiner進行彙集,因此咱們須要保存部分彙集結果。

AbstractGenericUDAFResolver

Resolver很簡單,要覆蓋實現下面方法,該方法會根據sql傳人的參數數據格式指定調用哪一個Evaluator進行處理。

[java] view plain copy

  1. <span style="background-color: rgb(255, 255, 255);"><span style="font-size:14px;">public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;</span></span>  

GenericUDAFEvaluator

UDAF邏輯處理主要發生在Evaluator中,要實現該抽象類的幾個方法。

在理解Evaluator以前,必須先理解objectInspector接口與GenericUDAFEvaluator中的內部類Model。


ObjectInspector

做用主要是解耦數據使用與數據格式,使得數據流在輸入輸出端切換不一樣的輸入輸出格式,不一樣的Operator上使用不一樣的格式。能夠參考這兩篇文章:first post on Hive UDFsHive中ObjectInspector的做用,裏面有關於objectinspector的介紹。

Model

Model表明了UDAF在mapreduce的各個階段。

[java] view plain copy

  1. public static enum Mode {  

  2.     /** 

  3.      * PARTIAL1: 這個是mapreduce的map階段:從原始數據到部分數據聚合 

  4.      * 將會調用iterate()和terminatePartial() 

  5.      */  

  6.     PARTIAL1,  

  7.         /** 

  8.      * PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責在map端合併map的數據::從部分數據聚合到部分數據聚合: 

  9.      * 將會調用merge() 和 terminatePartial()  

  10.      */  

  11.     PARTIAL2,  

  12.         /** 

  13.      * FINAL: mapreduce的reduce階段:從部分數據的聚合到徹底聚合  

  14.      * 將會調用merge()和terminate() 

  15.      */  

  16.     FINAL,  

  17.         /** 

  18.      * COMPLETE: 若是出現了這個階段,表示mapreduce只有map,沒有reduce,因此map端就直接出結果了:從原始數據直接到徹底聚合 

  19.       * 將會調用 iterate()和terminate() 

  20.      */  

  21.     COMPLETE  

  22.   };  

通常狀況下,完整的UDAF邏輯是一個mapreduce過程,若是有mapper和reducer,就會經歷PARTIAL1(mapper),FINAL(reducer),若是還有combiner,那就會經歷PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些狀況下的mapreduce,只有mapper,而沒有reducer,因此就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。

GenericUDAFEvaluator的方法

[java] view plain copy

  1. // 肯定各個階段輸入輸出參數的數據格式ObjectInspectors  

  2. public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;  

  3.   

  4. // 保存數據彙集結果的類  

  5. abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  

  6.   

  7. // 重置彙集結果  

  8. public void reset(AggregationBuffer agg) throws HiveException;  

  9.   

  10. // map階段,迭代處理輸入sql傳過來的列數據  

  11. public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  

  12.   

  13. // map與combiner結束返回結果,獲得部分數據彙集結果  

  14. public Object terminatePartial(AggregationBuffer agg) throws HiveException;  

  15.   

  16. // combiner合併map返回的結果,還有reducer合併mapper或combiner返回的結果。  

  17. public void merge(AggregationBuffer agg, Object partial) throws HiveException;  

  18.   

  19. // reducer階段,輸出最終結果  

  20. public Object terminate(AggregationBuffer agg) throws HiveException;  

圖解Model與Evaluator關係


Model各階段對應Evaluator方法調用




Evaluator各個階段下處理mapreduce流程

實例

下面將講述一個彙集函數UDAF的實例,咱們將計算people這張表中的name列字母的個數。

下面的函數代碼是計算指定列中字符的總數(包括空格)

代碼

[java] view plain copy

  1. @Description(name = "letters", value = "_FUNC_(expr) - 返回該列中全部字符串的字符總數")  

  2. public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {  

  3.   

  4.     @Override  

  5.     public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  

  6.             throws SemanticException {  

  7.         if (parameters.length != 1) {  

  8.             throw new UDFArgumentTypeException(parameters.length - 1,  

  9.                     "Exactly one argument is expected.");  

  10.         }  

  11.           

  12.         ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);  

  13.           

  14.         if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){  

  15.             throw new UDFArgumentTypeException(0,  

  16.                             "Argument must be PRIMITIVE, but "  

  17.                             + oi.getCategory().name()  

  18.                             + " was passed.");  

  19.         }  

  20.           

  21.         PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;  

  22.           

  23.         if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){  

  24.             throw new UDFArgumentTypeException(0,  

  25.                             "Argument must be String, but "  

  26.                             + inputOI.getPrimitiveCategory().name()  

  27.                             + " was passed.");  

  28.         }  

  29.           

  30.         return new TotalNumOfLettersEvaluator();  

  31.     }  

  32.   

  33.     public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {  

  34.   

  35.         PrimitiveObjectInspector inputOI;  

  36.         ObjectInspector outputOI;  

  37.         PrimitiveObjectInspector integerOI;  

  38.           

  39.         int total = 0;  

  40.   

  41.         @Override  

  42.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  

  43.                 throws HiveException {  

  44.               

  45.             assert (parameters.length == 1);  

  46.             super.init(m, parameters);  

  47.              

  48.              //map階段讀取sql列,輸入爲String基礎數據格式  

  49.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {  

  50.                 inputOI = (PrimitiveObjectInspector) parameters[0];  

  51.             } else {  

  52.             //其他階段,輸入爲Integer基礎數據格式  

  53.                 integerOI = (PrimitiveObjectInspector) parameters[0];  

  54.             }  

  55.   

  56.              // 指定各個階段輸出數據格式都爲Integer類型  

  57.             outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,  

  58.                     ObjectInspectorOptions.JAVA);  

  59.             return outputOI;  

  60.   

  61.         }  

  62.   

  63.         /** 

  64.          * 存儲當前字符總數的類 

  65.          */  

  66.         static class LetterSumAgg implements AggregationBuffer {  

  67.             int sum = 0;  

  68.             void add(int num){  

  69.                 sum += num;  

  70.             }  

  71.         }  

  72.   

  73.         @Override  

  74.         public AggregationBuffer getNewAggregationBuffer() throws HiveException {  

  75.             LetterSumAgg result = new LetterSumAgg();  

  76.             return result;  

  77.         }  

  78.   

  79.         @Override  

  80.         public void reset(AggregationBuffer agg) throws HiveException {  

  81.             LetterSumAgg myagg = new LetterSumAgg();  

  82.         }  

  83.           

  84.         private boolean warned = false;  

  85.   

  86.         @Override  

  87.         public void iterate(AggregationBuffer agg, Object[] parameters)  

  88.                 throws HiveException {  

  89.             assert (parameters.length == 1);  

  90.             if (parameters[0] != null) {  

  91.                 LetterSumAgg myagg = (LetterSumAgg) agg;  

  92.                 Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);  

  93.                 myagg.add(String.valueOf(p1).length());  

  94.             }  

  95.         }  

  96.   

  97.         @Override  

  98.         public Object terminatePartial(AggregationBuffer agg) throws HiveException {  

  99.             LetterSumAgg myagg = (LetterSumAgg) agg;  

  100.             total += myagg.sum;  

  101.             return total;  

  102.         }  

  103.   

  104.         @Override  

  105.         public void merge(AggregationBuffer agg, Object partial)  

  106.                 throws HiveException {  

  107.             if (partial != null) {  

  108.                   

  109.                 LetterSumAgg myagg1 = (LetterSumAgg) agg;  

  110.                   

  111.                 Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);  

  112.                   

  113.                 LetterSumAgg myagg2 = new LetterSumAgg();  

  114.                   

  115.                 myagg2.add(partialSum);  

  116.                 myagg1.add(myagg2.sum);  

  117.             }  

  118.         }  

  119.   

  120.         @Override  

  121.         public Object terminate(AggregationBuffer agg) throws HiveException {  

  122.             LetterSumAgg myagg = (LetterSumAgg) agg;  

  123.             total = myagg.sum;  

  124.             return myagg.sum;  

  125.         }  

  126.   

  127.     }  

  128. }  


代碼說明

這裏有一些關於combiner的資源,Philippe Adjiman 講得不錯。


AggregationBuffer 容許咱們保存中間結果,經過定義咱們的buffer,咱們能夠處理任何格式的數據,在代碼例子中字符總數保存在AggregationBuffer 。


[java] view plain copy

  1. /** 

  2. * 保存當前字符總數的類 

  3. */  

  4. static class LetterSumAgg implements AggregationBuffer {  

  5.     int sum = 0;  

  6.     void add(int num){  

  7.         sum += num;  

  8.     }  

  9. }  


這意味着UDAF在不一樣的mapreduce階段會接收到不一樣的輸入。Iterate讀取咱們表中的一行(或者準確來講是表),而後輸出其餘數據格式的彙集結果。

artialAggregation合併這些彙集結果到另外相同格式的新的彙集結果,而後最終的reducer取得這些彙集結果真後輸出最終結果(該結果或許與接收數據的格式不一致)。

在init()方法中咱們指定輸入爲string,結果輸出格式爲integer,還有,部分彙集結果輸出格式爲integer(保存在aggregation buffer中);terminate()terminatePartial()二者輸出一個integer


[java] view plain copy

  1. // init方法中根據不一樣的mode指定輸出數據的格式objectinspector  

  2. if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {  

  3.     inputOI = (PrimitiveObjectInspector) parameters[0];  

  4. else {  

  5.     integerOI = (PrimitiveObjectInspector) parameters[0];  

  6. }  

  7.   

  8. // 不一樣model階段的輸出數據格式  

  9. outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,  

  10.                     ObjectInspectorOptions.JAVA);  


iterate()函數讀取到每行中列的字符串,計算與保存該字符串的長度

[java] view plain copy

  1. public void iterate(AggregationBuffer agg, Object[] parameters)  

  2.     throws HiveException {  

  3.     ...  

  4.     Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);  

  5.     myagg.add(String.valueOf(p1).length());  

  6.     }  

  7. }  


Merge函數增長部分彙集總數到AggregationBuffer

[java] view plain copy

  1. public void merge(AggregationBuffer agg, Object partial)  

  2.         throws HiveException {  

  3.     if (partial != null) {  

  4.                   

  5.         LetterSumAgg myagg1 = (LetterSumAgg) agg;  

  6.                   

  7.         Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);  

  8.                   

  9.         LetterSumAgg myagg2 = new LetterSumAgg();  

  10.                   

  11.         myagg2.add(partialSum);  

  12.         myagg1.add(myagg2.sum);  

  13.     }  

  14. }  


Terminate()函數返回AggregationBuffer中的內容,這裏產生了最終結果。

[java] view plain copy

  1. public Object terminate(AggregationBuffer agg) throws HiveException {  

  2.     LetterSumAgg myagg = (LetterSumAgg) agg;  

  3.     total = myagg.sum;  

  4.     return myagg.sum;  

  5. }  

使用自定義函數

[plain] view plain copy

  1. ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;  

  2. CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';  

  3.   

  4. SELECT letters(name) FROM people;  

  5. OK  

  6. 44  

  7. Time taken: 20.688 seconds  

相關文章
相關標籤/搜索