閒下來再回顧下spark 和 hive 的聚合函數 使用:java
spark自定義聚合函數類sql
class GroupConcatUDAF extends UserDefinedAggregateFunction{ /** * 指定輸入字段的字段及類型 * group by 以後會有1到多個數據被歸到一組,因此用Array()封裝 */ override def inputSchema: StructType = { StructType(Array( StructField("str",StringType,true) )) } //聚合過程當中的中間結果集類型 override def bufferSchema: StructType ={ StructType(Array( StructField("strings",StringType,true) )) } //函數的返回類型 override def dataType: DataType = { StringType } override def deterministic: Boolean = { true } //爲每一個分組的數據初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0)="" } //指的是,每一個分組,有新的值進來時,如何進行分組的聚合計算 //至關於map的combiner,buffer裏面存放着累計的執行結果,input是當前的執行結果 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0)=buffer.getAs[String](0)+"|"+input.getAs[String](0) } //因爲Spark是分佈式的,因此一個分組的數據,可能會在不一樣的節點上進行局部聚合,就是update //可是最後一個分組,在各節點上的聚合值,要進行Merge,也就是合併 //至關於reduce端的合併 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0)=buffer1.getAs[String](0) + buffer2.getAs[String](0) } //一個分組的聚合值,如何經過中間的聚合值,最後返回一個最終的聚合值 override def evaluate(buffer: Row): Any ={ buffer.getAs[String](0) } }
spark自定義聚合函數的調用:服務器
object 測試spark的聚合函數 extends App{ val spark=SparkSession.builder().appName("spark udaf").master("local[*]").getOrCreate() strCount() //1--測試strCount的使用+group_concat函數的使用 def strCount(): Unit ={ //導入隱式轉化 import spark.implicits._ //構造用戶的訪問數據,並建立DataFrame val names=Array("張三","李四","王五","趙六","趙六","張三") val namesRDD: RDD[String] = spark.sparkContext.parallelize(names) //將RDD轉換爲DataFram val namesRowRDD=namesRDD.map(name=>Row(name)) val structType=StructType(Array( StructField("name",StringType,true) )) val namesDF=spark.sqlContext.createDataFrame(namesRowRDD,structType) //註冊表 namesDF.createOrReplaceTempView("names") //定義和註冊自定義函數 spark.sqlContext.udf.register("group_concat",new GroupConcatUDAF) //使用自定義函數 spark.sqlContext.sql("select name,concat_ws('|',collect_set(name)), concat_ws('|',collect_list(name)),group_concat(name) from names group by name").show() } }
下圖爲調用自定義聚合函數group_concat的結果,其實直接使用concat_ws()函數也能實現group_concat功能,不過若是須要保持順序對應關係,則使用concat_ws('|',collect_list(name))。若須要去重則使用concat_ws('|',collect_set(name))。app
hive的自定義聚合函數---group_concat分佈式
public class GroupConcat extends UDAF { public static class ConcatUDAFEvaluator implements UDAFEvaluator { //定義一個構造類,封裝結果 public static class PartialResult{ String result; String delimiter; } private PartialResult partial; //init函數相似於構造函數,用於UDAF的初始化 public void init() { partial = null; } // iterate接收傳入的參數,並進行內部的輪轉。其返回類型爲boolean public boolean iterate(String value,String deli){ if (value == null){ return true; } if (partial == null){ partial = new PartialResult();//構造類 partial.result = new String("");//初始化值 if( deli == null || deli.equals("") ) { partial.delimiter = new String(",");//設置分隔符,沒有設置默認使用 ',' } else { partial.delimiter = new String(deli);//設置分隔符 } } if ( partial.result.length() > 0 )//處理傳入的值 { partial.result = partial.result.concat(partial.delimiter);//值 拼接 分隔符 } partial.result = partial.result.concat(value);//拼接每次傳入的值 return true; } //terminatePartial無參數,其爲iterate函數遍歷結束後,返回輪轉結果 public PartialResult terminatePartial(){ return partial; } //合併兩個部分彙集值 public boolean merge(PartialResult other){ if (other == null){ return true; } if (partial == null){ partial = new PartialResult(); partial.result = new String(other.result); partial.delimiter = new String(other.delimiter); } else { if ( partial.result.length() > 0 ) { partial.result = partial.result.concat(partial.delimiter); } partial.result = partial.result.concat(other.result); } return true; } //terminate返回最終的彙集函數結果 * * @return public String terminate(){ return new String(partial.result); } } }
使用方法:ide
1.將程序打成jar包,上傳至服務器。 2.進入hive客戶端 3.添加jar包。 hive>add jar /opt/hive-1.0-SNAPSHOT.jar 4.建立臨時函數 hive>create temporary function group_concat as 'GroupConcat'; 5.調用臨時函數 hive>select group_concat (ykd018) as pdxCode from t_kc21k1 group by akc190;