[Big Data] Spark SQL Study Notes

 

1 Spark SQL Overview

1.1 What Is Spark SQL

Spark SQL is the Spark module for working with structured data. It provides two programming abstractions, DataFrame and DataSet, and also acts as a distributed SQL query engine.

We have already studied Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly simplifying the work of writing MapReduce programs; however, the MapReduce computation model is relatively slow. Spark SQL arose to address this: it translates SQL queries into RDD operations and submits them to the cluster, which executes them very quickly.

1.2 Features of Spark SQL

1) Easy to integrate

2) Unified data access

3) Compatible with Hive

4) Standard data connectivity

1.3 What Is a DataFrame

RDD相似,DataFrame也是一個分佈式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據之外,還記錄數據的結構信息,即schema。同時,與Hive相似,DataFrame也支持嵌套數據類型(structarraymap)。從API易用性的角度上看,DataFrame API提供的是一套高層的關係操做,比函數式的RDD API要更加友好,門檻更低。編程

 

The figure above illustrates the difference between a DataFrame and an RDD. The RDD[Person] on the left takes Person as its type parameter, but the Spark framework itself knows nothing about the internal structure of the Person class. The DataFrame on the right carries detailed structure information, so Spark SQL knows exactly which columns the dataset contains and what each column's name and type are. A DataFrame provides a schema view over the data and can be treated like a table in a database. DataFrames are also lazily evaluated. They perform better than RDDs mainly because of:

Optimized execution plans: query plans are optimized by the Spark Catalyst optimizer.

 

For example, consider the following case:

 

To illustrate query optimization, look at the population-data analysis example shown in the figure above. It builds two DataFrames, joins them, and then applies a filter. Executing this plan exactly as written would be inefficient, because a join is an expensive operation and may produce a large intermediate dataset. If we can push the filter down below the join, filter the DataFrame first, and then join the smaller filtered result, execution time is shortened significantly, and that is exactly what Spark SQL's query optimizer does. In short, logical query plan optimization is the process of using equivalence transformations from relational algebra to replace expensive operations with cheaper ones.
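The effect is easy to reproduce in the spark-shell. The sketch below is only an illustration with made-up column names, not the figure's actual data: it joins two small DataFrames and then filters on a column from one side; calling explain(true) on the result should show that Catalyst has pushed the Filter below the Join in the optimized plan.

import spark.implicits._   // assumes an existing SparkSession named spark, as in the shell

val people = Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")
val scores = Seq(("Andy", 95), ("Justin", 80)).toDF("name", "score")

val joined = people.join(scores, "name").filter($"age" > 21)
joined.explain(true)   // the optimized logical plan shows the age filter applied before the join
joined.show()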

1.4 What Is a DataSet

1) An extension of the DataFrame API and Spark's newest data abstraction.

2) A user-friendly API style, offering both compile-time type-safety checks and the query-optimization benefits of DataFrames.

3) Datasets use encoders, so when accessing off-heap data they can avoid deserializing the whole object, which improves efficiency.

4) Case classes are used to define the structure of the data in a Dataset; each case class attribute name maps directly to a field name in the DataSet.

5) A DataFrame is a special case of Dataset: DataFrame = Dataset[Row]. A DataFrame can therefore be converted to a Dataset with the as method. Row is a type just like Car or Person; all table-structure information is represented with Row.

6) DataSets are strongly typed. For example, there can be a Dataset[Car] or a Dataset[Person].

7) A DataFrame only knows the field names, not their types, so operations cannot be type-checked at compile time; for instance, you could subtract something from a String field and the error would only appear at run time. A DataSet knows both the field names and the field types, so it offers stricter error checking, much like the difference between a JSON object and a class instance (see the sketch below).
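A minimal sketch of point 7, assuming a hypothetical people.json file with name and age fields, run in the spark-shell (where spark.implicits._ is already imported): a misspelled column on a DataFrame still compiles and only fails when the query is analyzed at run time, while the same mistake on a typed Dataset is rejected by the compiler.

case class Person(name: String, age: Long)

val df = spark.read.json("people.json")   // DataFrame = Dataset[Row]
val ds = df.as[Person]                    // typed Dataset

// df.select($"agee")           // typo compiles; it fails only when the query is analyzed at run time
// ds.map(p => p.agee)          // typo does not compile: value agee is not a member of Person
ds.map(p => p.age + 1).show()   // field access is checked by the compiler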

2 Spark SQL Programming

2.1 SparkSession: The New Entry Point

In older versions, Spark SQL provided two SQL query entry points: SQLContext, for SQL queries provided by Spark itself, and HiveContext, for queries against Hive.

SparkSession is the newest SQL query entry point in Spark. It is essentially a combination of SQLContext and HiveContext, so any API available on SQLContext or HiveContext can also be used on SparkSession. SparkSession wraps a SparkContext internally, so the actual computation is still performed by the SparkContext.
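A minimal sketch of building a SparkSession in application code (master and app name are arbitrary here); the wrapped SparkContext is reachable through spark.sparkContext, and in the spark-shell a session named spark is already created for you.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("SparkSessionDemo")
  .getOrCreate()

val sc = spark.sparkContext   // the SparkContext wrapped by the session
import spark.implicits._      // needed later for toDF / toDS conversions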

2.2 DataFrame

2.2.1 Creating a DataFrame

In Spark SQL, the SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame: from a Spark data source, by converting an existing RDD, or by querying a Hive table.

1) From a Spark data source

(1) List the file formats supported by Spark's data source readers

scala> spark.read.

csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

(2) Read a JSON file to create a DataFrame

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

(3) Show the result

scala> df.show

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

2) From an RDD

Covered separately in Section 2.5.

3) From a Hive table

Covered separately in Section 3.5.

2.2.2 SQL-Style Syntax (primary)

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Create a temporary view from the DataFrame

scala> df.createOrReplaceTempView("people")

3) Query the whole table with a SQL statement

scala> val sqlDF = spark.sql("SELECT * FROM people")

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

4) Show the result

scala> sqlDF.show

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

Note: a temporary view is scoped to the current session; once the session ends, the view is gone. If you want a view that is valid across the whole application, use a global temporary view. Note that a global temporary view must be referenced by its full path, e.g. global_temp.people.

5) Create a global temporary view from the DataFrame

scala> df.createGlobalTempView("people")

6) Query the whole table with a SQL statement

scala> spark.sql("SELECT * FROM global_temp.people").show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

 

scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

2.2.3 DSL-Style Syntax (secondary)

1) Create a DataFrame

scala> spark.read.

csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

2) View the DataFrame's schema

scala> df.printSchema

root

 |-- age: long (nullable = true)

 |-- name: string (nullable = true)

3) Select only the "name" column

scala> df.select("name").show()

+-------+

|   name|

+-------+

|Michael|

|   Andy|

| Justin|

+-------+

4) Select the "name" column together with "age + 1"

scala> df.select($"name", $"age" + 1).show()

+-------+---------+

|   name|(age + 1)|

+-------+---------+

|Michael|     null|

|   Andy|       31|

| Justin|       20|

+-------+---------+

5) Filter for rows where "age" is greater than 21

scala> df.filter($"age" > 21).show()

+---+----+

|age|name|

+---+----+

| 30|Andy|

+---+----+

6) Group by "age" and count the rows

scala> df.groupBy("age").count().show()

+----+-----+

| age|count|

+----+-----+

|  19|     1|

|null|     1|

|  30|     1|

+----+-----+

2.2.4 Converting an RDD to a DataFrame

Note: whenever you need to convert between an RDD and a DataFrame or DataSet, you must add import spark.implicits._ (here spark is not a package name but the name of the SparkSession object).

Prerequisite: import the implicit conversions and create an RDD

scala> import spark.implicits._

import spark.implicits._

 

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")

peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27

1) By specifying the columns manually

scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")

res1: org.apache.spark.sql.DataFrame = [name: string, age: int]

2) Via reflection (requires a case class)

(1) Create a case class

scala> case class People(name:String, age:Int)

(2) Convert the RDD to a DataFrame using the case class

scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF

res2: org.apache.spark.sql.DataFrame = [name: string, age: int]

3) Programmatically (for reference)

(1) Import the required types

scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._

(2) Create the schema

scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)

structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))

(3) Import the Row type

scala> import org.apache.spark.sql.Row

import org.apache.spark.sql.Row

(4) Create an RDD[Row] matching the given types

scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}

data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33

(5) Create the DataFrame from the data and the schema

scala> val dataFrame = spark.createDataFrame(data, structType)

dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]

2.2.5 Converting a DataFrame to an RDD

Simply call the rdd method.

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Convert the DataFrame to an RDD

scala> val dfToRDD = df.rdd

dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29

3) Print the RDD

scala> dfToRDD.collect

res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])

2.3 DataSet

A Dataset is a strongly typed collection of data; the corresponding type information must be supplied.

2.3.1 Creation

1) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

2) Create a DataSet

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2.3.2 Converting an RDD to a DataSet

SparkSQL can automatically convert an RDD containing case class instances into a DataFrame: the case class defines the table structure, and the case class attributes become the column names via reflection. Case classes may also contain complex structures such as Seqs or Arrays (see the sketch after the example below).

1) Create an RDD

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")

peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27

2) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

3) Convert the RDD to a DataSet

scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS()

res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
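As noted above, case classes with complex fields such as Seq or Array also convert cleanly. A small sketch with a hypothetical Order case class (assumes import spark.implicits._ is in scope, as in the shell session above):

case class Order(id: String, items: Seq[String])   // hypothetical case class with a Seq field

val orderDS = Seq(Order("o1", Seq("apple", "pear")), Order("o2", Seq("milk"))).toDS()
orderDS.printSchema()   // the items field becomes an array<string> column
orderDS.show()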

2.3.3 Converting a DataSet to an RDD

Simply call the rdd method.

1) Create a DataSet

scala> val DS = Seq(Person("Andy", 32)).toDS()

DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2) Convert the DataSet to an RDD

scala> DS.rdd

res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28

2.4 Interoperating Between DataFrame and DataSet

1. Converting a DataFrame to a DataSet

1) Create a DataFrame

scala> val df = spark.read.json("examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

3) Convert the DataFrame to a DataSet

scala> df.as[Person]

res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

2. Converting a DataSet to a DataFrame

1) Create a case class

scala> case class Person(name: String, age: Long)

defined class Person

2) Create a DataSet

scala> val ds = Seq(Person("Andy", 32)).toDS()

ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

3) Convert the DataSet to a DataFrame

scala> val df = ds.toDF

df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

4) Show the result

scala> df.show

+----+---+

|name|age|

+----+---+

|Andy| 32|

+----+---+

2.4.1 Dataset to DataFrame

This is simple, since it just wraps the case class into Row.

1) Import the implicit conversions

import spark.implicits._

2) Convert

val testDF = testDS.toDF

2.4.2 DataFrame to Dataset

1) Import the implicit conversions

import spark.implicits._

2) Create a case class

case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

3) Convert

val testDS = testDF.as[Coltest]

This approach supplies the type of each column and then uses the as method to convert to a Dataset, which is extremely convenient when the data is a DataFrame but you need to work with individual fields. Note: when using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS are not available.
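Once converted, fields can be accessed in a typed way. A tiny usage sketch, continuing with the testDS value created above:

testDS.map(t => t.col1.toUpperCase).show()   // typed access to col1, checked at compile time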

2.5 RDD, DataFrame, and DataSet

In SparkSQL, Spark provides two new abstractions, DataFrame and DataSet. How do they differ from RDDs? First, look at when each was introduced:

RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)

Given the same data, all three structures produce the same result after computation. What differs is their execution efficiency and execution style.

In later Spark versions, DataSet is expected to gradually replace RDD and DataFrame as the single API.

2.5.1 What the Three Have in Common

1. RDD, DataFrame, and Dataset are all resilient distributed datasets on the Spark platform, designed to make processing very large datasets convenient.

2. All three are lazy: transformations such as map are not executed when created; the traversal and computation only start when an action such as foreach is encountered.

3. All three automatically cache results according to Spark's memory situation, so even with very large data you do not have to worry about memory overflow.

4. All three have the concept of partitions.

5. All three share many common functions, such as filter and sorting.

6. Many operations on DataFrame and Dataset need the following import for support:

import spark.implicits._

7. Both DataFrame and Dataset can use pattern matching to obtain the value and type of each field:

DataFrame:

testDF.map {
  case Row(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}

Dataset:

case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

testDS.map {
  case Coltest(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}

2.5.2 How the Three Differ

1. RDD:

1) RDDs are generally used together with Spark MLlib.

2) RDDs do not support Spark SQL operations.

2. DataFrame:

1) Unlike RDDs and Datasets, every row of a DataFrame has the fixed type Row. Column values cannot be accessed directly; each field has to be extracted by parsing, for example:

testDF.foreach { line =>
  val col1 = line.getAs[String]("col1")
  val col2 = line.getAs[Int]("col2")
}

2) DataFrames and Datasets are generally not used together with Spark MLlib.

3) Both DataFrame and Dataset support Spark SQL operations such as select and groupBy, and they can also be registered as temporary tables/views for SQL statements, for example:

dataDF.createOrReplaceTempView("tmp")

spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

4) DataFrame and Dataset support some particularly convenient save formats, for example CSV with a header row, so that each column's field name is obvious at a glance:

// save

val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")

datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

// read

val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")

val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()

Saving this way makes it easy to keep the correspondence between field names and columns, and the delimiter can be specified freely.

3. Dataset:

1) Dataset and DataFrame have exactly the same member functions; the only difference is the data type of each row.

2) A DataFrame can also be written Dataset[Row]: the type of each row is Row, and without parsing there is no way to know which fields a row has or what type each field is; you can only use the getAs method mentioned above, or the pattern matching from point 7 of the commonalities, to pull out specific fields. In a Dataset the type of each row is not fixed; after defining a case class you can freely access the information in each row.

case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types

/**

 rdd

 ("a", 1)

 ("b", 1)

 ("a", 1)

**/

val test: Dataset[Coltest] = rdd.map { line =>
  Coltest(line._1, line._2)
}.toDS

test.foreach { line =>
  println(line.col1)
  println(line.col2)
}

As you can see, a Dataset is very convenient when you need to access a particular field of a row. However, if you want to write functions with strong adaptability, the row type of a Dataset is not fixed and could be any case class, so such a function cannot adapt to it; in that case a DataFrame, i.e. Dataset[Row], solves the problem nicely (see the sketch below).
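A minimal sketch of the kind of schema-agnostic helper this paragraph has in mind (the function below is made up for illustration): because it relies only on Row and the schema, it works on any DataFrame, regardless of which case class, if any, the data came from.

import org.apache.spark.sql.DataFrame

// count the null values in every column of an arbitrary DataFrame
def nullCounts(df: DataFrame): Map[String, Long] =
  df.columns.map { c =>
    c -> df.filter(df(c).isNull).count()
  }.toMap

// usage: nullCounts(spark.read.json("examples/src/main/resources/people.json"))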

2.6 Creating a Spark SQL Program in IDEA

Packaging and running the program in IDEA works much like Spark Core; a new dependency must be added to the Maven build:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

The program is as follows:

package com.atguigu.sparksql

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HelloWorld {

  def main(args: Array[String]) {
    // create the SparkSession and set the application name
    val spark = SparkSession
      .builder().master("local[*]")
      .appName("Spark SQL basic example")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.json("file:/F:/hajima.json")

    // Displays the content of the DataFrame to stdout
    df.show()

    df.filter($"age" > 21).show()

    df.createOrReplaceTempView("persons")

    spark.sql("SELECT * FROM persons where age > 21").show()

    spark.stop()
  }

}

2.7 User-Defined Functions

In the shell, users can define their own functions through the spark.udf facility.

2.7.1 User-Defined UDFs

scala> val df = spark.read.json("examples/src/main/resources/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 

scala> df.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

 

 

scala> spark.udf.register("addName", (x:String)=> "Name:"+x)

res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

 

scala> df.createOrReplaceTempView("people")

 

scala> spark.sql("Select addName(name), age from people").show()

+-----------------+----+

|UDF:addName(name)| age|

+-----------------+----+

|     Name:Michael|null|

|        Name:Andy|  30|

|      Name:Justin|  19|

+-----------------+----+

2.7.2 User-Defined Aggregate Functions

Both strongly typed Datasets and weakly typed DataFrames provide built-in aggregate functions such as count(), countDistinct(), avg(), max(), and min(). Beyond these, users can define their own aggregate functions.

Weakly typed user-defined aggregate functions are implemented by extending UserDefinedAggregateFunction. Below is a custom aggregate function that computes the average salary.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // data type of the aggregate function's input arguments
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // data types of the values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // data type of the returned value
  def dataType: DataType = DoubleType
  // whether the function always returns the same output for the same input
  def deterministic: Boolean = true
  // initialization
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // running total of salaries
    buffer(0) = 0L
    // running count of salaries
    buffer(1) = 0L
  }
  // merge an input row into the buffer (within the same executor)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // merge buffers from different executors
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // compute the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// register the function
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Strongly typed user-defined aggregate functions are implemented by extending Aggregator. The example below again computes the average salary.

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
// Since this is strongly typed, case classes come naturally
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // a data structure holding the salary total and the salary count, both starting at 0
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // merge intermediate results from different executors
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // compute the final output
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // encoder for the intermediate value type, which is a case class
  // Encoders.product is the encoder for Scala tuples and case classes
  def bufferEncoder: Encoder[Average] = Encoders.product
  // encoder for the final output value
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

import spark.implicits._


val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

3 Spark SQL Data Sources

3.1 Generic Load/Save Methods

3.1.1 Manually Specifying Options

The Spark SQL DataFrame interface supports operating on a variety of data sources. A DataFrame can be operated on in RDD fashion, and it can also be registered as a temporary view. Once a DataFrame is registered as a temporary view, you can run SQL queries over it.

The default data source for Spark SQL is the Parquet format. When the data source is a Parquet file, Spark SQL can conveniently perform all of its operations. The default data source format can be changed via the configuration option spark.sql.sources.default.

val df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
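As a small illustration of that option (an assumption about typical usage, not taken from the original notes): switching the session-level default to json lets load() read JSON files without an explicit format, after which the default can be restored.

spark.conf.set("spark.sql.sources.default", "json")
val peopleDF = spark.read.load("examples/src/main/resources/people.json")   // read as JSON
peopleDF.show()

spark.conf.set("spark.sql.sources.default", "parquet")   // restore the default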

When the data source is not in Parquet format, the format must be specified manually. The data source must be given by its fully qualified name (e.g. org.apache.spark.sql.parquet); for built-in formats, the short names json, parquet, jdbc, orc, libsvm, csv, and text can be used.

Data can be loaded generically with the read.load method provided by SparkSession, and saved with write and save.

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")

Beyond this, SQL can be run directly on files:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")

sqlDF.show()

scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")

peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 

scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")

 

scala> peopleDF.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

 

scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")

17/09/05 04:21:11 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException

sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

 

scala> sqlDF.show()

+----+-------+

| age|   name|

+----+-------+

|null|Michael|

|  30|   Andy|

|  19| Justin|

+----+-------+

3.1.2 File Save Options

SaveMode can be used to control how the save operation handles existing data. Note that these save modes do not use any locking and are not atomic. In addition, when Overwrite is used, the existing data is deleted before the new data is written. The SaveMode values are described in the following table:

Scala/Java                         Any Language         Meaning
SaveMode.ErrorIfExists (default)   "error" (default)    Throw an error if the data already exists
SaveMode.Append                    "append"             Append the data
SaveMode.Overwrite                 "overwrite"          Overwrite the existing data
SaveMode.Ignore                    "ignore"             Ignore the operation if the data already exists
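A short sketch of how a mode is passed when saving, reusing the peopleDF name from the example above (the output path is arbitrary): either the SaveMode value or its string form can be handed to DataFrameWriter.mode before saving.

import org.apache.spark.sql.SaveMode

// append to an existing Parquet directory; "overwrite", "ignore" and "error" work the same way
peopleDF.write.mode(SaveMode.Append).parquet("hdfs://hadoop102:9000/people_out.parquet")
// equivalent, using the string form
peopleDF.write.mode("append").parquet("hdfs://hadoop102:9000/people_out.parquet")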

3.2 JSON Files

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().

Note: this is not a conventional JSON file; every line must be a complete JSON string.

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

 

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

3.3 Parquet Files

Parquet is a popular columnar storage format that can efficiently store records with nested fields. Parquet is widely used in the Hadoop ecosystem, and it supports all of Spark SQL's data types. Spark SQL provides methods for reading and writing Parquet files directly.

import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")

val parquetFileDF = spark.read.parquet("hdfs://hadoop102:9000/people.parquet")

parquetFileDF.createOrReplaceTempView("parquetFile")


val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

3.4 JDBC

Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can also be written back to the relational database.

Note: the relevant database driver JAR must be placed on Spark's classpath.

$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

 

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/rdd").option("dbtable", "rddtable").option("user", "root").option("password", "000000").load()

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "dftable")
.option("user", "root")
.option("password", "000000")
.save()

jdbcDF2.write
.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)

3.5 Hive

Apache Hive is the SQL engine on Hadoop, and Spark SQL can be compiled with or without Hive support. Spark SQL with Hive support can access Hive tables, UDFs (user-defined functions), and the Hive query language (HiveQL/HQL). It is worth stressing that including the Hive libraries in Spark SQL does not require a pre-existing Hive installation. In general it is best to build Spark SQL with Hive support so that these features are available; if you downloaded a binary distribution of Spark, it should already have been built with Hive support.

To connect Spark SQL to a deployed Hive, you must copy hive-site.xml into Spark's configuration directory ($SPARK_HOME/conf). Spark SQL can run even without a deployed Hive. Note that if Hive is not deployed, Spark SQL creates its own Hive metastore, called metastore_db, in the current working directory. Furthermore, if you create tables with the HiveQL CREATE TABLE statement (not CREATE EXTERNAL TABLE), they are placed in the /user/hive/warehouse directory of your default filesystem (HDFS if you have a configured hdfs-site.xml on your classpath, otherwise the local filesystem).

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|

3.5.1 Using the Embedded Hive

To use the embedded Hive, nothing special is needed; it works out of the box. If necessary, the warehouse location can be set with --conf spark.sql.warehouse.dir=<path>.

Note: if you use the internal Hive, then since Spark 2.0 spark.sql.warehouse.dir specifies the location of the data warehouse. If you want to use an HDFS path, you need to add core-site.xml and hdfs-site.xml to Spark's conf directory; otherwise only the warehouse directory on the master node gets created, and queries will fail with file-not-found errors. In that case, to switch to HDFS, delete the metastore and restart the cluster.
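A minimal sketch of pointing the warehouse at HDFS, assuming the hadoop102 cluster used throughout these notes; the same option can be passed either on the command line or when building the session.

// when starting the shell:
//   bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/user/hive/warehouse

// or when building the session in code:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("embedded-hive-demo")
  .config("spark.sql.warehouse.dir", "hdfs://hadoop102:9000/user/hive/warehouse")
  .enableHiveSupport()
  .getOrCreate()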

3.5.2 Using an External Hive

To connect to an external, already-deployed Hive, follow these steps:

1) Copy or symlink Hive's hive-site.xml into the conf directory under the Spark installation.

2) Start the Spark shell with the JDBC driver needed to access the Hive metastore database:

$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar

3.5.3 Running the Spark SQL CLI

The Spark SQL CLI makes it easy to run the Hive metastore service locally and execute queries from the command line. Start the Spark SQL CLI by running the following in the Spark directory:

./bin/spark-sql

To configure Hive, replace hive-site.xml under conf/.


4 Spark SQL in Practice

4.1 Data Description

The dataset is a merchandise transaction dataset.

Each order may contain multiple items, each order can generate multiple transactions, and different items have different unit prices.

4.2 Loading the Data

tbStock:

scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable

defined class tbStock

 

scala> val tbStockRdd = spark.sparkContext.textFile("tbStock.txt")

tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at <console>:23

 

scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS

tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]

 

scala> tbStockDS.show()

+------------+----------+---------+

| ordernumber|locationid|   dateid|

+------------+----------+---------+

|BYSL00000893|      ZHAO|2007-8-23|

|BYSL00000897|      ZHAO|2007-8-24|

|BYSL00000898|      ZHAO|2007-8-25|

|BYSL00000899|      ZHAO|2007-8-26|

|BYSL00000900|      ZHAO|2007-8-26|

|BYSL00000901|      ZHAO|2007-8-27|

|BYSL00000902|      ZHAO|2007-8-27|

|BYSL00000904|      ZHAO|2007-8-28|

|BYSL00000905|      ZHAO|2007-8-28|

|BYSL00000906|      ZHAO|2007-8-28|

|BYSL00000907|      ZHAO|2007-8-29|

|BYSL00000908|      ZHAO|2007-8-30|

|BYSL00000909|      ZHAO| 2007-9-1|

|BYSL00000910|      ZHAO| 2007-9-1|

|BYSL00000911|      ZHAO|2007-8-31|

|BYSL00000912|      ZHAO| 2007-9-2|

|BYSL00000913|      ZHAO| 2007-9-3|

|BYSL00000914|      ZHAO| 2007-9-3|

|BYSL00000915|      ZHAO| 2007-9-4|

|BYSL00000916|      ZHAO| 2007-9-4|

+------------+----------+---------+

only showing top 20 rows

 

tbStockDetail:

scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable

defined class tbStockDetail

 

scala> val tbStockDetailRdd = spark.sparkContext.textFile("tbStockDetail.txt")

tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at <console>:23

 

scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr=> tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS

tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]

 

scala> tbStockDetailDS.show()

+------------+------+--------------+------+-----+------+

| ordernumber|rownum|        itemid|number|price|amount|

+------------+------+--------------+------+-----+------+

|BYSL00000893|     0|FS527258160501|    -1|268.0|-268.0|

|BYSL00000893|     1|FS527258169701|     1|268.0| 268.0|

|BYSL00000893|     2|FS527230163001|     1|198.0| 198.0|

|BYSL00000893|     3|24627209125406|     1|298.0| 298.0|

|BYSL00000893|     4|K9527220210202|     1|120.0| 120.0|

|BYSL00000893|     5|01527291670102|     1|268.0| 268.0|

|BYSL00000893|     6|QY527271800242|     1|158.0| 158.0|

|BYSL00000893|     7|ST040000010000|     8|  0.0|   0.0|

|BYSL00000897|     0|04527200711305|     1|198.0| 198.0|

|BYSL00000897|     1|MY627234650201|     1|120.0| 120.0|

|BYSL00000897|     2|01227111791001|     1|249.0| 249.0|

|BYSL00000897|     3|MY627234610402|     1|120.0| 120.0|

|BYSL00000897|     4|01527282681202|     1|268.0| 268.0|

|BYSL00000897|     5|84126182820102|     1|158.0| 158.0|

|BYSL00000897|     6|K9127105010402|     1|239.0| 239.0|

|BYSL00000897|     7|QY127175210405|     1|199.0| 199.0|

|BYSL00000897|     8|24127151630206|     1|299.0| 299.0|

|BYSL00000897|     9|G1126101350002|     1|158.0| 158.0|

|BYSL00000897|    10|FS527258160501|     1|198.0| 198.0|

|BYSL00000897|    11|ST040000010000|    13|  0.0|   0.0|

+------------+------+--------------+------+-----+------+

only showing top 20 rows

 

tbDate:

scala> case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int) extends Serializable

defined class tbDate

 

scala> val tbDateRdd = spark.sparkContext.textFile("tbDate.txt")

tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at <console>:23

 

scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr=> tbDate(attr(0),attr(1).trim().toInt, attr(2).trim().toInt,attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS

tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]

 

scala> tbDateDS.show()

+---------+------+-------+-----+---+-------+----+-------+------+---------+

|   dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|

+---------+------+-------+-----+---+-------+----+-------+------+---------+

| 2003-1-1|200301|   2003|    1|  1|      3|   1|      1|     1|        1|

| 2003-1-2|200301|   2003|    1|  2|      4|   1|      1|     1|        1|

| 2003-1-3|200301|   2003|    1|  3|      5|   1|      1|     1|        1|

| 2003-1-4|200301|   2003|    1|  4|      6|   1|      1|     1|        1|

| 2003-1-5|200301|   2003|    1|  5|      7|   1|      1|     1|        1|

| 2003-1-6|200301|   2003|    1|  6|      1|   2|      1|     1|        1|

| 2003-1-7|200301|   2003|    1|  7|      2|   2|      1|     1|        1|

| 2003-1-8|200301|   2003|    1|  8|      3|   2|      1|     1|        1|

| 2003-1-9|200301|   2003|    1|  9|      4|   2|      1|     1|        1|

|2003-1-10|200301|   2003|    1| 10|      5|   2|      1|     1|        1|

|2003-1-11|200301|   2003|    1| 11|      6|   2|      1|     2|        1|

|2003-1-12|200301|   2003|    1| 12|      7|   2|      1|     2|        1|

|2003-1-13|200301|   2003|    1| 13|      1|   3|      1|     2|        1|

|2003-1-14|200301|   2003|    1| 14|      2|   3|      1|     2|        1|

|2003-1-15|200301|   2003|    1| 15|      3|   3|      1|     2|        1|

|2003-1-16|200301|   2003|    1| 16|      4|   3|      1|     2|        2|

|2003-1-17|200301|   2003|    1| 17|      5|   3|      1|     2|        2|

|2003-1-18|200301|   2003|    1| 18|      6|   3|      1|     2|        2|

|2003-1-19|200301|   2003|    1| 19|      7|   3|      1|     2|        2|

|2003-1-20|200301|   2003|    1| 20|      1|   4|      1|     2|        2|

+---------+------+-------+-----+---+-------+----+-------+------+---------+

only showing top 20 rows

Register the temporary views:

scala> tbStockDS.createOrReplaceTempView("tbStock")

 

scala> tbDateDS.createOrReplaceTempView("tbDate")

 

scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")

4.3 Number of Sales Orders and Total Sales per Year

Compute, over all orders, the number of sales orders and the total sales amount for each year.

Join the three tables, then use count(distinct a.ordernumber) for the number of sales orders and sum(b.amount) for the total sales amount.

 

SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount)

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear

ORDER BY c.theyear

 

spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show

The result:

+-------+---------------------------+--------------------+                      

|theyear|count(DISTINCT ordernumber)|         sum(amount)|

+-------+---------------------------+--------------------+

|   2004|                       1094|   3268115.499199999|

|   2005|                       3828|1.3257564149999991E7|

|   2006|                       3772|1.3680982900000006E7|

|   2007|                       4885|1.6719354559999993E7|

|   2008|                       4861| 1.467429530000001E7|

|   2009|                       2619|   6323697.189999999|

|   2010|                         94|  210949.65999999997|

+-------+---------------------------+--------------------+

4.4 Largest Single-Order Sales Amount per Year

Goal: for each year, find the sales amount of the largest single order.

1) Compute the total sales amount of each order:

SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

GROUP BY a.dateid, a.ordernumber

 

spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show

The result:

+----------+------------+------------------+

|    dateid| ordernumber|       SumOfAmount|

+----------+------------+------------------+

|  2008-4-9|BYSL00001175|             350.0|

| 2008-5-12|BYSL00001214|             592.0|

| 2008-7-29|BYSL00011545|            2064.0|

|  2008-9-5|DGSL00012056|            1782.0|

| 2008-12-1|DGSL00013189|             318.0|

|2008-12-18|DGSL00013374|             963.0|

|  2009-8-9|DGSL00015223|            4655.0|

| 2009-10-5|DGSL00015585|            3445.0|

| 2010-1-14|DGSL00016374|            2934.0|

| 2006-9-24|GCSL00000673|3556.1000000000004|

| 2007-1-26|GCSL00000826| 9375.199999999999|

| 2007-5-24|GCSL00001020| 6171.300000000002|

|  2008-1-8|GCSL00001217|            7601.6|

| 2008-9-16|GCSL00012204|            2018.0|

| 2006-7-27|GHSL00000603|            2835.6|

|2006-11-15|GHSL00000741|           3951.94|

|  2007-6-6|GHSL00001149|               0.0|

| 2008-4-18|GHSL00001631|              12.0|

| 2008-7-15|GHSL00011367|             578.0|

|  2009-5-8|GHSL00014637|            1797.6|

+----------+------------+------------------+

2) Taking the previous query as a base table, join it with tbDate on dateid to get the largest single-order sales amount for each year:

SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount

FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

GROUP BY a.dateid, a.ordernumber

) c

JOIN tbDate d ON c.dateid = d.dateid

GROUP BY theyear

ORDER BY theyear DESC

 

spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show

The result:

+-------+------------------+                                                    

|theyear|       SumOfAmount|

+-------+------------------+

|   2010|13065.280000000002|

|   2009|25813.200000000008|

|   2008|           55828.0|

|   2007|          159126.0|

|   2006|           36124.0|

|   2005|38186.399999999994|

|   2004| 23656.79999999997|

+-------+------------------+

4.5 Best-Selling Item per Year

Goal: find the best-selling item of each year (the item whose sales amount is highest in that year is the best seller).

Step 1: compute the sales amount of each item for each year.

SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

 

spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show

The result:

+-------+--------------+------------------+                                     

|theyear|        itemid|       SumOfAmount|

+-------+--------------+------------------+

|   2004|43824480810202|           4474.72|

|   2006|YA214325360101|             556.0|

|   2006|BT624202120102|             360.0|

|   2007|AK215371910101|24603.639999999992|

|   2008|AK216169120201|29144.199999999997|

|   2008|YL526228310106|16073.099999999999|

|   2009|KM529221590106| 5124.800000000001|

|   2004|HT224181030201|2898.6000000000004|

|   2004|SG224308320206|           7307.06|

|   2007|04426485470201|14468.800000000001|

|   2007|84326389100102|           9134.11|

|   2007|B4426438020201|           19884.2|

|   2008|YL427437320101|12331.799999999997|

|   2008|MH215303070101|            8827.0|

|   2009|YL629228280106|           12698.4|

|   2009|BL529298020602|            2415.8|

|   2009|F5127363019006|             614.0|

|   2005|24425428180101|          34890.74|

|   2007|YA214127270101|             240.0|

|   2007|MY127134830105|          11099.92|

+-------+--------------+------------------+

Step 2: building on step 1, find the maximum per-item sales amount for each year.

SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount

FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

) d

GROUP BY d.theyear

 

spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show

The result:

+-------+------------------+                                                    

|theyear|       MaxOfAmount|

+-------+------------------+

|   2007|           70225.1|

|   2006|          113720.6|

|   2004|53401.759999999995|

|   2009|           30029.2|

|   2005|56627.329999999994|

|   2010|            4494.0|

|   2008| 98003.60000000003|

+-------+------------------+

Step 3: join the per-item sales amounts with the yearly maximums, on both year and amount, to obtain the full row for each year's best-selling item.

SELECT DISTINCT e.theyear, e.itemid, f.MaxOfAmount

FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

) e

JOIN (SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount

FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount

FROM tbStock a

JOIN tbStockDetail b ON a.ordernumber = b.ordernumber

JOIN tbDate c ON a.dateid = c.dateid

GROUP BY c.theyear, b.itemid

) d

GROUP BY d.theyear

) f ON e.theyear = f.theyear

AND e.SumOfAmount = f.MaxOfAmount

ORDER BY e.theyear

 

spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show

The result:

+-------+--------------+------------------+                                     

|theyear|        itemid|       maxofamount|

+-------+--------------+------------------+

|   2004|JY424420810101|53401.759999999995|

|   2005|24124118880102|56627.329999999994|

|   2006|JY425468460101|          113720.6|

|   2007|JY425468460101|           70225.1|

|   2008|E2628204040101| 98003.60000000003|

|   2009|YL327439080102|           30029.2|

|   2010|SQ429425090101|            4494.0|

+-------+--------------+------------------+
