spark +hive 自定義聚合函數回顧--group_concat實現

閒下來再回顧下spark 和 hive 的聚合函數 使用:java

spark自定義聚合函數類sql

class GroupConcatUDAF extends  UserDefinedAggregateFunction{
  /**
    * 指定輸入字段的字段及類型
    * group by  以後會有1到多個數據被歸到一組,因此用Array()封裝
    */
  override def inputSchema: StructType = {
    StructType(Array(
      StructField("str",StringType,true)
    ))
  }
  //聚合過程當中的中間結果集類型
  override def bufferSchema: StructType ={
    StructType(Array(
      StructField("strings",StringType,true)
    ))
  }
  //函數的返回類型
  override def dataType: DataType = {
    StringType
  }

  override def deterministic: Boolean = {
    true
  }
  //爲每一個分組的數據初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=""
  }
  //指的是,每一個分組,有新的值進來時,如何進行分組的聚合計算
  //至關於map的combiner,buffer裏面存放着累計的執行結果,input是當前的執行結果
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getAs[String](0)+"|"+input.getAs[String](0)
  }
  //因爲Spark是分佈式的,因此一個分組的數據,可能會在不一樣的節點上進行局部聚合,就是update
  //可是最後一個分組,在各節點上的聚合值,要進行Merge,也就是合併
  //至關於reduce端的合併
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =  {
    buffer1(0)=buffer1.getAs[String](0) + buffer2.getAs[String](0)
  }
  //一個分組的聚合值,如何經過中間的聚合值,最後返回一個最終的聚合值
  override def evaluate(buffer: Row): Any ={
    buffer.getAs[String](0)
  }
}

spark自定義聚合函數的調用:服務器

object 測試spark的聚合函數 extends App{
  val spark=SparkSession.builder().appName("spark udaf").master("local[*]").getOrCreate()
  strCount()
  //1--測試strCount的使用+group_concat函數的使用
  def strCount(): Unit ={
    //導入隱式轉化
    import spark.implicits._
    //構造用戶的訪問數據,並建立DataFrame
    val names=Array("張三","李四","王五","趙六","趙六","張三")
    val namesRDD: RDD[String] = spark.sparkContext.parallelize(names)
    //將RDD轉換爲DataFram
    val namesRowRDD=namesRDD.map(name=>Row(name))
    val structType=StructType(Array(
      StructField("name",StringType,true)
    ))
    val namesDF=spark.sqlContext.createDataFrame(namesRowRDD,structType)
    //註冊表
    namesDF.createOrReplaceTempView("names")
    //定義和註冊自定義函數
    spark.sqlContext.udf.register("group_concat",new GroupConcatUDAF)
    //使用自定義函數
   spark.sqlContext.sql("select name,concat_ws('|',collect_set(name)), concat_ws('|',collect_list(name)),group_concat(name) from names group by name").show()

  }
}

下圖爲調用自定義聚合函數group_concat的結果,其實直接使用concat_ws()函數也能實現group_concat功能,不過若是須要保持順序對應關係,則使用concat_ws('|',collect_list(name))。若須要去重則使用concat_ws('|',collect_set(name))。app

hive的自定義聚合函數---group_concat分佈式

public class GroupConcat extends UDAF {
    public static class ConcatUDAFEvaluator implements UDAFEvaluator {
        //定義一個構造類,封裝結果
        public static class PartialResult{
            String result;
            String delimiter;
        }

        private PartialResult partial;
        //init函數相似於構造函數,用於UDAF的初始化
        public void init() {
            partial = null;
        }
        // iterate接收傳入的參數,並進行內部的輪轉。其返回類型爲boolean
        public boolean iterate(String value,String deli){
            if (value == null){
                return true;
            }
            if (partial == null){
                partial = new PartialResult();//構造類
                partial.result = new String("");//初始化值
                if(  deli == null || deli.equals("") )
                {
                    partial.delimiter = new String(",");//設置分隔符,沒有設置默認使用 ','
                }
                else
                {
                    partial.delimiter = new String(deli);//設置分隔符
                }

            }
            if ( partial.result.length() > 0 )//處理傳入的值
            {
                partial.result = partial.result.concat(partial.delimiter);//值 拼接 分隔符
            }

            partial.result = partial.result.concat(value);//拼接每次傳入的值

            return true;
        }
        //terminatePartial無參數,其爲iterate函數遍歷結束後,返回輪轉結果
        public PartialResult terminatePartial(){
            return partial;
        }
        //合併兩個部分彙集值
        public boolean merge(PartialResult other){
            if (other == null){
                return true;
            }
            if (partial == null){
                partial = new PartialResult();
                partial.result = new String(other.result);
                partial.delimiter = new String(other.delimiter);
            }
            else
            {
                if ( partial.result.length() > 0 )
                {
                    partial.result = partial.result.concat(partial.delimiter);
                }
                partial.result = partial.result.concat(other.result);
            }
            return true;
        }
        //terminate返回最終的彙集函數結果 * * @return
        public String terminate(){
            return new String(partial.result);
        }
    }
}

使用方法:ide

1.將程序打成jar包,上傳至服務器。

2.進入hive客戶端

3.添加jar包。

hive>add jar /opt/hive-1.0-SNAPSHOT.jar
4.建立臨時函數

hive>create temporary function group_concat as 'GroupConcat';

5.調用臨時函數

hive>select group_concat (ykd018) as pdxCode from t_kc21k1 group by akc190;
相關文章
相關標籤/搜索