User-Defined Functions


Author: Yin Zhengjie

Copyright notice: This is an original work. Reproduction is prohibited; violations will be pursued legally.

1. User-Defined Functions (UDF)

[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell                                 # In the spark-shell session you can define your own functions via spark.udf.
20/07/14 01:21:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594660981913).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.json("file:///tmp/user.json")          # Read the data and create a DataFrame
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]             

scala> df.show()                                   # Display the data
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.udf.register("myNameFunc", (x:String)=> "Name:"+x)      # Register a UDF named "myNameFunc" that prepends "Name:" to the input string
res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("user")                    # Create a temporary view named "user"

scala> spark.sql("select name, passwd from user").show()           #正常查詢user表信息
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.sql("select myNameFunc(name), passwd from user").show()    #使用我們自定義函數(myNameFunc)來查詢user表信息
+--------------------+------+
|UDF:myNameFunc(name)|passwd|
+--------------------+------+
|    Name:yinzhengjie|  2020|
|          Name:Jason|666666|
|         Name:Liming|   123|
|          Name:Jenny|   456|
|          Name:Danny|   789|
+--------------------+------+


scala>
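
The same UDF can also be applied through the DataFrame API instead of SQL. A minimal sketch, assuming the same df as above and the standard udf helper from org.apache.spark.sql.functions:

scala> import org.apache.spark.sql.functions.udf

scala> val myNameUdf = udf((x:String) => "Name:"+x)        // wrap the Scala function as a column-level UDF

scala> df.select(myNameUdf($"name"), $"passwd").show()     // produces the same result as the SQL query above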

 

2. User-Defined Aggregate Function on an Untyped DataFrame

{"name":"yinzhengjie","passwd":"2020","age":18}
{"name":"Jason","passwd":"666666","age":27}
{"name":"Liming","passwd":"123","age":49}
{"name":"Jenny","passwd":"456","age":23}
{"name":"Danny","passwd":"789","age":56}
E:\yinzhengjie\bigdata\input\json\user.json(文件內容戳這裏)
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}

object SparkSQL_UDAF {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    // Create the Spark SQL environment object, i.e. the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Instantiate our custom aggregate function
    val udaf = new MyAgeAvgFunction

    // Register the function
    spark.udf.register("avgAge",udaf)

    // Read the JSON file and build a DataFrame
    val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    // Use the aggregate function
    frame.createOrReplaceTempView("user")
    spark.sql("select avgAge(age) from user").show()

    // Release resources
    spark.close()
  }
}

/**
  *   Both the strongly typed Dataset and the untyped DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions.
  *
  *   Untyped user-defined aggregate function: implemented by extending UserDefinedAggregateFunction.
  *
  *   The general steps for declaring one are:
  *     1>. Extend UserDefinedAggregateFunction
  *     2>. Implement its methods
  */
class MyAgeAvgFunction extends UserDefinedAggregateFunction{

  // Schema of the function's input
  override def inputSchema: StructType = {
    new StructType().add("age",LongType)
  }

  // Schema of the aggregation buffer used during the computation
  override def bufferSchema: StructType = {
    new StructType().add("sum",LongType).add("count",LongType)
  }

  // Data type of the function's result
  override def dataType: DataType = {
    DoubleType
  }

  // Whether the function is deterministic, i.e. the same input always produces the same result
  override def deterministic: Boolean = {
    true
  }

  // Initialize the buffer before the computation starts
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L    // the first buffer field, "sum", starts at 0
    buffer(1) = 0L    // the second buffer field, "count", starts at 0
  }

  // Update the buffer with each input row
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)    // accumulate the sum
    buffer(1) = buffer.getLong(1) + 1                   // increment the count
  }

  // Merge buffers from different nodes
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)    // add up the sums
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)    // add up the counts
  }

  // Compute the final result
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}
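
For reference, with the user.json shown above the query returns (18 + 27 + 49 + 23 + 56) / 5 = 34.6. The registered UDAF can also be invoked from the DataFrame API rather than SQL; a minimal sketch, assuming the same frame as in main and the standard callUDF helper from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.{callUDF, col}

frame.select(callUDF("avgAge", col("age"))).show()    // equivalent to the spark.sql("select avgAge(age) from user") call above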

 

3. User-Defined Aggregate Function on a Strongly Typed Dataset

package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql._


object SparkSQL_UDAF2 {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    // Create the Spark SQL environment object, i.e. the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Instantiate our custom aggregate function
    val udaf = new MyAgeAvgClassFunction

    // Convert the aggregate function into a typed column that can be queried
    val avgColume:TypedColumn[UserBean,Double] = udaf.toColumn.name("avgAge")

    // Read the JSON file and build a DataFrame
    val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    /**
      *   Tip:
      *     Before the conversion below, the implicit conversion rules must be imported. Note that "spark" here is not a package name but the name of the SparkSession object.
      */
    import spark.implicits._
    val userDS:Dataset[UserBean] = frame.as[UserBean]

    // Use the aggregate function
    userDS.select(avgColume).show()

    // Release resources
    spark.close()
  }
}

/**
  *   Both the strongly typed Dataset and the untyped DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions.
  *
  *   Strongly typed user-defined aggregate function: implemented by extending Aggregator.
  *
  *   The general steps for declaring one (strongly typed) are:
  *     1>. Extend Aggregator and set its type parameters
  *     2>. Implement its methods
  */
class MyAgeAvgClassFunction extends Aggregator[UserBean,AvgBuffer,Double] {

  // Initialize the buffer
  override def zero: AvgBuffer = {
    AvgBuffer(0,0)
  }

  /**
    *   Aggregate one input record into the buffer
    * @param b the aggregation buffer
    * @param a the input record
    * @return the updated buffer
    */
  override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
    b.sum = b.sum + a.age
    b.count = b.count + 1
    b
  }

  /**
    *   Merge two buffers
    * @param b1 the first buffer
    * @param b2 the second buffer
    * @return the merged buffer
    */
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  // Compute the final result
  override def finish(reduction: AvgBuffer): Double = {
    reduction.sum.toDouble / reduction.count
  }

  // Encoder for the buffer; this is boilerplate. Note that the Encoder's type parameter is the case class we defined.
  override def bufferEncoder: Encoder[AvgBuffer] = {
    Encoders.product
  }

  // Encoder for the output; boilerplate as well. Look at the Encoder's type parameter and call the matching Encoders method.
  override def outputEncoder: Encoder[Double] = {
    Encoders.scalaDouble
  }
}


// Define the case classes
case class UserBean(name:String,age:BigInt)
case class AvgBuffer(var sum:BigInt,var count:Int)
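
As an aside: in Spark 3.x, UserDefinedAggregateFunction is deprecated, and an Aggregator can be registered for SQL use directly via functions.udaf. A minimal sketch, assuming Spark 3.0+ and a hypothetical variant of the aggregator whose input type is Long (the age column) rather than UserBean:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Hypothetical Long-input variant of the average aggregator (Spark 3.0+ only)
val longAvg = new Aggregator[Long, AvgBuffer, Double] {
  override def zero: AvgBuffer = AvgBuffer(0, 0)
  override def reduce(b: AvgBuffer, a: Long): AvgBuffer = { b.sum = b.sum + a; b.count = b.count + 1; b }
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = { b1.sum = b1.sum + b2.sum; b1.count = b1.count + b2.count; b1 }
  override def finish(r: AvgBuffer): Double = r.sum.toDouble / r.count
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// functions.udaf wraps the Aggregator so it can be registered and called from SQL
spark.udf.register("avgAgeV2", functions.udaf(longAvg))
spark.sql("select avgAgeV2(age) from user").show()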