Hive自定義UDF和聚合函數UDAF

       Hive是一種構建在Hadoop上的數據倉庫,Hive把SQL查詢轉換爲一系列在Hadoop集羣中運行的MapReduce做業,是MapReduce更高層次的抽象,不用編寫具體的MapReduce方法。Hive將數據組織爲表,這就使得HDFS上的數據有告終構,元數據即表的模式,都存儲在名爲metastore的數據庫中。java

       能夠在hive的外殼環境中直接使用dfs訪問hadoop的文件系統命令。數據庫

       Hive能夠容許用戶編寫本身定義的函數UDF,來在查詢中使用。Hive中有3種UDF:apache

       UDF:操做單個數據行,產生單個數據行;ide

       UDAF:操做多個數據行,產生一個數據行。函數

       UDTF:操做一個數據行,產生多個數據行一個表做爲輸出。oop

      用戶構建的UDF使用過程以下:lua

      第一步:繼承UDF或者UDAF或者UDTF,實現特定的方法。spa

      第二步:將寫好的類打包爲jar。如hivefirst.jar.對象

      第三步:進入到Hive外殼環境中,利用add jar /home/hadoop/hivefirst.jar.註冊該jar文件繼承

      第四步:爲該類起一個別名,create temporary function mylength as 'com.whut.StringLength';這裏注意UDF只是爲這個Hive會話臨時定義的。

      第五步:在select中使用mylength();


自定義UDF

package whut;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
//UDF是做用於單個數據行,產生一個數據行
//用戶必需要繼承UDF,且必須至少實現一個evalute方法,該方法並不在UDF中
//可是Hive會檢查用戶的UDF是否擁有一個evalute方法
public class Strip extends UDF{
    private Text result=new Text();
    //自定義方法
    public Text evaluate(Text str)
    {
      if(str==null)
        return null;
        result.set(StringUtils.strip(str.toString()));
        return result;
    }
    public Text evaluate(Text str,String stripChars)
    {
        if(str==null)
            return null;
        result.set(StringUtils.strip(str.toString(),stripChars));
        return result;
    }
}

注意事項:

   1,一個用戶UDF必須繼承org.apache.hadoop.hive.ql.exec.UDF;

   2,一個UDF必需要包含有evaluate()方法,可是該方法並不存在於UDF中。evaluate的參數個數以及類型都是用戶本身定義的。在使用的時候,Hive會調用UDF的evaluate()方法。


自定義UDAF

該UDAF主要是找到最大值

package whut;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
//UDAF是輸入多個數據行,產生一個數據行
//用戶自定義的UDAF必須是繼承了UDAF,且內部包含多個實現了exec的靜態類
public class MaxiNumber extends UDAF{
    public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
        //最終結果
        private IntWritable result;
        //負責初始化計算函數並設置它的內部狀態,result是存放最終結果的
        @Override
        public void init() {
            result=null;
        }
        //每次對一個新值進行彙集計算都會調用iterate方法
        public boolean iterate(IntWritable value)
        {
            if(value==null)
                return false;
            if(result==null)
              result=new IntWritable(value.get());
            else
              result.set(Math.max(result.get(), value.get()));
            return true;
        }
                                                                                                                                 
        //Hive須要部分彙集結果的時候會調用該方法
        //會返回一個封裝了彙集計算當前狀態的對象
        public IntWritable terminatePartial()
        {
            return result;
        }
        //合併兩個部分彙集值會調用這個方法
        public boolean merge(IntWritable other)
        {
            return iterate(other);
        }
        //Hive須要最終彙集結果時候會調用該方法
        public IntWritable terminate()
        {
            return result;
        }
    }
}

注意事項:

    1,用戶的UDAF必須繼承了org.apache.hadoop.hive.ql.exec.UDAF;

    2,用戶的UDAF必須包含至少一個實現了org.apache.hadoop.hive.ql.exec的靜態類,諸如常見的實現了 UDAFEvaluator。

    3,一個計算函數必須實現的5個方法的具體含義以下:

    init():主要是負責初始化計算函數而且重設其內部狀態,通常就是重設其內部字段。通常在靜態類中定義一個內部字段來存放最終的結果。

   iterate():每一次對一個新值進行彙集計算時候都會調用該方法,計算函數會根據彙集計算結果更新內部狀態。當輸入值合法或者正確計算了,則就返回true。

   terminatePartial():Hive須要部分彙集結果的時候會調用該方法,必需要返回一個封裝了彙集計算當前狀態的對象。

   merge():Hive進行合併一個部分彙集和另外一個部分彙集的時候會調用該方法。

   terminate():Hive最終彙集結果的時候就會調用該方法。計算函數須要把狀態做爲一個值返回給用戶。

  4,部分彙集結果的數據類型和最終結果的數據類型能夠不一樣。

相關文章
相關標籤/搜索