User-Defined Functions
Author: 尹正傑
Copyright notice: This is an original work. Reproduction is prohibited; violators will be held legally responsible.
1. User-defined UDF functions
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell        # In the spark-shell session, user-defined functions can be registered through the spark.udf facility.
20/07/14 01:21:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594660981913).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.json("file:///tmp/user.json")        # Read the data and create a DataFrame
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.show()        # Display the data
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+

scala> spark.udf.register("myNameFunc", (x:String)=> "Name:"+x)        # Register a custom function named "myNameFunc" that prepends "Name:" to the input string
res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("user")        # Create a temporary view named "user"

scala> spark.sql("select name, passwd from user").show()        # Query the user view as usual
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+

scala> spark.sql("select myNameFunc(name), passwd from user").show()        # Query the user view using our custom function (myNameFunc)
+--------------------+------+
|UDF:myNameFunc(name)|passwd|
+--------------------+------+
|    Name:yinzhengjie|  2020|
|          Name:Jason|666666|
|         Name:Liming|   123|
|          Name:Jenny|   456|
|          Name:Danny|   789|
+--------------------+------+

scala>
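Besides registering a UDF for SQL with spark.udf.register, the same logic can be applied directly through the DataFrame API via org.apache.spark.sql.functions.udf. A minimal sketch (the name myNameUdf is my own; it assumes the df created above and the implicits that spark-shell imports automatically):

import org.apache.spark.sql.functions.udf

val myNameUdf = udf((x: String) => "Name:" + x)                 //Wrap the same Scala function as a column-level UDF
df.select(myNameUdf($"name").alias("name"), $"passwd").show()   //Equivalent to: select myNameFunc(name), passwd from user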
2. Weakly-typed DataFrame user-defined aggregate function example
{"name":"yinzhengjie","passwd":"2020","age":18} {"name":"Jason","passwd":"666666","age":27} {"name":"Liming","passwd":"123","age":49} {"name":"Jenny","passwd":"456","age":23} {"name":"Danny","passwd":"789","age":56}
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}

object SparkSQL_UDAF {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    //Create the Spark SQL environment object, i.e. the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //Create an instance of our custom aggregate function
    val udaf = new MyAgeAvgFunction

    //Register the function
    spark.udf.register("avgAge",udaf)

    //Read the json file and build a DataFrame
    val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    //Use the aggregate function
    frame.createOrReplaceTempView("user")
    spark.sql("select avgAge(age) from user").show()

    //Release resources
    spark.close()
  }
}

/**
  * Both the strongly-typed Dataset and the weakly-typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions.
  *
  * Weakly-typed user-defined aggregate function: implemented by extending UserDefinedAggregateFunction.
  *
  * The general steps to declare a user-defined aggregate function are:
  *   1>. Extend UserDefinedAggregateFunction
  *   2>. Implement its methods
  */
class MyAgeAvgFunction extends UserDefinedAggregateFunction{
  //Schema of the function's input
  override def inputSchema: StructType = {
    new StructType().add("age",LongType)
  }

  //Schema of the buffer used during aggregation
  override def bufferSchema: StructType = {
    new StructType().add("sum",LongType).add("count",LongType)
  }

  //Data type returned by the function
  override def dataType: DataType = {
    DoubleType
  }

  //Whether the function is deterministic (idempotent: the same input always produces the same result)
  override def deterministic: Boolean = {
    true
  }

  //Initialize the buffer before the computation starts
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L      //The first field, i.e. "sum" defined above, starts at 0
    buffer(1) = 0L      //The second field, i.e. "count" defined above, starts at 0
  }

  //Update the buffer with each input row
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)    //Accumulate the sum
    buffer(1) = buffer.getLong(1) + 1                   //Increment the count
  }

  //Merge the buffers coming from multiple nodes
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)    //Add up the sums
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)    //Add up the counts
  }

  //Compute the final result
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}
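With the test data above, the buffer accumulates sum = 18 + 27 + 49 + 23 + 56 = 173 and count = 5, so the select avgAge(age) query should return 173 / 5 = 34.6.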
3. Strongly-typed Dataset user-defined aggregate function example
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql._

object SparkSQL_UDAF2 {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    //Create the Spark SQL environment object, i.e. the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //Create an instance of our custom aggregate function
    val udaf = new MyAgeAvgClassFunction

    //Convert the aggregate function into a typed column that can be queried
    val avgColumn:TypedColumn[UserBean,Double] = udaf.toColumn.name("avgAge")

    //Read the json file and build a DataFrame
    val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    /**
      * Note:
      *   The implicit conversion rules must be imported before the conversion below. Here "spark" is not a package name but the name of the SparkSession object.
      */
    import spark.implicits._
    val userDS:Dataset[UserBean] = frame.as[UserBean]

    //Use the aggregate function
    userDS.select(avgColumn).show()

    //Release resources
    spark.close()
  }
}

/**
  * Both the strongly-typed Dataset and the weakly-typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions.
  *
  * Strongly-typed user-defined aggregate function: implemented by extending Aggregator.
  *
  * The general steps to declare a strongly-typed user-defined aggregate function are:
  *   1>. Extend Aggregator and specify the type parameters
  *   2>. Implement its methods
  */
class MyAgeAvgClassFunction extends Aggregator[UserBean,AvgBuffer,Double] {
  //Initialize the buffer
  override def zero: AvgBuffer = {
    AvgBuffer(0,0)
  }

  /**
    * Aggregate the input data into the buffer
    * @param b
    * @param a
    * @return
    */
  override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
    b.sum = b.sum + a.age
    b.count = b.count + 1
    b
  }

  /**
    * Merge two buffers
    * @param b1
    * @param b2
    * @return
    */
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  //Compute the final result
  override def finish(reduction: AvgBuffer): Double = {
    reduction.sum.toDouble / reduction.count
  }

  //Encoder for the buffer type; a fixed pattern -- note that the Encoder's type parameter is our case class
  override def bufferEncoder: Encoder[AvgBuffer] = {
    Encoders.product
  }

  //Encoder for the output type; a fixed pattern -- match the Encoder's type parameter to the corresponding Encoders method
  override def outputEncoder: Encoder[Double] = {
    Encoders.scalaDouble
  }
}

//Case classes
case class UserBean(name:String,age:BigInt)
case class AvgBuffer(var sum:BigInt,var count:Int)
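As a side note, Spark 3.0 deprecates UserDefinedAggregateFunction and allows an Aggregator to be registered for SQL queries through org.apache.spark.sql.functions.udaf; the Aggregator's input type must then match the aggregated column rather than a whole bean. A hedged sketch, assuming a Spark 3.x spark-shell session and reusing the AvgBuffer case class defined above (the environment used in this article is Spark 2.4.6, where this API does not exist):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}
import spark.implicits._                                             //Provides the implicit Encoder[Long] required by functions.udaf

//An Aggregator whose input type is the aggregated column (Long) instead of UserBean
object LongAvgFunction extends Aggregator[Long, AvgBuffer, Double] {
  override def zero: AvgBuffer = AvgBuffer(0, 0)
  override def reduce(b: AvgBuffer, a: Long): AvgBuffer = { b.sum = b.sum + a; b.count = b.count + 1; b }
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = { b1.sum = b1.sum + b2.sum; b1.count = b1.count + b2.count; b1 }
  override def finish(r: AvgBuffer): Double = r.sum.toDouble / r.count
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

spark.udf.register("avgAgeTyped", functions.udaf(LongAvgFunction))   //Spark 3.0+ only
spark.sql("select avgAgeTyped(age) from user").show()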