Spark SQL應用解析

一  Spark SQL概述

1.1 什麼是Spark SQL

  Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。程序員

  Hive是將Hive SQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,因爲MapReduce這種計算模型執行效率比較慢。全部Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快!sql

  1.易整合shell

  2.統一的數據訪問方式數據庫

  3.兼容Hiveexpress

  4.標準的數據鏈接apache

  SparkSQL能夠看作是一個轉換層,向下對接各類不一樣的結構化數據源,向上提供不一樣的數據訪問方式。編程

1.2 RDD vs DataFrames vs DataSet

在SparkSQL中Spark爲咱們提供了兩個新的抽象,分別是DataFrame和DataSet。他們和RDD的區別,首先從版本的產生上來看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)json

若是一樣的數據都給到這三個數據結構,他們分別計算以後,都會給出相同的結果。不一樣是的他們的執行效率和執行方式。緩存

在後期的Spark版本中,DataSet會逐步取代RDD和DataFrame成爲惟一的API接口。安全

1.2.1  RDD

  • RDD是一個懶執行的不可變的能夠支持Lambda表達式的並行數據集合。
  • RDD的最大好處就是簡單,API的人性化程度很高。
  • RDD的劣勢是性能限制,它是一個JVM駐內存對象,這也就決定了存在GC(垃圾收集器)的限制和數據增長時Java序列化成本的升高。

1.2.2 Dataframe

  與RDD相似,DataFrame也是一個分佈式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據之外,還記錄數據的結構信息,即schema。同時,與Hive相似,DataFrame也支持嵌套數據類型(struct、array和map)。從API易用性的角度上看,DataFrame API提供的是一套高層的關係操做,比函數式的RDD API要更加友好,門檻更低。因爲與R和Pandas的DataFrame相似,Spark DataFrame很好地繼承了傳統單機數據分析的開發體驗。

  上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person爲類型參數,但Spark框架自己不瞭解Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL能夠清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什麼。DataFrame多了數據的結構信息,即schema。RDD是分佈式的Java對象的集合。DataFrame是分佈式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子之外,更重要的特色是提高執行效率、減小數據讀取以及執行計劃的優化,好比filter下推、裁剪等。

DataFrame是爲數據提供了Schema的視圖。能夠把它當作數據庫中的一張表來對待

DataFrame也是懶執行的。

性能上比RDD要高,主要有兩方面緣由:

    定製化內存管理
    數據以二進制的方式存在於非堆內存,節省了大量空間以外,還擺脫了GC(垃圾收集器)的限制。

查詢優化器的意義在於,即使是經驗並不豐富的程序員寫出的次優的查詢,也能夠被儘可能轉換爲高效的形式予以執行。

Dataframe的劣勢在於在編譯期缺乏類型安全檢查,致使運行時出錯.

1.2.3  Dataset

1)     是Dataframe API的一個擴展,是Spark最新的數據抽象

2)     用戶友好的API風格,既具備類型安全檢查也具備Dataframe的查詢優化特性。

3)     Dataset支持編解碼器,當須要訪問非堆上的數據時能夠避免反序列化整個對象,提升了效率。

4)     樣例類被用來在Dataset中定義數據的結構信息,樣例類中每一個屬性的名稱直接映射到DataSet中的字段名稱。

5)     Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,因此能夠經過as方法將Dataframe轉換爲Dataset。Row是一個類型,跟Car、Person這些的類型同樣,全部的表結構信息都用Row來表示。

6)     DataSet是強類型的。好比能夠有Dataset[Car],Dataset[Person].

  DataFrame只是知道字段,可是不知道字段的類型,因此在執行這些操做的時候是沒辦法在編譯的時候檢查是否類型失敗的,好比你能夠對一個String進行減法操做,在執行的時候才報錯,而DataSet不只僅知道字段,並且知道字段類型,因此有更嚴格的錯誤檢查。就跟JSON對象和類對象之間的類比。

RDD讓咱們可以決定怎麼作,而DataFrame和DataSet讓咱們決定作什麼,控制的粒度不同。

1.2.4 三者的共性

  一、RDD、DataFrame、Dataset全都是spark平臺下的分佈式彈性數據集,爲處理超大型數據提供便利

  二、三者都有惰性機制,在進行建立、轉換,如map方法時,不會當即執行,只有在遇到Action如foreach時,三者纔會開始遍歷運算,極端狀況下,若是代碼裏面有建立、轉換,可是後面沒有在Action中使用對應的結果,在執行時會被直接跳過.

val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")
val spark = SparkSession.builder().config(sparkconf).getOrCreate()
val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
// map不運行
rdd.map{line=>
  println("運行")
  line._1
}

  三、三者都會根據spark的內存狀況自動緩存運算,這樣即便數據量很大,也不用擔憂會內存溢出

  四、三者都有partition的概念

  五、三者有許多共同的函數,如filter,排序等

  六、在對DataFrame和Dataset進行操做許多操做都須要這個包進行支持

