spark udf 初識初用

直接上代碼,詳見註釋sql

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by zxh on 2016/6/10.
 */
object UDF_test {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    implicit val sc = new SparkContext(conf)
    implicit val sqlContext = new HiveContext(sc)

    import sqlContext.implicits._

    val data = sc.parallelize(Seq(("a", 1), ("bb", 5), ("cccc", 10), ("dddddd", 15))).toDF("a", "b")
    data.registerTempTable("data")


    {
      //函數體採用原生類型(非Column類型),使用udf包裝函數體,將函數體註冊到sqlContext.udf
      import org.apache.spark.sql.functions._

      //函數體
      val filter_length_f = (str: String, _length: Int) => {
        str.length > _length;
      }

      //註冊函數體到當前sqlContext,注意,註冊到sqlContext的函數體,參數不能爲Column
      //註冊後,能夠在如下地方使用:一、df.selectExpr 二、df.filter ,3、將該df註冊爲temptable,以後在sql中使用
      sqlContext.udf.register("filter_length", filter_length_f)

      val filter_length = udf(filter_length_f) //爲方便使用Column,咱們對函數體進行包裝,包裝後的輸入參數爲Column

      data.select($"*", filter_length($"a", lit(2))).show //使用udf包裝過的,必須傳入Column,注意 lit(2)
      data.selectExpr("*", " filter_length(a,2) as ax").show //select 若寫表達式調用函數,則須要使用selectExpr

      data.filter(filter_length($"a", lit(2))).show //同select
      data.filter("filter_length(a,2)").show //filter調用表達式,能夠直接使用df.filter函數,

      sqlContext.sql("select *,filter_length(a,2) from data").show
      sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show
    }
    {
      //函數體使用Column類型,沒法註冊到sqlContext.udf
      //使用udf包裝後,每列都必須輸入column,可否咱們本身定義呢,好比一個參數是Column,一個是其餘類型
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.Column

      val filter_length_f2 = (str: Column, _length: Int) => {
        length(str) > _length
      }
      sqlContext.udf.register("filter_length", filter_length_f2) //todo:很差意思,這裏註冊不了,註冊到sqlContext.udf的函數,入參不支持Column類型

      data.select($"*", filter_length_f2($"a", 2)).show //不用udf包裝,咱們就能夠徹底自定義,這時 length 就能夠傳入整型了
      data.selectExpr("*", " filter_length_f2(a,2) as ax").show //todo:很差意思,這裏用不了了,

      data.filter(filter_length_f2($"a", 2)).show //同select
      data.filter("filter_length(a,2)").show //todo:很差意思,這裏用不了了

    }
    //最後,咱們寫一個相對通用的吧
    {
      //定義兩個函數體,入參一個使用column類型,一個使用原生類型,將原生類型函數註冊到sqlContext.udf

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.Column

      //函數體
      val filter_length_f = (str: String, _length: Int) => {
        str.length > _length;
      }
      //主函數,下面df.select df.filter 等中使用
      val filter_length = (str: Column, _length: Int) => {
        length(str) > _length
      }
      //註冊函數體到當前sqlContext,注意,註冊到sqlContext的函數體,參數不能爲Column
      //註冊後,能夠在如下地方使用:一、df.selectExpr 二、df.filter ,3、將該df註冊爲temptable,以後在sql中使用
      sqlContext.udf.register("filter_length", filter_length_f)

      //這裏咱們不使用udf了,直接使用本身定義的支持Column的函數
      //val filter_length = udf(filter_length_f) //爲方便使用Column,咱們對函數體進行包裝,包裝後的輸入參數爲Column

      data.select($"*", filter_length($"a", 2)).show //使用udf包裝過的,必須傳入Column,注意 lit(2)
      data.selectExpr("*", " filter_length(a,2) as ax").show //select 若寫表達式調用函數,則須要使用selectExpr

      data.filter(filter_length($"a", 2)).show //同select
      data.filter("filter_length(a,2)").show //filter調用表達式,能夠直接使用df.filter函數,

      sqlContext.sql("select *,filter_length(a,2) from data").show
      sqlContext.sql("select *,filter_length(a,2) from data where filter_length(a,2)").show
    }


  }

}
相關文章
相關標籤/搜索