直接上代碼,詳見註釋sql
import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkContext, SparkConf} /** * Created by zxh on 2016/6/10. */ object UDF_test { def main(args: Array[String]): Unit = { val conf = new SparkConf() implicit val sc = new SparkContext(conf) implicit val sqlContext = new HiveContext(sc) import sqlContext.implicits._ val data = sc.parallelize(Seq(("a", 1), ("bb", 5), ("cccc", 10), ("dddddd", 15))).toDF("a", "b") data.registerTempTable("data") { //函數體採用原生類型(非Column類型),使用udf包裝函數體,將函數體註冊到sqlContext.udf import org.apache.spark.sql.functions._ //函數體 val filter_length_f = (str: String, _length: Int) => { str.length > _length; } //註冊函數體到當前sqlContext,注意,註冊到sqlContext的函數體,參數不能爲Column //註冊後,能夠在如下地方使用:一、df.selectExpr 二、df.filter ,3、將該df註冊爲temptable,以後在sql中使用 sqlContext.udf.register("filter_length", filter_length_f) val filter_length = udf(filter_length_f) //爲方便使用Column,咱們對函數體進行包裝,包裝後的輸入參數爲Column data.select($"*", filter_length($"a", lit(2))).show //使用udf包裝過的,必須傳入Column,注意 lit(2) data.selectExpr("*", " filter_length(a,2) as ax").show //select 若寫表達式調用函數,則須要使用selectExpr data.filter(filter_length($"a", lit(2))).show //同select data.filter("filter_length(a,2)").show //filter調用表達式,能夠直接使用df.filter函數, sqlContext.sql("select *,filter_length(a,2) from data").show sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show } { //函數體使用Column類型,沒法註冊到sqlContext.udf //使用udf包裝後,每列都必須輸入column,可否咱們本身定義呢,好比一個參數是Column,一個是其餘類型 import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column val filter_length_f2 = (str: Column, _length: Int) => { length(str) > _length } sqlContext.udf.register("filter_length", filter_length_f2) //todo:很差意思,這裏註冊不了,註冊到sqlContext.udf的函數,入參不支持Column類型 data.select($"*", filter_length_f2($"a", 2)).show //不用udf包裝,咱們就能夠徹底自定義,這時 length 就能夠傳入整型了 data.selectExpr("*", " filter_length_f2(a,2) as ax").show //todo:很差意思,這裏用不了了, data.filter(filter_length_f2($"a", 2)).show //同select data.filter("filter_length(a,2)").show //todo:很差意思,這裏用不了了 } //最後,咱們寫一個相對通用的吧 { //定義兩個函數體,入參一個使用column類型,一個使用原生類型,將原生類型函數註冊到sqlContext.udf import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column //函數體 val filter_length_f = (str: String, _length: Int) => { str.length > _length; } //主函數,下面df.select df.filter 等中使用 val filter_length = (str: Column, _length: Int) => { length(str) > _length } //註冊函數體到當前sqlContext,注意,註冊到sqlContext的函數體,參數不能爲Column //註冊後,能夠在如下地方使用:一、df.selectExpr 二、df.filter ,3、將該df註冊爲temptable,以後在sql中使用 sqlContext.udf.register("filter_length", filter_length_f) //這裏咱們不使用udf了,直接使用本身定義的支持Column的函數 //val filter_length = udf(filter_length_f) //爲方便使用Column,咱們對函數體進行包裝,包裝後的輸入參數爲Column data.select($"*", filter_length($"a", 2)).show //使用udf包裝過的,必須傳入Column,注意 lit(2) data.selectExpr("*", " filter_length(a,2) as ax").show //select 若寫表達式調用函數,則須要使用selectExpr data.filter(filter_length($"a", 2)).show //同select data.filter("filter_length(a,2)").show //filter調用表達式,能夠直接使用df.filter函數, sqlContext.sql("select *,filter_length(a,2) from data").show sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show } } }