<iframe width="800" height="500" src="//player.bilibili.com/player.html?aid=38193405&cid=67137841&page=4" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>
User-defined functions (UDFs) are a key feature of most SQL environments and are used to extend a system's built-in functionality. UDFs let developers expose new functions in a higher-level language such as SQL by abstracting away their lower-level implementation. Apache Spark is no exception, and it offers several options for integrating UDFs into Spark SQL workflows.
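In Spark SQL this boils down to two common entry points: `spark.udf.register` makes a Scala function callable from SQL text, while `org.apache.spark.sql.functions.udf` wraps it as a `Column` expression for the DataFrame API. Below is a minimal, self-contained sketch; the toy `people` DataFrame and the `strLength` name are only for illustration, whereas the full examples in the next section use the project's `BaseSparkSession` and HDFS data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfQuickLook {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").appName("udf-quick-look").getOrCreate()
    import spark.implicits._

    // A small in-memory DataFrame, just for illustration
    val people = Seq(("Michael", 3000L), ("Andy", 4500L)).toDF("name", "salary")

    // Option 1: register the function so it can be used inside SQL statements
    spark.udf.register("strLength", (s: String) => s.length)
    people.createOrReplaceTempView("people")
    spark.sql("select name, strLength(name) as name_length from people").show()

    // Option 2: wrap the same function as a Column expression for the DataFrame API
    val strLength = udf((s: String) => s.length)
    people.select($"name", strLength($"name").as("name_length")).show()

    spark.stop()
  }
}
```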
## Examples
```scala
package com.opensource.bigdata.spark.standalone.base

import org.apache.spark.sql.SparkSession

/**
  * Obtain a SparkSession.
  * Usage: first `extends BaseSparkSession`, then
  *   local mode:   val spark = sparkSession(true)
  *   cluster mode: val spark = sparkSession()
  */
class BaseSparkSession {

  var appName = "sparkSession"
  var master = "spark://standalone.com:7077" // local mode: "local"; standalone mode: "spark://master:7077"

  def sparkSession(): SparkSession = {
    val spark = SparkSession.builder
      .master(master)
      .appName(appName)
      .config("spark.eventLog.enabled", "true")
      .config("spark.history.fs.logDirectory", "hdfs://standalone.com:9000/spark/log/historyEventLog")
      .config("spark.eventLog.dir", "hdfs://standalone.com:9000/spark/log/historyEventLog")
      .getOrCreate()
    // Ship the application jar to the executors
    spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
    spark
  }

  def sparkSession(isLocal: Boolean = false): SparkSession = {
    if (isLocal) {
      master = "local"
      val spark = SparkSession.builder
        .master(master)
        .appName(appName)
        .getOrCreate()
      spark
    } else {
      val spark = SparkSession.builder
        .master(master)
        .appName(appName)
        .config("spark.eventLog.enabled", "true")
        .config("spark.history.fs.logDirectory", "hdfs://standalone.com:9000/spark/log/historyEventLog")
        .config("spark.eventLog.dir", "hdfs://standalone.com:9000/spark/log/historyEventLog")
        .getOrCreate()
      // spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
      spark
    }
  }

  /**
    * Get the path of the current project.
    */
  def getProjectPath: String = System.getProperty("user.dir")
}
```
```scala
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession

/**
  * Custom anonymous function (UDF)
  * Purpose: compute the length of the values in a column
  */
object Run extends BaseSparkSession {

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
    ds.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+

    // Register the UDF so it can be called from SQL
    spark.udf.register("strLength", (str: String) => str.length())

    ds.createOrReplaceTempView("employees")
    spark.sql("select name, salary, strLength(name) as name_Length from employees").show()
    // +-------+------+-----------+
    // |   name|salary|name_Length|
    // +-------+------+-----------+
    // |Michael|  3000|          7|
    // |   Andy|  4500|          4|
    // | Justin|  3500|          6|
    // |  Berta|  4000|          5|
    // +-------+------+-----------+

    spark.stop()
  }
}
```
```scala
import com.opensource.bigdata.spark.standalone.base.BaseSparkSession

/**
  * Custom anonymous function (UDF)
  * Purpose: upper-case the values in a column via the DataFrame API
  */
object Run extends BaseSparkSession {

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
    ds.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+

    // Wrap a Scala function as a Column expression with functions.udf
    import org.apache.spark.sql.functions._
    val strUpper = udf((str: String) => str.toUpperCase())

    import spark.implicits._
    ds.withColumn("toUpperCase", strUpper($"name")).show
    // +-------+------+-----------+
    // |   name|salary|toUpperCase|
    // +-------+------+-----------+
    // |Michael|  3000|    MICHAEL|
    // |   Andy|  4500|       ANDY|
    // | Justin|  3500|     JUSTIN|
    // |  Berta|  4000|      BERTA|
    // +-------+------+-----------+

    spark.stop()
  }
}
```
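One caveat worth noting: UDFs such as `(str: String) => str.length()` or `str.toUpperCase()` receive column values as-is, so a null in the column would reach the Scala function and throw a `NullPointerException`. A possible null-safe variant (the `strUpperSafe` name is made up here for illustration) returns an `Option`, so null inputs simply become null outputs:

```scala
import org.apache.spark.sql.functions.udf

// Wrap the input in Option so that null inputs map to null outputs instead of throwing
val strUpperSafe = udf((str: String) => Option(str).map(_.toUpperCase))

// Used the same way as above, e.g.:
// ds.withColumn("toUpperCase", strUpperSafe($"name")).show()
```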
```scala
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_01_spark_udaf_count

import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * UDAF lifecycle:
  *  - initialize(): sets the initial value, i.e. the value when there is no data
  *  - update():     folds each input row into the aggregation buffer
  *  - merge():      merges the buffers of the individual partitions
  *  - evaluate():   computes the final result from the merged buffer
  */
object Run2 extends BaseSparkSession {

  object CustomerCount extends UserDefinedAggregateFunction {

    // Data type of the aggregate function's input argument
    def inputSchema: StructType = StructType(StructField("inputColumn", StringType) :: Nil)

    // Data type of the intermediate aggregation buffer
    def bufferSchema: StructType = StructType(StructField("count", LongType) :: Nil)

    // Data type of the final result
    def dataType: DataType = LongType

    def deterministic: Boolean = true

    // Initial value, returned when the Dataset is empty
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
    }

    // Fold each row of the current partition into the buffer
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + 1
      }
    }

    // Merge the buffers of two partitions
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    }

    // Compute the final result
    def evaluate(buffer: Row): Long = buffer.getLong(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    spark.udf.register("customerCount", CustomerCount)

    val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
    df.createOrReplaceTempView("employees")
    val sqlDF = spark.sql("select customerCount(name) as name_count from employees")

    df.show()
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+

    sqlDF.show()
    // +----------+
    // |name_count|
    // +----------+
    // |         4|
    // +----------+

    spark.stop()
  }
}
```
```scala
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_03_spark_udaf_sum

import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * UDAF lifecycle:
  *  - initialize(): sets the initial value, i.e. the value when there is no data
  *  - update():     folds each input row into the aggregation buffer
  *  - merge():      merges the buffers of the individual partitions
  *  - evaluate():   computes the final result from the merged buffer
  */
object Run extends BaseSparkSession {

  object CustomerSum extends UserDefinedAggregateFunction {

    // Data type of the aggregate function's input argument
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)

    // Data type of the intermediate aggregation buffer
    def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)

    // Data type of the final result
    def dataType: DataType = LongType

    def deterministic: Boolean = true

    // Initial value, returned when the Dataset is empty
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
    }

    // Fold each row of the current partition into the buffer
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + input.getLong(0)
      }
    }

    // Merge the buffers of two partitions
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    }

    // Compute the final result
    def evaluate(buffer: Row): Long = buffer.getLong(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    spark.udf.register("customerSum", CustomerSum)

    val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
    df.createOrReplaceTempView("employees")
    val sqlDF = spark.sql("select customerSum(salary) as total_salary from employees")

    df.show
    // +-------+------+
    // |   name|salary|
    // +-------+------+
    // |Michael|  3000|
    // |   Andy|  4500|
    // | Justin|  3500|
    // |  Berta|  4000|
    // +-------+------+

    sqlDF.show()
    // +------------+
    // |total_salary|
    // +------------+
    // |       15000|
    // +------------+

    spark.stop()
  }
}
```
```scala
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_04_spark_udaf_average

import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object Run extends BaseSparkSession {

  object MyAverage extends UserDefinedAggregateFunction {

    // Data type of the aggregate function's input argument
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)

    // Data type of the intermediate aggregation buffer: running sum and count
    def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)

    // Data type of the final result
    def dataType: DataType = DoubleType

    def deterministic: Boolean = true

    // Initial values, returned when the Dataset is empty
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
      buffer(1) = 0L
    }

    // Fold each row of the current partition into the buffer
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        buffer(0) = buffer.getLong(0) + input.getLong(0) // sum of salary
        buffer(1) = buffer.getLong(1) + 1                // count
      }
    }

    // Merge the buffers of two partitions
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) // sum of salary
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) // count
    }

    // Compute the final result
    def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
  }

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    spark.udf.register("MyAverage", MyAverage)

    val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json")
    df.createOrReplaceTempView("employees")
    val sqlDF = spark.sql("select MyAverage(salary) as average_salary from employees")
    sqlDF.show()

    spark.stop()
  }
}
```
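Besides the untyped `UserDefinedAggregateFunction` shown above, Spark SQL also offers a type-safe aggregation API based on `org.apache.spark.sql.expressions.Aggregator` for strongly typed Datasets. Below is a sketch of the same average, assuming an `Employee(name, salary)` case class that matches employees.json:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverageTyped extends Aggregator[Employee, Average, Double] {
  // Zero value of the buffer; must satisfy b + zero = b
  def zero: Average = Average(0L, 0L)

  // Fold one input row into the buffer
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }

  // Merge two intermediate buffers
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  // Compute the final result
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage (assuming a SparkSession named spark):
// import spark.implicits._
// val ds = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employees.json").as[Employee]
// ds.select(MyAverageTyped.toColumn.name("average_salary")).show()
```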
```scala
package com.opensource.bigdata.spark.sql.n_08_spark_udaf.n_05_spark_udaf_groupby_max

import com.opensource.bigdata.spark.standalone.base.BaseSparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * UDAF lifecycle:
  *  - initialize(): sets the initial value, i.e. the value when there is no data
  *  - update():     folds each input row into the aggregation buffer
  *  - merge():      merges the buffers of the individual partitions
  *  - evaluate():   computes the final result from the merged buffer
  */
object Run extends BaseSparkSession {

  object CustomerMax extends UserDefinedAggregateFunction {

    // Data type of the aggregate function's input argument
    def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)

    // Data type of the intermediate aggregation buffer
    def bufferSchema: StructType = StructType(StructField("max", LongType) :: Nil)

    // Data type of the final result
    def dataType: DataType = LongType

    def deterministic: Boolean = true

    // Initial value, returned when the Dataset is empty
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0L
    }

    // Keep the largest value seen so far in the current partition
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        if (input.getLong(0) > buffer.getLong(0)) {
          buffer(0) = input.getLong(0)
        }
      }
    }

    // Merge the buffers of two partitions by keeping the larger value
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      if (buffer2.getLong(0) > buffer1.getLong(0)) buffer1(0) = buffer2.getLong(0)
    }

    // Compute the final result
    def evaluate(buffer: Row): Long = buffer.getLong(0)
  }

  def main(args: Array[String]): Unit = {
    val spark = sparkSession(true)

    spark.udf.register("customerMax", CustomerMax)

    val df = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/employeesCN.json")
    df.createOrReplaceTempView("employees")
    val sqlDF = spark.sql("select gender, customerMax(salary) as max_salary from employees group by gender")

    df.show
    // +------+----+------+
    // |gender|name|salary|
    // +------+----+------+
    // |    男|小王| 30000|
    // |    女|小麗| 50000|
    // |    男|小軍| 80000|
    // |    女|小李| 90000|
    // +------+----+------+

    sqlDF.show()
    // +------+----------+
    // |gender|max_salary|
    // +------+----------+
    // |    男|     80000|
    // |    女|     90000|
    // +------+----------+

    spark.stop()
  }
}
```
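A registered UDAF is not limited to SQL strings; the same group-by aggregation can be written with the DataFrame API, either by calling the registered name through `functions.callUDF` or by applying the UDAF object directly as a `Column`. A sketch reusing `df` and `CustomerMax` from the example above:

```scala
import org.apache.spark.sql.functions.{callUDF, col}

// Call the UDAF by its registered name ...
df.groupBy("gender")
  .agg(callUDF("customerMax", col("salary")).as("max_salary"))
  .show()

// ... or apply the UserDefinedAggregateFunction object directly as a Column
df.groupBy("gender")
  .agg(CustomerMax(col("salary")).as("max_salary"))
  .show()
```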