import spark.implicits._

  七、DataFrame和Dataset都可使用模式匹配獲取各個字段的值和類型

DataFrame:

testDF.map{
      case Row(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }

Dataset:

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
    testDS.map{
      case Coltest(col1:String,col2:Int)=>
        println(col1);println(col2)
        col1
      case _=>
        ""
    }

1.2.5 三者的區別

RDD:

一、RDD通常和spark mlib同時使用

二、RDD不支持sparksql操做

DataFrame:

一、與RDD和Dataset不一樣,DataFrame每一行的類型固定爲Row,只有經過解析才能獲取各個字段的值,如

testDF.foreach{
  line =>
    val col1=line.getAs[String]("col1")
    val col2=line.getAs[String]("col2")
}

每一列的值無法直接訪問

二、DataFrame與Dataset通常與spark ml同時使用

三、DataFrame與Dataset均支持sparksql的操做,好比select,groupby之類,還能註冊臨時表/視窗,進行sql語句操做,如

dataDF.createOrReplaceTempView("tmp")
spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

四、DataFrame與Dataset支持一些特別方便的保存方式,好比保存成csv,能夠帶上表頭,這樣每一列的字段名一目瞭然

//保存
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
//讀取
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://master01:9000/test")
val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()

利用這樣的保存方式,能夠方便的得到字段名和列的對應,並且分隔符(delimiter)能夠自由指定。

Dataset:

Dataset和DataFrame擁有徹底相同的成員函數,區別只是每一行的數據類型不一樣。

DataFrame也能夠叫Dataset[Row],每一行的類型是Row,不解析,每一行究竟有哪些字段,各個字段又是什麼類型都無從得知,只能用上面提到的getAS方法或者共性中的第七條提到的模式匹配拿出特定字段

而Dataset中,每一行是什麼類型是不必定的,在自定義了case class以後能夠很自由的得到每一行的信息

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
/**
 rdd
 ("a", 1)
 ("b", 1)
 ("a", 1)
**/
val test: Dataset[Coltest]=rdd.map{line=>
      Coltest(line._1,line._2)
    }.toDS
test.map{
      line=>
        println(line.col1)
        println(line.col2)
    }

  能夠看出,Dataset在須要訪問列中的某個字段時是很是方便的,然而,若是要寫一些適配性很強的函數時,若是使用Dataset,行的類型又不肯定,多是各類case class,沒法實現適配,這時候用DataFrame即Dataset[Row]就能比較好的解決問題

二。執行SparkSQL查詢

2.1 命令行查詢流程

打開Spark shell

例子:查詢大於30歲的用戶

建立以下JSON文件,注意JSON的格式:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

2.2 IDEA建立SparkSQL程序

IDEA中程序的打包和運行方式都和SparkCore相似,Maven依賴中須要添加新的依賴項:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

程序以下

package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory


/**
  * Created by wuyufei on 22/06/2019.
  */
object HelloWorld {

  val logger = LoggerFactory.getLogger(HelloWorld.getClass)

  def main(args: Array[String]) {
    //建立SparkConf()並設置App名稱
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // RDDs 轉 DataFrames 須要的包
    import spark.implicits._

    val df = spark.read.json("examples/src/main/resources/people.json")

    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
  }

}

三。SparkSQL解析

3.1 新的起始點SparkSession

  在老的版本中,SparkSQL提供兩種SQL查詢起始點,一個叫SQLContext,用於Spark本身提供的SQL查詢,一個叫HiveContext,用於鏈接Hive的查詢,SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,因此在SQLContext和HiveContext上可用的API在SparkSession上一樣是可使用的。SparkSession內部封裝了sparkContext,因此計算其實是由sparkContext完成的。

import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

// RDDs 轉 DataFrames 須要的包
import spark.implicits._

SparkSession.builder 用於建立一個SparkSession。

import spark.implicits._的引入是用於將DataFrames隱式轉換成RDD,使df可以使用RDD中的方法。

若是須要Hive支持,則須要如下建立語句:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
// RDDs 轉 DataFrames 須要的包
import spark.implicits._

3.2 建立DataFrames

  在Spark SQL中SparkSession是建立DataFrames和執行SQL的入口,建立DataFrames有三種方式,一種是能夠從一個存在的RDD進行轉換,還能夠從Hive Table進行查詢返回,或者經過Spark的數據源進行建立。

val df = spark.read.json("examples/src/main/resources/people.json")

df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

從RDD進行轉換:

scala> val peopleRdd = sc.textFile("examples/src/main/resources/people.txt")
peopleRdd: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[18] at textFile at <console>:24
//把每一行的數據用,隔開 而後經過第二個map轉換成一個Array 再經過toDF 映射給name age
scala> val peopleDF3 = peopleRdd.map(_.split(",")).map(paras => (paras(0),paras(1).trim().toInt)).toDF("name","age")
peopleDF3: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> peopleDF.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

Hive在數據源章節介紹

3.3 DataFrame經常使用操做

3.3.1 DSL風格語法

import spark.implicits._

df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

3.3.2 SQL風格語法

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

臨時表是Session範圍內的,Session退出後,表就失效了。若是想應用範圍內有效,可使用全局表。注意使用全局表時須要全路徑訪問,如:global_temp.people

3.4 建立DataSet

Dataset是具備強類型的數據集合,須要提供對應的類型信息。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

3.5 Dataset和RDD互操做

  Spark SQL支持經過兩種方式將存在的RDD轉換爲Dataset,轉換的過程當中須要讓Dataset獲取RDD中的Schema信息,主要有兩種方式,一種是經過反射來獲取RDD中的Schema信息。這種方式適合於列名已知的狀況下。第二種是經過編程接口的方式將Schema信息應用於RDD,這種方式能夠處理那種在運行時才能知道列的方式。

3.5.1 經過反射獲取Scheam

  SparkSQL可以自動將包含有case類的RDD轉換成DataFrame,case類定義了table的結構,case類屬性經過反射變成了表的列名。Case類能夠包含諸如Seqs或者Array等複雜的結構。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

3.5.2 經過編程設置Schema(StructType)

若是case類不可以提早定義,能夠經過下面三個步驟定義一個DataFrame

建立一個多行結構的RDD;

建立用StructType來表示的行結構信息。

經過SparkSession提供的createDataFrame方法來應用Schema .

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
import org.apache.spark.sql._
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes().show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

3.6 類型之間的轉換總結

RDD、DataFrame、Dataset三者有許多共性,有各自適用的場景經常須要在三者之間轉換

DataFrame/Dataset轉RDD:

val rdd1=testDF.rdd
val rdd2=testDS.rdd

RDD轉DataFrame:

import spark.implicits._
val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")

通常用元組把一行的數據寫在一塊兒,而後在toDF中指定字段名

RDD轉Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

能夠看到,定義每一行的類型(case class)時,已經給出了字段名和類型,後面只要往case class裏面添加值便可

Dataset轉DataFrame

把case class封裝成Row

import spark.implicits._
val testDF = testDS.toDF

DataFrame轉Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = testDF.as[Coltest]

這種方法就是在給出每一列的類型後,使用as方法,轉成Dataset,這在數據類型是DataFrame又須要針對各個字段處理時極爲方便。

在使用一些特殊的操做時,必定要加上 import spark.implicits._ 否則toDF、toDS沒法使用

3.7 用戶自定義函數

經過spark.udf功能用戶能夠自定義函數。

3.7.1 用戶自定義UDF函數

scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.createOrReplaceTempView("people")

scala> spark.sql("Select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
|     Name:Michael|null|
|        Name:Andy|  30|
|      Name:Justin|  19|
+-----------------+----+

3.7.2 用戶自定義聚合函數

強類型的Dataset和弱類型的DataFrame都提供了相關的聚合函數, 如 count(),countDistinct(),avg(),max(),min()。除此以外,用戶能夠設定本身的自定義聚合函數。

3.7.2.1 弱類型用戶自定義聚合函數

經過繼承UserDefinedAggregateFunction來實現用戶自定義聚合函數。下面展現一個求平均工資的自定義聚合函數。

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
// 聚合函數輸入參數的數據類型 
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// 聚合緩衝區中值得數據類型 
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 返回值的數據類型 
def dataType: DataType = DoubleType
// 對於相同的輸入是否一直返回相同的輸出。 
def deterministic: Boolean = true
// 初始化
def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存工資的總額
buffer(0) = 0L
// 存工資的個數
buffer(1) = 0L
}
// 相同Execute間的數據合併。 
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// 不一樣Execute間的數據合併 
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 計算最終結果
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// 註冊函數
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

3.7.2.2 強類型用戶自定義聚合函數

經過繼承Aggregator來實現強類型自定義聚合函數,一樣是求平均工資

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
// 既然是強類型,可能有case類
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
// 定義一個數據結構,保存工資總數和工資總個數,初始都爲0
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// 聚合不一樣execute的結果
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// 計算輸出
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// 設定之間值類型的編碼器,要轉換成case類
// Encoders.product是進行scala元組和case類轉換的編碼器 
def bufferEncoder: Encoder[Average] = Encoders.product
// 設定最終輸出值的編碼器
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

四。第1章 SparkSQL數據源

相關文章
相關標籤/搜索