Spark SQL is Spark's module for working with structured data. It provides two programming abstractions, DataFrame and Dataset, and also acts as a distributed SQL query engine.
We have already studied Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly simplifying the work of writing MapReduce programs; however, the MapReduce computation model is relatively slow to execute. Hence Spark SQL came into being: it translates Spark SQL into RDD operations and submits them to the cluster, so execution is much faster.
1) Easy to integrate
2) Unified data access
3) Hive compatible
4) Standard data connectivity
Like an RDD, a DataFrame is a distributed data container. However, a DataFrame is more like a two-dimensional table in a traditional database: besides the data itself, it also records the structure of the data, i.e. its schema. Like Hive, DataFrame also supports nested data types (struct, array and map). From the point of view of API usability, the DataFrame API offers a set of high-level relational operations that are friendlier and have a lower barrier to entry than the functional RDD API.
The figure above illustrates the difference between a DataFrame and an RDD. The RDD[Person] on the left takes Person as its type parameter, but the Spark framework itself knows nothing about the internal structure of the Person class. The DataFrame on the right, in contrast, provides detailed structural information, so Spark SQL knows exactly which columns the dataset contains and what the name and type of each column are. A DataFrame provides a schema view of the data; you can treat it as a table in a database. A DataFrame is also lazily evaluated, and its performance is higher than that of an RDD, mainly because of:
An optimized execution plan: query plans are optimized by the Spark Catalyst optimizer.
Consider the following example:
To illustrate query optimization, look at the population analysis example shown in the figure above. Two DataFrames are constructed, joined, and then filtered. If this plan were executed exactly as written, it would not be very efficient, because a join is an expensive operation and may produce a large intermediate dataset. If we can push the filter down below the join, filtering the DataFrames first and then joining the much smaller filtered results, the execution time can be shortened significantly. That is exactly what the Spark SQL query optimizer does. In short, logical query plan optimization is the process of using equivalence transformations based on relational algebra to replace expensive operations with cheaper ones.
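The effect of this optimization can be inspected with explain(). A minimal sketch in spark-shell, assuming two hypothetical DataFrames df1 and df2 that share an id column and that df2 also has an age column:
val joined = df1.join(df2, "id").filter($"age" > 21)
joined.explain(true)
// in the optimized logical plan the Filter on age typically appears below the Join,
// i.e. it has been pushed down to the smaller input before the join is executed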
1) It is an extension of the DataFrame API and is Spark's newest data abstraction.
2) A user-friendly API style that offers both compile-time type-safety checks and the query-optimization features of DataFrame.
3) Dataset supports encoders, which avoid deserializing the entire object when accessing off-heap data, improving efficiency.
4) Case classes are used to define the structure of the data in a Dataset; each attribute name of the case class maps directly to a field name in the Dataset.
5) DataFrame is a special case of Dataset, DataFrame = Dataset[Row], so a DataFrame can be converted to a Dataset with the as method. Row is a type just like Car or Person; all table-structure information is represented by Row.
6) Dataset is strongly typed; for example, you can have Dataset[Car] or Dataset[Person].
7) A DataFrame only knows the field names, not their types, so operations cannot be type-checked at compile time: you could, for instance, subtract from a String and only get an error at execution time. A Dataset knows both the field names and their types, so it offers much stricter error checking, much like the difference between a JSON object and a class instance (see the sketch below).
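A minimal sketch of the difference in spark-shell, assuming a hypothetical people.json file with name and age fields:
case class Person(name: String, age: Long)
val df = spark.read.json("people.json")   // DataFrame, i.e. Dataset[Row]
val ds = df.as[Person]                    // Dataset[Person]
// df.select($"name" - 1)  // compiles, but only fails when Spark analyzes the query at runtime
// ds.map(_.name - 1)      // rejected by the compiler: value - is not a member of String
ds.map(_.age + 1)          // field names and types are checked at compile time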
In older versions, Spark SQL provided two SQL query entry points: SQLContext, for SQL queries provided by Spark itself, and HiveContext, for querying Hive.
SparkSession is Spark's newest SQL query entry point. It is essentially a combination of SQLContext and HiveContext, so any API available on SQLContext and HiveContext is equally available on SparkSession. SparkSession wraps a SparkContext internally, so the actual computation is still carried out by the SparkContext.
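Outside the shell (where a SparkSession named spark is already provided), a session can be built as in the following sketch; the application name is arbitrary:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("spark-sql-demo")
  .getOrCreate()
// the wrapped SparkContext is available as spark.sparkContext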
In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame: from a Spark data source; by converting an existing RDD; or by querying a Hive table.
1) Creating from a Spark data source
(1) Check which file formats a Spark data source can be created from
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
(2) Read a JSON file to create a DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
(3)展現結果
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
2) Converting from an RDD
This is discussed in detail in section 2.5.
3) Querying a Hive table
This is discussed in detail in section 3.3.
1) Create a DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2) Create a temporary view from the DataFrame
scala> df.createOrReplaceTempView("people")
3) Query the whole table with a SQL statement
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
4)結果展現
scala> sqlDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Note: a temporary view is scoped to the session; when the session ends, the view is gone. If you need it to be valid across the whole application, use a global temporary view. Note that a global temporary view must be accessed with its full path, e.g. global_temp.people.
5) Create a global temporary view from the DataFrame
scala> df.createGlobalTempView("people")
6) Query the whole table with a SQL statement
scala> spark.sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
1) Create a DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2) View the DataFrame's schema
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
3) View only the "name" column
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
4) View the "name" column together with "age + 1"
scala> df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
5) View the rows where "age" is greater than 21
scala> df.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
6) Group by "age" and count the rows
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
Note: whenever you need operations between RDDs and DataFrames or Datasets, you must add import spark.implicits._ (here spark is not a package name, but the name of the SparkSession object).
Prerequisite: import the implicit conversions and create an RDD
scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
1) Conversion by specifying the columns manually
scala> peopleRDD.map{x=>val para = x.split(",");(para(0),para(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int]
2) Conversion via reflection (requires a case class)
(1) Create a case class
scala> case class People(name:String, age:Int)
(2) Convert the RDD to a DataFrame using the case class
scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
3) Programmatic conversion (for reference)
(1) Import the required types
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
(2) Create the schema
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
(3) Import the Row type
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
(4) Create an RDD of Rows that matches the given schema
scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
(5) Create the DataFrame from the data and the given schema
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
Simply call rdd on the DataFrame.
1) Create a DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2) Convert the DataFrame to an RDD
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29
3) Print the RDD
scala> dfToRDD.collect
res13: Array[org.apache.spark.sql.Row] = Array([Michael, 29], [Andy, 30], [Justin, 19])
A Dataset is a strongly typed collection of data, and the corresponding type information must be provided.
1) Create a case class
scala> case class Person(name: String, age: Long)
defined class Person
2) Create a Dataset
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
Spark SQL can automatically convert an RDD containing case class instances into a Dataset: the case class defines the table structure, and the case class attribute names become the column names via reflection. Case classes can contain complex structures such as Seqs or Arrays.
1) Create an RDD
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
2) Create a case class
scala> case class Person(name: String, age: Long)
defined class Person
3) Convert the RDD to a Dataset
scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS()
res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
Simply call the rdd method.
1) Create a Dataset
scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
2) Convert the Dataset to an RDD
scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28
1. Converting a DataFrame to a Dataset
1) Create a DataFrame
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2) Create a case class
scala> case class Person(name: String, age: Long)
defined class Person
3) Convert the DataFrame to a Dataset
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
2. Converting a Dataset to a DataFrame
1) Create a case class
scala> case class Person(name: String, age: Long)
defined class Person
2) Create a Dataset
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
3) Convert the Dataset to a DataFrame
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
4)展現
scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
This is straightforward, because it just wraps the case class instances into Rows.
(1) Import the implicit conversions
import spark.implicits._
(2) Convert
val testDF = testDS.toDF
(1) Import the implicit conversions
import spark.implicits._
(2) Create a case class
case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types
(3) Convert
val testDS = testDF.as[Coltest]
This approach converts to a Dataset with the as method once the type of each column is given, which is extremely convenient when you have a DataFrame but need to process individual fields. When using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS will not be available.
In Spark SQL, Spark provides two new abstractions, DataFrame and Dataset. How do they differ from RDD? First, looking at when each appeared:
RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)
Given the same data, all three data structures produce the same result after computation; what differs is their execution efficiency and execution style.
In later Spark versions, Dataset will gradually replace RDD and DataFrame as the single API.
1. RDD, DataFrame and Dataset are all distributed, resilient datasets on the Spark platform, and all make it convenient to process very large data.
2. All three are lazy: creating or transforming them, e.g. with map, does not execute anything immediately; only when an action such as foreach is encountered do they actually start to traverse and compute.
3. All three automatically cache computations according to Spark's memory situation, so even with very large data volumes there is no need to worry about running out of memory.
4. All three have the concept of partitions.
5. All three share many common functions, such as filter and sorting.
6. Many operations on DataFrame and Dataset require this import:
import spark.implicits._
7. Both DataFrame and Dataset can use pattern matching to obtain the value and type of each field:
DataFrame:
testDF.map {
  case Row(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}
Dataset:
case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types
testDS.map {
  case Coltest(col1: String, col2: Int) =>
    println(col1); println(col2)
    col1
  case _ =>
    ""
}
1. RDD:
1) RDD is generally used together with Spark MLlib.
2) RDD does not support Spark SQL operations.
2. DataFrame:
1) Unlike RDD and Dataset, every row of a DataFrame has the fixed type Row. Column values cannot be accessed directly; they can only be obtained by parsing the row, for example:
testDF.foreach { line =>
  val col1 = line.getAs[String]("col1")
  val col2 = line.getAs[Int]("col2")
}
2) DataFrame and Dataset are generally not used together with Spark MLlib.
3) Both DataFrame and Dataset support Spark SQL operations such as select and groupBy; they can also be registered as temporary tables/views and queried with SQL statements, for example:
dataDF.createOrReplaceTempView("tmp")
spark.sql("select ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)
4) DataFrame and Dataset support some particularly convenient ways of saving data, for example saving to CSV with a header so that the field name of each column is immediately clear:
// save
val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
datawDF.write.format("com.atguigu.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()
// read
val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://hadoop102:9000/test")
val datarDF= spark.read.options(options).format("com.atguigu.spark.csv").load()
With this way of saving, it is easy to keep track of which field name belongs to which column, and the delimiter can be specified freely.
3. Dataset:
1) Dataset and DataFrame have exactly the same member functions; the only difference is the type of each row.
2) A DataFrame can also be written as Dataset[Row]: every row has type Row, and without parsing it you cannot tell which fields a row contains or what type each field has; you can only pull out specific fields with the getAs method mentioned above or with the pattern matching from point 7 of the common features. In a Dataset, by contrast, the type of each row is whatever you define; once you have defined a case class you can freely access the information in each row:
case class Coltest(col1: String, col2: Int) extends Serializable // defines the field names and types
/**
rdd
("a", 1)
("b", 1)
("a", 1)
**/
val test: Dataset[Coltest]=rdd.map{line=>
Coltest(line._1,line._2)
}.toDS
test.map{
line=>
println(line.col1)
println(line.col2)
}
As you can see, a Dataset is very convenient when you need to access a particular field of a row. However, if you want to write highly generic functions, the row type of a Dataset is not fixed (it could be any case class), which makes such generic handling impossible; in that case using DataFrame, i.e. Dataset[Row], solves the problem nicely.
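A minimal sketch of such a generic helper; the function name and column parameter are hypothetical, and it works for any DataFrame regardless of which case class the data originally came from:
import org.apache.spark.sql.DataFrame

// counts the non-null values of an arbitrary column of an arbitrary DataFrame
def countNonNull(df: DataFrame, colName: String): Long =
  df.filter(df(colName).isNotNull).count()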
Packaging and running the program in IDEA are similar to Spark Core; the Maven dependencies need one new entry:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
The program is as follows:
package com.atguigu.sparksql
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
object HelloWorld {
def main(args: Array[String]) {
//create the SparkSession and set the application name
val spark = SparkSession
.builder().master("local[*]")
.appName("Spark SQL basic example")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.json("file:/F:/hajima.json")
// Displays the content of the DataFrame to stdout
df.show()
df.filter($"age" > 21).show()
df.createOrReplaceTempView("persons")
spark.sql("SELECT * FROM persons where age > 21").show()
spark.stop()
}
}
In the shell, users can define their own functions through the spark.udf facility.
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.udf.register("addName", (x:String)=> "Name:"+x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("Select addName(name), age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
| Name:Michael|null|
| Name:Andy| 30|
| Name:Justin| 19|
+-----------------+----+
Both the strongly typed Dataset and the weakly typed DataFrame provide the common aggregate functions, such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions.
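For instance, the built-in aggregates can be applied directly; a minimal sketch, assuming df is a DataFrame with a numeric salary column such as the employees data loaded below:
import org.apache.spark.sql.functions.{avg, max}
df.select(avg(df("salary")), max(df("salary"))).show()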
Weakly typed user-defined aggregate functions are implemented by extending UserDefinedAggregateFunction. The following shows a custom aggregate function that computes the average salary.
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object MyAverage extends UserDefinedAggregateFunction {
// the data type of the aggregate function's input arguments
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// the data types of the values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// the data type of the return value
def dataType: DataType = DoubleType
// whether the same input always produces the same output
def deterministic: Boolean = true
// initialization
def initialize(buffer: MutableAggregationBuffer): Unit = {
// the running total of salaries
buffer(0) = 0L
// the count of salaries
buffer(1) = 0L
}
// merge input rows within the same executor
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// merge buffers from different executors
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// compute the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// register the function
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
Strongly typed user-defined aggregate functions are implemented by extending Aggregator; the example again computes the average salary.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
// since this is strongly typed, case classes are involved
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// a data structure holding the salary total and the salary count, both initialized to 0
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// merge the results from different executors
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// compute the output
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// the encoder for the intermediate value type, converted to a case class
// Encoders.product is the encoder for Scala tuples and case classes
def bufferEncoder: Encoder[Average] = Encoders.product
// the encoder for the final output value
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
import spark.implicits._
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+
The DataFrame interface of Spark SQL supports a variety of data sources. A DataFrame can be operated on in the RDD style, and it can also be registered as a temporary view; once registered, SQL queries can be executed against it.
Spark SQL's default data source is the Parquet format. When the data source is a Parquet file, Spark SQL can perform all of its operations conveniently. The default data source format can be changed by modifying the configuration option spark.sql.sources.default.
val df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
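The default source can also be changed at runtime through the session configuration; a minimal sketch, assuming a hypothetical JSON file:
spark.conf.set("spark.sql.sources.default", "json")
val jsonDF = spark.read.load("examples/src/main/resources/people.json")  // now interpreted as JSON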
When the data source is not a Parquet file, the format must be specified manually. The format needs its full name (e.g. org.apache.spark.sql.parquet), but for built-in formats the short name is enough: json, parquet, jdbc, orc, libsvm, csv or text.
Data can be loaded generically with the read.load method provided by SparkSession, and saved with write and save.
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
In addition, SQL can be run directly on files:
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
sqlDF.show()
scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
scala> peopleDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop102:9000/namesAndAges.parquet`")
17/09/05 04:21:11 WARN ObjectStore: Failed to get database parquet, returning NoSuchObjectException
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Storage operations can take a SaveMode, which defines how existing data is handled. Note that these save modes do not use any locking and are not atomic. Moreover, when Overwrite is used, the old data is deleted before the new data is written. The SaveMode values are described in the following table:
Scala/Java                       | Any Language      | Meaning
SaveMode.ErrorIfExists (default) | "error" (default) | Throw an error if the data already exists
SaveMode.Append                  | "append"          | Append to the existing data
SaveMode.Overwrite               | "overwrite"       | Overwrite the existing data
SaveMode.Ignore                  | "ignore"          | Ignore the operation if the data already exists
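A minimal sketch of applying a save mode, reusing the peopleDF and HDFS path from the example above:
import org.apache.spark.sql.SaveMode

peopleDF.write.mode(SaveMode.Append).parquet("hdfs://hadoop102:9000/namesAndAges.parquet")
// or, using the string form of the mode
peopleDF.write.mode("overwrite").parquet("hdfs://hadoop102:9000/namesAndAges.parquet")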
Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().
Note: this is not a conventional JSON file; every line must be a complete JSON object.
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Parquet is a popular columnar storage format that can efficiently store records with nested fields. Parquet is widely used in the Hadoop ecosystem and supports all of Spark SQL's data types. Spark SQL provides methods to read and write Parquet files directly.
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("hdfs://hadoop102:9000/people.parquet")
val parquetFileDF = spark.read.parquet("hdfs://hadoop102:9000/people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can also be written back to the relational database.
Note: the corresponding database driver must be placed on Spark's classpath.
$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/rdd").option("dbtable", " rddtable").option("user", "root").option("password", "000000").load()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "hive")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/rdd")
.option("dbtable", "dftable")
.option("user", "root")
.option("password", "000000")
.save()
jdbcDF2.write
.jdbc("jdbc:mysql://hadoop102:3306/mysql", "db", connectionProperties)
Apache Hive is the SQL engine on Hadoop. Spark SQL can be built with or without Hive support. With Hive support, Spark SQL can access Hive tables, use UDFs (user-defined functions) and the Hive query language (HiveQL/HQL), and so on. It is important to note that including the Hive libraries in Spark SQL does not require Hive to be installed beforehand. In general it is best to build Spark SQL with Hive support so that these features are available; if you downloaded a binary distribution of Spark, it should already have been built with Hive support.
To connect Spark SQL to an existing Hive deployment, you must copy hive-site.xml into Spark's configuration directory ($SPARK_HOME/conf). Spark SQL still runs even if Hive is not deployed. Note that without a deployed Hive, Spark SQL creates its own Hive metastore, called metastore_db, in the current working directory. Also, if you try to create tables with the HiveQL CREATE TABLE statement (not CREATE EXTERNAL TABLE), the tables are placed in the /user/hive/warehouse directory of your default file system (HDFS if you have a configured hdfs-site.xml on the classpath, otherwise the local file system).
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
To use the embedded Hive, nothing needs to be done; just use it. --conf : spark.sql.warehouse.dir=
Note: if you use the built-in Hive, then since Spark 2.0 spark.sql.warehouse.dir specifies the location of the data warehouse. If you want the path to be on HDFS, you need to add core-site.xml and hdfs-site.xml to the Spark conf directory; otherwise only the warehouse directory on the master node is created, and queries will fail with file-not-found errors. In that case, to switch to HDFS you need to delete the metastore and restart the cluster.
To connect to an already deployed external Hive, follow these steps.
1) Copy or symlink Hive's hive-site.xml into the conf directory under the Spark installation directory.
2) Start the spark shell, making sure to include the JDBC client for accessing the Hive metastore database:
$ bin/spark-shell --master spark://hadoop102:7077 --jars mysql-connector-java-5.1.27-bin.jar
The Spark SQL CLI makes it convenient to run the Hive metastore service locally and execute queries from the command line. Start the Spark SQL CLI with the following command in the Spark directory:
./bin/spark-sql
To configure Hive, replace hive-site.xml under conf/.
The dataset is a merchandise transaction dataset.
Each order may contain multiple items, each order can generate multiple transactions, and different items have different unit prices.
tbStock:
scala> case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
defined class tbStock
scala> val tbStockRdd = spark.sparkContext.textFile("tbStock.txt")
tbStockRdd: org.apache.spark.rdd.RDD[String] = tbStock.txt MapPartitionsRDD[1] at textFile at <console>:23
scala> val tbStockDS = tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS: org.apache.spark.sql.Dataset[tbStock] = [ordernumber: string, locationid: string ... 1 more field]
scala> tbStockDS.show()
+------------+----------+---------+
| ordernumber|locationid|   dateid|
+------------+----------+---------+
|BYSL00000893| ZHAO|2007-8-23|
|BYSL00000897| ZHAO|2007-8-24|
|BYSL00000898| ZHAO|2007-8-25|
|BYSL00000899| ZHAO|2007-8-26|
|BYSL00000900| ZHAO|2007-8-26|
|BYSL00000901| ZHAO|2007-8-27|
|BYSL00000902| ZHAO|2007-8-27|
|BYSL00000904| ZHAO|2007-8-28|
|BYSL00000905| ZHAO|2007-8-28|
|BYSL00000906| ZHAO|2007-8-28|
|BYSL00000907| ZHAO|2007-8-29|
|BYSL00000908| ZHAO|2007-8-30|
|BYSL00000909| ZHAO| 2007-9-1|
|BYSL00000910| ZHAO| 2007-9-1|
|BYSL00000911| ZHAO|2007-8-31|
|BYSL00000912| ZHAO| 2007-9-2|
|BYSL00000913| ZHAO| 2007-9-3|
|BYSL00000914| ZHAO| 2007-9-3|
|BYSL00000915| ZHAO| 2007-9-4|
|BYSL00000916| ZHAO| 2007-9-4|
+------------+----------+---------+
only showing top 20 rows
tbStockDetail:
scala> case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double) extends Serializable
defined class tbStockDetail
scala> val tbStockDetailRdd = spark.sparkContext.textFile("tbStockDetail.txt")
tbStockDetailRdd: org.apache.spark.rdd.RDD[String] = tbStockDetail.txt MapPartitionsRDD[13] at textFile at <console>:23
scala> val tbStockDetailDS = tbStockDetailRdd.map(_.split(",")).map(attr=> tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble, attr(5).trim().toDouble)).toDS
tbStockDetailDS: org.apache.spark.sql.Dataset[tbStockDetail] = [ordernumber: string, rownum: int ... 4 more fields]
scala> tbStockDetailDS.show()
+------------+------+--------------+------+-----+------+
| ordernumber|rownum| itemid|number|price|amount|
+------------+------+--------------+------+-----+------+
|BYSL00000893| 0|FS527258160501| -1|268.0|-268.0|
|BYSL00000893| 1|FS527258169701| 1|268.0| 268.0|
|BYSL00000893| 2|FS527230163001| 1|198.0| 198.0|
|BYSL00000893| 3|24627209125406| 1|298.0| 298.0|
|BYSL00000893| 4|K9527220210202| 1|120.0| 120.0|
|BYSL00000893| 5|01527291670102| 1|268.0| 268.0|
|BYSL00000893| 6|QY527271800242| 1|158.0| 158.0|
|BYSL00000893| 7|ST040000010000| 8| 0.0| 0.0|
|BYSL00000897| 0|04527200711305| 1|198.0| 198.0|
|BYSL00000897| 1|MY627234650201| 1|120.0| 120.0|
|BYSL00000897| 2|01227111791001| 1|249.0| 249.0|
|BYSL00000897| 3|MY627234610402| 1|120.0| 120.0|
|BYSL00000897| 4|01527282681202| 1|268.0| 268.0|
|BYSL00000897| 5|84126182820102| 1|158.0| 158.0|
|BYSL00000897| 6|K9127105010402| 1|239.0| 239.0|
|BYSL00000897| 7|QY127175210405| 1|199.0| 199.0|
|BYSL00000897| 8|24127151630206| 1|299.0| 299.0|
|BYSL00000897| 9|G1126101350002| 1|158.0| 158.0|
|BYSL00000897| 10|FS527258160501| 1|198.0| 198.0|
|BYSL00000897| 11|ST040000010000| 13| 0.0| 0.0|
+------------+------+--------------+------+-----+------+
only showing top 20 rows
tbDate:
scala> case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int) extends Serializable
defined class tbDate
scala> val tbDateRdd = spark.sparkContext.textFile("tbDate.txt")
tbDateRdd: org.apache.spark.rdd.RDD[String] = tbDate.txt MapPartitionsRDD[20] at textFile at <console>:23
scala> val tbDateDS = tbDateRdd.map(_.split(",")).map(attr=> tbDate(attr(0),attr(1).trim().toInt, attr(2).trim().toInt,attr(3).trim().toInt, attr(4).trim().toInt, attr(5).trim().toInt, attr(6).trim().toInt, attr(7).trim().toInt, attr(8).trim().toInt, attr(9).trim().toInt)).toDS
tbDateDS: org.apache.spark.sql.Dataset[tbDate] = [dateid: string, years: int ... 8 more fields]
scala> tbDateDS.show()
+---------+------+-------+-----+---+-------+----+-------+------+---------+
| dateid| years|theyear|month|day|weekday|week|quarter|period|halfmonth|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
| 2003-1-1|200301| 2003| 1| 1| 3| 1| 1| 1| 1|
| 2003-1-2|200301| 2003| 1| 2| 4| 1| 1| 1| 1|
| 2003-1-3|200301| 2003| 1| 3| 5| 1| 1| 1| 1|
| 2003-1-4|200301| 2003| 1| 4| 6| 1| 1| 1| 1|
| 2003-1-5|200301| 2003| 1| 5| 7| 1| 1| 1| 1|
| 2003-1-6|200301| 2003| 1| 6| 1| 2| 1| 1| 1|
| 2003-1-7|200301| 2003| 1| 7| 2| 2| 1| 1| 1|
| 2003-1-8|200301| 2003| 1| 8| 3| 2| 1| 1| 1|
| 2003-1-9|200301| 2003| 1| 9| 4| 2| 1| 1| 1|
|2003-1-10|200301| 2003| 1| 10| 5| 2| 1| 1| 1|
|2003-1-11|200301| 2003| 1| 11| 6| 2| 1| 2| 1|
|2003-1-12|200301| 2003| 1| 12| 7| 2| 1| 2| 1|
|2003-1-13|200301| 2003| 1| 13| 1| 3| 1| 2| 1|
|2003-1-14|200301| 2003| 1| 14| 2| 3| 1| 2| 1|
|2003-1-15|200301| 2003| 1| 15| 3| 3| 1| 2| 1|
|2003-1-16|200301| 2003| 1| 16| 4| 3| 1| 2| 2|
|2003-1-17|200301| 2003| 1| 17| 5| 3| 1| 2| 2|
|2003-1-18|200301| 2003| 1| 18| 6| 3| 1| 2| 2|
|2003-1-19|200301| 2003| 1| 19| 7| 3| 1| 2| 2|
|2003-1-20|200301| 2003| 1| 20| 1| 4| 1| 2| 2|
+---------+------+-------+-----+---+-------+----+-------+------+---------+
only showing top 20 rows
Register the temporary views:
scala> tbStockDS.createOrReplaceTempView("tbStock")
scala> tbDateDS.createOrReplaceTempView("tbDate")
scala> tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
Compute the number of sales orders and the total sales amount per year across all orders.
After joining the three tables, count(distinct a.ordernumber) gives the number of sales orders and sum(b.amount) gives the total sales amount.
SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount)
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear
ORDER BY c.theyear
spark.sql("SELECT c.theyear, COUNT(DISTINCT a.ordernumber), SUM(b.amount) FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear ORDER BY c.theyear").show
The result is as follows:
+-------+---------------------------+--------------------+
|theyear|count(DISTINCT ordernumber)| sum(amount)|
+-------+---------------------------+--------------------+
| 2004| 1094| 3268115.499199999|
| 2005| 3828|1.3257564149999991E7|
| 2006| 3772|1.3680982900000006E7|
| 2007| 4885|1.6719354559999993E7|
| 2008| 4861| 1.467429530000001E7|
| 2009| 2619| 6323697.189999999|
| 2010| 94| 210949.65999999997|
+-------+---------------------------+--------------------+
Goal: compute the sales amount of the largest order in each year.
1) Compute the total sales amount of each order on each date.
SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
spark.sql("SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber").show
The result is as follows:
+----------+------------+------------------+
| dateid| ordernumber| SumOfAmount|
+----------+------------+------------------+
| 2008-4-9|BYSL00001175| 350.0|
| 2008-5-12|BYSL00001214| 592.0|
| 2008-7-29|BYSL00011545| 2064.0|
| 2008-9-5|DGSL00012056| 1782.0|
| 2008-12-1|DGSL00013189| 318.0|
|2008-12-18|DGSL00013374| 963.0|
| 2009-8-9|DGSL00015223| 4655.0|
| 2009-10-5|DGSL00015585| 3445.0|
| 2010-1-14|DGSL00016374| 2934.0|
| 2006-9-24|GCSL00000673|3556.1000000000004|
| 2007-1-26|GCSL00000826| 9375.199999999999|
| 2007-5-24|GCSL00001020| 6171.300000000002|
| 2008-1-8|GCSL00001217| 7601.6|
| 2008-9-16|GCSL00012204| 2018.0|
| 2006-7-27|GHSL00000603| 2835.6|
|2006-11-15|GHSL00000741| 3951.94|
| 2007-6-6|GHSL00001149| 0.0|
| 2008-4-18|GHSL00001631| 12.0|
| 2008-7-15|GHSL00011367| 578.0|
| 2009-5-8|GHSL00014637| 1797.6|
+----------+------------+------------------+
2) Using the result of the previous step as a base table, join it with tbDate on dateid to obtain the sales amount of the largest order in each year.
SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount
FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
GROUP BY a.dateid, a.ordernumber
) c
JOIN tbDate d ON c.dateid = d.dateid
GROUP BY theyear
ORDER BY theyear DESC
spark.sql("SELECT theyear, MAX(c.SumOfAmount) AS SumOfAmount FROM (SELECT a.dateid, a.ordernumber, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber GROUP BY a.dateid, a.ordernumber ) c JOIN tbDate d ON c.dateid = d.dateid GROUP BY theyear ORDER BY theyear DESC").show
The result is as follows:
+-------+------------------+
|theyear| SumOfAmount|
+-------+------------------+
| 2010|13065.280000000002|
| 2009|25813.200000000008|
| 2008| 55828.0|
| 2007| 159126.0|
| 2006| 36124.0|
| 2005|38186.399999999994|
| 2004| 23656.79999999997|
+-------+------------------+
Goal: find the best-selling item of each year (the item with the highest sales amount in a given year is that year's best seller).
Step 1: compute the sales amount of each item in each year.
SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
spark.sql("SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid").show
The result is as follows:
+-------+--------------+------------------+
|theyear| itemid| SumOfAmount|
+-------+--------------+------------------+
| 2004|43824480810202| 4474.72|
| 2006|YA214325360101| 556.0|
| 2006|BT624202120102| 360.0|
| 2007|AK215371910101|24603.639999999992|
| 2008|AK216169120201|29144.199999999997|
| 2008|YL526228310106|16073.099999999999|
| 2009|KM529221590106| 5124.800000000001|
| 2004|HT224181030201|2898.6000000000004|
| 2004|SG224308320206| 7307.06|
| 2007|04426485470201|14468.800000000001|
| 2007|84326389100102| 9134.11|
| 2007|B4426438020201| 19884.2|
| 2008|YL427437320101|12331.799999999997|
| 2008|MH215303070101| 8827.0|
| 2009|YL629228280106| 12698.4|
| 2009|BL529298020602| 2415.8|
| 2009|F5127363019006| 614.0|
| 2005|24425428180101| 34890.74|
| 2007|YA214127270101| 240.0|
| 2007|MY127134830105| 11099.92|
+-------+--------------+------------------+
Step 2: based on step 1, find the maximum per-item sales amount for each year.
SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) d
GROUP BY d.theyear
spark.sql("SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear").show
The result is as follows:
+-------+------------------+
|theyear| MaxOfAmount|
+-------+------------------+
| 2007| 70225.1|
| 2006| 113720.6|
| 2004|53401.759999999995|
| 2009| 30029.2|
| 2005|56627.329999999994|
| 2010| 4494.0|
| 2008| 98003.60000000003|
+-------+------------------+
Step 3: join the per-item sales amounts with the yearly maximums on both the amount and the year to obtain the full row for each year's best-selling item.
SELECT DISTINCT e.theyear, e.itemid, f.MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) e
JOIN (SELECT d.theyear, MAX(d.SumOfAmount) AS MaxOfAmount
FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS SumOfAmount
FROM tbStock a
JOIN tbStockDetail b ON a.ordernumber = b.ordernumber
JOIN tbDate c ON a.dateid = c.dateid
GROUP BY c.theyear, b.itemid
) d
GROUP BY d.theyear
) f ON e.theyear = f.theyear
AND e.SumOfAmount = f.MaxOfAmount
ORDER BY e.theyear
spark.sql("SELECT DISTINCT e.theyear, e.itemid, f.maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) e JOIN (SELECT d.theyear, MAX(d.sumofamount) AS maxofamount FROM (SELECT c.theyear, b.itemid, SUM(b.amount) AS sumofamount FROM tbStock a JOIN tbStockDetail b ON a.ordernumber = b.ordernumber JOIN tbDate c ON a.dateid = c.dateid GROUP BY c.theyear, b.itemid ) d GROUP BY d.theyear ) f ON e.theyear = f.theyear AND e.sumofamount = f.maxofamount ORDER BY e.theyear").show
The result is as follows:
+-------+--------------+------------------+
|theyear| itemid| maxofamount|
+-------+--------------+------------------+
| 2004|JY424420810101|53401.759999999995|
| 2005|24124118880102|56627.329999999994|
| 2006|JY425468460101| 113720.6|
| 2007|JY425468460101| 70225.1|
| 2008|E2628204040101| 98003.60000000003|
| 2009|YL327439080102| 30029.2|
| 2010|SQ429425090101| 4494.0|
+-------+--------------+------------------+