Using a UDF in SparkSQL

package cn.piesat.test

import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

// Worker is not defined in the original listing; a case class is assumed here,
// with Workers.txt holding comma-separated lines of "name,age,addr", so that
// spark.implicits._ can derive an Encoder for toDS().
case class Worker(name: String, age: Int, addr: String)

object SparkSQLTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparkSql").master("local[4]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    val sc = spark.sparkContext

    // Parse each partition's lines into Worker records.
    val workerRDD = sc.textFile("F://Workers.txt").mapPartitions(itor => {
      val array = new ArrayBuffer[Worker]()
      while (itor.hasNext) {
        val splited = itor.next().split(",")
        // age is field 1; the original passed splited(2) to both age and addr.
        array.append(Worker(splited(0), splited(1).toInt, splited(2)))
      }
      array.toIterator
    })

    import spark.implicits._

    // Register the UDF: returns the combined length of the name and addr strings.
    spark.udf.register("strLen", (str: String, addr: String) => str.length + addr.length)

    val workDS = workerRDD.toDS()
    workDS.createOrReplaceTempView("worker")

    val resultDF = spark.sql("select strLen(name, addr) from worker")
    val resultDS = resultDF.as("WO") // alias the result Dataset as "WO"
    resultDS.show()

    spark.stop()
  }
}
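For comparison, the same logic can also be invoked without going through SQL text. Below is a minimal sketch, assuming the `spark` session and the `workDS` Dataset from the listing above; `udf` and `callUDF` come from `org.apache.spark.sql.functions`, and the column name `nameAddrLen` is chosen here purely for illustration.

import org.apache.spark.sql.functions.{callUDF, col, udf}

// Wrap the same lambda as a column-level UDF for the DataFrame API.
val strLenUdf = udf((str: String, addr: String) => str.length + addr.length)

// Apply it directly to columns; no temp view or SQL string required.
workDS.select(strLenUdf(col("name"), col("addr")).alias("nameAddrLen")).show()

// Alternatively, invoke the UDF already registered under the name "strLen".
workDS.select(callUDF("strLen", col("name"), col("addr"))).show()

Either form evaluates the same function per row; registering via spark.udf.register is only required when the function will be referenced by name from a SQL string, as in the main listing.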