hive自定義函數學習

1介紹

Hive自定義函數包括三種UDFUDAFUDTFjava

UDF(User-Defined-Function) 一進一出apache

UDAF(User- Defined Aggregation Funcation) 彙集函數,多進一出。Count/max/minapi

UDTF(User-Defined Table-Generating Functions)  一進多出,如lateral view explore)數組

使用方式 :在HIVE會話中add 自定義函數的jar文件,而後建立function繼而使用函數緩存

1、編寫自定義函數數據結構

2打包上傳到集羣機器中app

3進入hive客戶端,添加jar包:hive> add jar /root/hive_udf.jaride

4建立臨時函數:hive> create temporary function getLen as 'com.raphael.len.GetLength';函數

5銷燬臨時函數:hive> DROP TEMPORARY FUNCTION getLen;oop

 

hive自定義函數中有幾類數據類型:

PrimitiveObjectInspector  經常使用(基本數據類):

PrimitiveObjectInspector.PrimitiveCategory.STRING

ListObjectInspector

StructObjectInspector

MapObjectInspector

等類型

三種自定義函數都會首先進行參數個數和參數類型檢查

參數類型檢查(是那一種大類型(primitive...,而後具體是什麼類型)

注意:在全部方法有返回時要注意返回的類型,最重要的是最後返回出去的結果,通常結果的數據類型會在初始化時就定義了,那麼在最後返回結果是應該要轉化成那種類型

 

初始化時init:

PrimitiveObjectInspector inputOI;

inputOI = (PrimitiveObjectInspector) parameters[0];

都會定義這種設置輸入數據的 ObjectInspector(類型吧)

有多個參數就會設置多個。

 

在真正處理數據須要獲取數據:

long distinctId = PrimitiveObjectInspectorUtils.getLong(objects[0], distinctIdOI);

通常都這樣獲取,objects是方法的參數,從外界傳遞進來的,包含不少值,獲取第幾個值,同時類型是什麼(distinctIdOI

2 UDF

2.1介紹

hiveudf有兩種實現方式或者實現的API,一種是udf比較簡單,一種是GenericUDF比較複雜。

若是所操做的數據類型都是基礎數據類型,如(Hadoop&Hive 基本writable類型,如Text,IntWritable,LongWriable,DoubleWritable等等)。那麼簡單的org.apache.hadoop.hive.ql.exec.UDF就能夠作到。

若是所操做的數據類型是內嵌數據結構,如MapListSet,那麼要採用org.apache.hadoop.hive.ql.udf.generic.GenericUDF

2.2繼承UDF實現

須要繼承org.apache.hadoop.hive.ql.UDF,或者

org.apache.hadoop.hive.ql.udf.generic.GenericUDF,前者比較簡單,只須要實現evaluate函數,evaluate函數支持重載。

UDF代碼以下:

import org.apache.hadoop.hive.ql.exec.UDF;

public class GetLength extends UDF{

    public int evaluate(String str) {

        try{

            return str.length();

        }catch(Exception e){

            return -1;

        }

    }

}

2.3繼承GenericUDF實現

繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDF須要實現三個方法:

1initialize:只調用一次,在任何evaluate()調用以前能夠接收到一個能夠表示函數輸入參數類型的object inspectors數組。initalize用來驗證該函數是否接收正確的參數類型和參數個數,最後提供最後結果對應的數據類型。

2evaluate:真正的邏輯,讀取輸入數據,處理數據,返回結果。

3getDisplayString:返回描述該方法的字符串,沒有太多做用。

繼承GenericUDF實現UDF,完成url解碼功能代碼以下:

public class UrlDecodeUDF2 extends GenericUDF {

    private transient PrimitiveObjectInspector inputOI;

    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {

        // 檢查參數數量

        if (objectInspectors.length != 1) {

            throw new UDFArgumentException("urlDecode() takes only one argument");

        }

        // 檢查參數類型

        if (objectInspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

            if (((PrimitiveObjectInspector) objectInspectors[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {

                throw new UDFArgumentTypeException(0, "Only String type argument are accepted, but "

                        + objectInspectors[0].getTypeName() + " was pass as parameter 1");

            }

        }

        // 設置輸入數據的 ObjectInspector

        inputOI = (PrimitiveObjectInspector) objectInspectors[0];

        // 輸出數據的 ObjectInspector

        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;

    }

    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {

        if (deferredObjects == null || deferredObjects[0] == null) {

            return new Text("");

        }

        // 提取數據

        String component = PrimitiveObjectInspectorUtils.getString(deferredObjects[0].get(), inputOI);

        if (component == null || component.length() <= 0) {

            return "";

        }

        String result = "";

        //使用%25替換字符串中的%

        component = component.replaceAll("%(?![0-9a-fA-F]{2})", "%25");

        try {

            result = URLDecoder.decode(component, "UTF-8");

//            result = URLDecoder.decode(result, "UTF-8");

        } catch (UnsupportedEncodingException e) {

            result = component;

        }

        System.out.println(result);

        return result;

    }

    public String getDisplayString(String[] strings) {

        StringBuilder sb = new StringBuilder();

        sb.append("url_decode");

        sb.append("(");

        if (strings.length > 0) {

            sb.append(strings[0]);

            for (int index = 1; index < strings.length - 1; index++) {

                sb.append(",");

                sb.append(strings[index]);

            }

        }

        sb.append(")");

        return sb.toString();

    }

}

 

3 UDAF

3.1介紹

多行進一行出,如sum()min(),用在group  by時。開發通用UDAF有兩個步驟

一、第一個是編寫resolver類(繼承AbstractGenericUDAFResolver),

二、第二個是編寫evaluator類(繼承GenericUDAFEvaluator)在resolver類內部。

resolver負責類型檢查,操做符重載。evaluator真正實現UDAF的邏輯。一般來講,頂層UDAF類繼承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver,裏面編寫嵌套類evaluator (繼承GenericUDAFEvaluator)實現UDAF的邏輯。

實現evaluator全部evaluators必須繼承抽象類

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子類必須實現它的一些抽象方法,實現UDAF的邏輯。

同時須要一個繼承了AggregationBuffer的類,來存儲中間過程當中記錄的數據

3.2 Mode

GenericUDAFEvaluator有一個嵌套類Mode,這個類很重要,它表示了udafmapreduce的各個階段,理解Mode的含義,就能夠理解了hiveUDAF的運行流程。

 

PARTIAL1, //從原始數據到部分聚合數據的過程(map階段),將調用iterate()terminatePartial()方法。

PARTIAL2, //從部分聚合數據到部分聚合數據的過程(map端的combiner階段),將調用merge() terminatePartial()方法。

FINAL,    //從部分聚合數據到所有聚合的過程(reduce階段),將調用merge()terminate()方法。

COMPLETE  //從原始數據直接到所有聚合的過程(表示只有map,沒有reducemap端直接出結果),將調用merge() terminate()方法。

 

 

public static enum Mode {

    /**

     * PARTIAL1: 這個是mapreducemap階段:從原始數據到部分數據聚合

     * 將會調用iterate()terminatePartial()

     */

    PARTIAL1,

        /**

     * PARTIAL2: 這個是mapreducemap端的Combiner階段,負責在map端合併map的數據::從部分數據聚合到部分數據聚合:

     * 將會調用merge() terminatePartial()

     */

    PARTIAL2,

        /**

     * FINAL: mapreducereduce階段:從部分數據的聚合到徹底聚合

     * 將會調用merge()terminate()

     */

    FINAL,

        /**

     * COMPLETE: 若是出現了這個階段,表示mapreduce只有map,沒有reduce,因此map端就直接出結果了:從原始數據直接到徹底聚合

      * 將會調用 iterate()terminate()

     */

    COMPLETE

  };

通常狀況下,完整的UDAF邏輯是一個mapreduce過程,若是有mapperreducer,就會經歷PARTIAL1(mapper)FINAL(reducer),若是還有combiner,那就會經歷PARTIAL1(mapper)PARTIAL2(combiner)FINAL(reducer)

而有一些狀況下的mapreduce,只有mapper,而沒有reducer,因此就會只有COMPLETE階段,這個階段直接輸入原始數據,出結果。

3.3實現代碼分析

UDAF的實現代碼主幹以下:

//最外層繼承AbstractGenericUDAFResolver

public class GenericUDAFSum extends AbstractGenericUDAFResolver {

  static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());

//實現getEvaluator方法

  @Override

  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)

    throws SemanticException {

    // Type-checking goes here!

    return new GenericUDAFSumLong();

  }

//編寫一個類,繼承GenericUDAFEvaluator,實現全部方法

  public static class GenericUDAFSumLong extends GenericUDAFEvaluator {

    // UDAF logic goes here!

  }

}

說明:

一、getEvaluator:繼承AbstractGenericUDAFResolver 所須要實現的方法,也只須要重寫這一個方法,做用是檢查參數個數,參數類型等,而後返回GenericUDAFEvaluator對象。

二、GenericUDAFEvaluatorgetEvaluator最後返回類型就是這個類,真正返回(return)的是繼承實現GenericUDAFEvaluator的類。

 

例如GenericUDAFSumLong 繼承GenericUDAFEvaluator須要實現以下方法:

一、init(初始化):肯定返回類型並返回UDAF的返回類型,

二、getNewAggregationBuffer:建立新的聚合計算的須要的內存,用來存儲mapper,combiner,reducer運算過程當中的相加總和,獲取聚合中間結果緩存

三、reset:重置中間結果緩存

四、iterate: 迭代每一行的數據,等同於 Map 階段。傳進來的行數據由 HiveSQL 決定,計算的中間結果緩存到 AggregationBuffer @param aggregationBuffer 中間結果緩存,@param objects 行中每一列的數據

五、terminatePartial(終止部分):對部分中間結果數據進行合併,等同於 Map 階段的 combine,返回mapper結果,combine後的結果

六、merge(合併):在最終進行 terminate() 前對全部傳入的中間結果進行合併,等同於 Reduce 階段的 merge。 各個 Map 傳來的中間結果 partial 合併到 aggregationBuffer

七、terminate(終止):計算合併後的數據得出最終結果,等同於 Reduce 階段的邏輯。reducer返回結果,或者是隻有mapper,沒有reducer時,在mapper端返回結果。

3.4代碼示例

例子以下:

public class UDAFDemo extends  AbstractGenericUDAFResolver {

    static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());

    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)

        throws SemanticException {

        // 檢查參數個數

        if (parameters.length != 1) {

            throw new UDFArgumentTypeException(parameters.length - 1,

                    "Exactly one argument is expected.");

        }

        // 檢查參數類型

        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

            throw new UDFArgumentTypeException(0,

                    "Only primitive type arguments are accepted but "

                            + parameters[0].getTypeName() + " is passed.");

        }

//        // 檢查參數類型

//        if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

//            throw new UDFArgumentTypeException(0, "Only primitive type argument are accepted but "

//                    + parameters[0].getTypeName() + " was passed as parameter 1");

//        }

//        if (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG) {

//            throw new UDFArgumentTypeException(0, "Only Long Type type argument are accepted but "

//                    + parameters[0].getTypeName() + " was passed as parameter 1");

//        }

        // 檢查參數類型

        switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {

            case BYTE:

            case SHORT:

            case INT:

            case LONG:

            case TIMESTAMP:

                return new GenericUDAFSumLong();

            case FLOAT:

            case DOUBLE:

//                case STRING:

//                    return new GenericUDAFSumDouble();

            case BOOLEAN:

            default:

                throw new UDFArgumentTypeException(0,

                        "Only numeric or string type arguments are accepted but "

                                + parameters[0].getTypeName() + " is passed.");

        }

    }

    public static class GenericUDAFSumLong extends GenericUDAFEvaluator {

        private PrimitiveObjectInspector inputOI;

        private LongWritable result;

        /** 存儲sum的值的類 */

        static class SumLongAgg implements AggregationBuffer {

            boolean empty;

            long sum;

        }

        //這個方法返回了UDAF的返回類型,這裏肯定了sum自定義函數的返回類型是Long類型

        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {

            assert (parameters.length == 1);

            super.init(m, parameters);

            result = new LongWritable(0);

            inputOI = (PrimitiveObjectInspector) parameters[0];

            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;

        }

        //建立新的聚合計算的須要的內存,用來存儲mapper,combiner,reducer運算過程當中的相加總和

        public AggregationBuffer getNewAggregationBuffer() throws HiveException {

            SumLongAgg result = new SumLongAgg();

            reset(result);

            return result;

        }

        //mapreduce支持mapperreducer的重用,因此爲了兼容,也須要作內存的重用。

        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {

            SumLongAgg myagg = (SumLongAgg) aggregationBuffer;

            myagg.empty = true;

            myagg.sum = 0;

        }

        private boolean warned = false;

        //map階段調用,只要把保存當前和的對象agg,再加上輸入的參數,就能夠了。

        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {

            assert (objects.length == 1);

            try {

                merge(aggregationBuffer, objects[0]);

            }catch (NumberFormatException e) {

                if (!warned) {

                    warned = true;

                    LOG.warn(getClass().getSimpleName() + " "

                            + StringUtils.stringifyException(e));

                }

            }

        }

        //mapper結束要返回的結果,還有combiner結束返回的結果

        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {

            return terminate(aggregationBuffer);

        }

        //combiner合併map返回的結果,還有reducer合併mappercombiner返回的結果。

        public void merge(AggregationBuffer aggregationBuffer, Object o) throws HiveException {

            if (o != null) {

                SumLongAgg agg = (SumLongAgg) aggregationBuffer;

                agg.sum += PrimitiveObjectInspectorUtils.getLong(o, inputOI);

                agg.empty = false;

            }

        }

        //reducer返回結果,或者是隻有mapper,沒有reducer時,在mapper端返回結果。

        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {

            SumLongAgg ragg = (SumLongAgg) aggregationBuffer;

            if (ragg.empty) {

                return null;

            }

            result.set(ragg.sum);

            return result;

        }

    }

}

4 UDTF

4.1介紹

udtf用來解決輸入一行輸出多行(On-to-many maping) 的需求須要繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF來實現三個方法。

一、initialize:返回UDTF的返回行的信息(返回個數,類型)。

二、process:真正的處理過程在process函數中,在process中,每一次forward()調用產生一行;若是產生多列能夠將多個列的值放在一個數組中,而後將該數組傳入到forward()函數。

三、close:對須要清理的方法進行清理

forward()傳入的就是最後的結果,裏面通常是數組,數組有多少個元素就代碼最後一行輸出的結果有多少列

4.2代碼示例

下面有一個切分」key:value;key:value;」這種字符串,返回結果爲key, value兩個字段。

public class UDTFDemo extends GenericUDTF {

    @Override

    public void close() throws HiveException {

        // TODO Auto-generated method stub

    }

    /**

     * 此方法返回UDTF的返回行的信息(返回個數,類型)

     */

    @Override

    public StructObjectInspector initialize(ObjectInspector[] args)

            throws UDFArgumentException {

        if (args.length != 1) {

            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");

        }

        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

            throw new UDFArgumentException("ExplodeMap takes string as a parameter");

        }

        //多出的列名

        ArrayList<String> fieldNames = new ArrayList<String>();

        //每一列的類型

        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("c1");

        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        fieldNames.add("c2");

        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);

    }

    /**

     * 初始化完成以後會調用process方法,真正的處理過程在process函數中,在process中,每一次forward()調用產生一行;

     * 若是產生多列能夠將多個列的值放在一個數組中,而後將該數組傳入到forward()函數。

     */

    @Override

    public void process(Object[] args) throws HiveException {

        //每一行數 key:value,key:value,key:value

        String input = args[0].toString();

        String[] test = input.split(";");

        for(int i=0; i<test.length; i++) {

            try {

                String[] result = test[i].split(":");

                forward(result);

            } catch (Exception e) {

                continue;

            }

        }

    }

}

 

5注意點(坑點)

一、使用String進行數據傳遞時,有時會出現String強轉HadoopText類型異常,或者Integer強轉類型Text異常。

示例:自定義UDAF

 

實現這個功能時:udaf的輸出的list,類型是Object類型,最後統計時使用map存放計數map<object,integer>,注意最後填充到list中時要把mapkeyvalue所有變成Text類型才行,否則可能會出現類型轉換錯誤。我在iterate方法裏面須要獲取value時(此時類型是String),獲取以後立刻轉換成Text以後存放的。

二、注意在一些稍微複雜的udaf中會使用list或者其餘類型,在中間進行聚合等其餘操做時,不一樣操做的傳入的參數類型可能不一樣,最開始可能傳遞進來的數據是字符串類型或者Double類型的,可是通過一次迭代或者聚合以後變成了list,那麼下一次或者下一個流程的傳入數據的類型就變成了list,這時須要在初始化參數類型的時候定義好,如:

 

根據mode的流程判斷處於哪個階段,參數類型就哪種,就切換成哪種,這裏的返回值是定義的最終的返回結果。

相關文章
相關標籤/搜索