Spark SQL
是 Spark
用來處理結構化數據的一個模塊,它提供了2個編程抽象:php
DataFrame
和DataSet
,而且做爲分佈式 SQL 查詢引擎的做用。html
以 Hive
做爲對比,Hive
是將 Hive SQL
轉換成 MapReduce
而後提交到集羣上執行,大大簡化了編寫 MapReduce
的程序的複雜性,因爲 MapReduce
這種計算模型執行效率比較慢。全部Spark SQL
的應運而生,它是將 Spark SQL
轉換成 RDD
,而後提交到集羣執行,執行效率很是快。java
這裏引用 Spark 官網:mysql
在 Spark
中,DataFrame
是一種以 RDD
爲基礎的分佈式數據集,相似於傳統數據庫中的二維表格。sql
DataFrame
與 RDD
的主要區別在於,前者帶有 schema
元信息,即 DataFrame
所表示的二維表數據集的每一列都帶有名稱和類型。這使得 Spark SQL
得以洞察更多的結構信息,從而對藏於DataFrame
背後的數據源以及做用於 DataFrame
之上的變換進行了針對性的優化,最終達到大幅提高運行時效率的目標。shell
反觀 RDD
,因爲無從得知所存二維數據的內部結構,Spark Core
只能在 stage
層面進行簡單、通用的流水線優化。數據庫
圖示apache
DataFrame
也是懶執行的,但性能上比 RDD
要高,主要緣由:編程
優化的執行計劃,即查詢計劃經過 Spark Catalyst Optimiser
進行優化。好比下面一個例子:json
看看 Spark Core
和 Spark SQL
模塊對這個計劃的執行步驟:
DataSet
也是分佈式數據集合。
DataSet
是 Spark 1.6
中添加的一個新抽象,是 DataFrame
的一個擴展。它提供了 RDD
的優點(強類型,使用強大的 Lambda
函數的能力)以及 Spark SQL
優化執行引擎的優勢,DataSet
也可使用功能性的轉換(操做 map
,flatMap
,filter
等等)。
具體的優點以下:
1)是 DataFrame API
的一個擴展,SparkSQL
最新的數據抽象;
2)用戶友好的 API
風格,既具備類型安全檢查也具備 DataFrame
的查詢優化特性;
3)用樣例類來對 DataSet
中定義數據的結構信息,樣例類中每一個屬性的名稱直接映射到 DataSet
中的字段名稱;
4)DataSet
是強類型的。好比能夠有 DataSet[Car]
,DataSet[Person]
在老的版本中,Spark SQL
提供兩種 SQL
查詢起始點:一個叫 SQLContext
,用於 Spark
本身提供的 SQL
查詢;一個叫 HiveContext
,用於鏈接 Hive
的查詢。
SparkSession
是 Spark
最新的 SQL
查詢起始點,實質上是 SQLContext
和 HiveContext
的組合,因此在 SQLContex
和 HiveContext
上可用的 API
在 SparkSession
上一樣是可使用的。
SparkSession
內部封裝了 SparkContext
,因此計算其實是由 SparkContext
完成的。
在 Spark SQL
中 SparkSession
是建立 DataFrame
和執行 SQL
的入口,建立 DataFrame
有三種方式
Spark
的數據源進行建立;RDD
進行轉換;Hive Table
進行查詢返回Spark
數據源進行建立的文件格式json
文件建立 DataFrame
RDD
轉換(詳見 2.5 節)Hive Table
轉換(詳見 3.3節)直接經過 SQL
語句對 DataFrame
的數據進行操做
DataFrame
DataFrame
建立一個臨時表建立臨時表的三種方式
SQL
語句實現查詢全表注意:普通臨時表是 Session
範圍內的,若是想應用範圍內有效,可使用全局臨時表。使用全局臨時表時須要全路徑訪問,如:global_temp.people
DataFrame
建立一個全局表df.createGlobalTempView("people")
複製代碼
SQL
語句實現查詢全表spark.sql("SELECT * FROM global_temp.people").show()
複製代碼
spark.newSession().sql("SELECT * FROM global_temp.people").show()
複製代碼
以上兩行代碼的執行效果一致~
使用更爲簡潔的語法對 DataFrame
的數據操做
建立一個 DataFrame
(同上)
查看 DataFrame
的 Schema
信息
name
列數據name
列數據以及 age+1
數據age
大於 21
的數據age
分組,查看數據條數我的感受簡單的操做可使用 DSL
,複雜查詢再使用 SQL
是一個很不錯的方案
注意:DSL
方法由 DataFrame
調用,而 SQL
由 SparkSession
調用
注意:若是須要 RDD
與 DF
或者 DS
之間操做,那麼都須要引入 import spark.implicits._
【spark不是包名,而是 SparkSession
對象的名稱】
前置條件:導入隱式轉換並建立一個 RDD
經過反射肯定(須要用到樣例類)
case class People(name:String, age:Int)
複製代碼
RDD
轉換爲 DataFrame
import org.apache.spark.sql.types._
複製代碼
Schema
val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
複製代碼
import org.apache.spark.sql.Row
複製代碼
RDD
val data = rdd.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
複製代碼
schema
建立 DataFrame
val dataFrame = spark.createDataFrame(data, structType)
複製代碼
DataSet
是具備強類型的數據集合,須要提供對應的類型信息。
DataSet
的建立能夠直接使用 Seq
靜態方法建立 或者 RDD
轉換 或者 DataFrame
轉換
case class Person(name: String, age: Long)
複製代碼
DataSet
Spark SQL
可以自動將包含有 case
類的 RDD
轉換成 DataFrame
,case
類定義了 table
的結構,case
類屬性經過反射變成了表的列名。case
類能夠包含諸如 Seqs
或者 Array
等複雜的結構。
RDD
case class Person(name: String, age: Int)
複製代碼
RDD
轉化爲 DataSet
DateFrame
DataSet
val ds = Seq(Person("Andy", 32)).toDS()
複製代碼
DataSet
轉化爲 DataFrame
val df = ds.toDF
複製代碼
使用 as
方法,轉成 Dataset
,這在數據類型是 DataFrame
又須要針對各個字段處理時極爲方便。在使用一些特殊的操做時,必定要加上 import spark.implicits._
否則 toDF
、toDS
沒法使用。
在 Spark SQL
中 Spark
爲咱們提供了兩個新的抽象,分別是 DataFrame
和 DataSet
。他們和 RDD
有什麼區別呢?首先從版本的產生上來看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
若是一樣的數據都給到這三個數據結構,他們分別計算以後,都會給出相同的結果。不一樣是的他們的執行效率和執行方式。在後期的 Spark
版本中,DataSet
有可能會逐步取代 RDD
和 DataFrame
成爲惟一的 API
接口。
(1)RDD
、DataFrame
、Dataset
全都是 spark
平臺下的分佈式彈性數據集,爲處理超大型數據提供便利;
(2)三者都有惰性機制,在進行建立、轉換,如 map
方法時,不會當即執行,只有在遇到 Action
如 foreach
時,三者纔會開始遍歷運算;
(3)三者有許多共同的函數,如 filter
,排序
等;
(4)在對 DataFrame
和 Dataset
進行操做許多操做都須要這個包:import spark.implicits._
(在建立好 SparkSession
對象後儘可能直接導入)
這裏給出關於這三者講解比較深刻的文章
經過一個簡單的案例快速入手如何在 IDEA
上開發 Spark SQL
程序
導入如下依賴
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
複製代碼
代碼實現
object Main2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
import session.implicits._
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
//打印
dataFrame.show()
//DSL風格:查詢年齡在21歲以上的
dataFrame.filter($"age" > 21).show()
//建立臨時表
dataFrame.createOrReplaceTempView("persons")
//SQL風格:查詢年齡在21歲以上的
session.sql("SELECT * FROM persons where age > 21").show()
//關閉鏈接
session.stop()
}
}
複製代碼
若是在執行 Scala
或者是 java
程序中,報沒法找到主類執行的異常,多是項目的結構有問題,將父模塊直接移除掉,而後從新導入父模塊便可
object MyFunc {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
/*用戶自定義 UDF 函數*/
session.udf.register("addName", (x: String) => {
"cool:" + x
})
dataFrame.createOrReplaceTempView("people")
session.sql("select addName(name),age from people").show()
session.stop()
}
}
複製代碼
結果以下
強類型的 Dataset
和弱類型的 DataFrame
都提供了相關的聚合函數, 如 count()
,countDistinct()
,avg()
,max()
,min()
。
除此以外,用戶能夠設定本身的自定義聚合函數。經過繼承 UserDefinedAggregateFunction
來實現用戶自定義聚合函數
/** * 定義本身的 UDAF 函數 * * @author cris * @version 1.0 **/
object MyFunc extends UserDefinedAggregateFunction {
// 聚合函數輸入參數的數據類型
override def inputSchema: StructType = StructType(StructField("inputField", LongType) :: Nil)
// 聚合緩衝區中值得數據類型
override def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// 返回值的數據類型
override def dataType: DataType = DoubleType
// 對於相同的輸入是否一直返回相同的輸出
override def deterministic: Boolean = true
// 初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 工資的總額
buffer(0) = 0L
// 員工人數
buffer(1) = 0L
}
// 相同 Executor 間的數據合併
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
// 不一樣 Executor 間的數據合併
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 最終函數計算的返回值
override def evaluate(buffer: Row): Double = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
複製代碼
測試代碼
object MyFuncTest2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/employees.json")
session.udf.register("avg", MyFunc)
dataFrame.createTempView("emp")
session.sql("select avg(salary) as avg_sal from emp").show()
session.stop()
}
}
複製代碼
測試以下
read
方法直接加載數據scala> spark.read.
csv jdbc json orc parquet textFile… …
複製代碼
注意:加載數據的相關參數需寫到上述方法中。如:textFile
需傳入加載數據的路徑,jdbc
需傳入 JDBC
相關參數
format
方法(瞭解)scala> spark.read.format("…")[.option("…")].load("…")
複製代碼
用法詳解:
(1)format("…"):指定加載的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)load("…"):在"csv"、"orc"、"parquet"和"textFile"格式下須要傳入加載數據的路徑。
(3)option("…"):在"jdbc"格式下須要傳入JDBC相應參數,url、user、password和dbtable
write
直接保存數據scala> df.write.
csv jdbc json orc parquet textFile… …
複製代碼
注意:保存數據的相關參數需寫到上述方法中。如:textFile
需傳入加載數據的路徑,jdbc
需傳入 JDBC
相關參數
format
指定保存數據類型(瞭解)scala> df.write.format("…")[.option("…")].save("…")
複製代碼
用法詳解:
(1)format("…"):指定保存的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
(2)save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下須要傳入保存數據的路徑。
(3)option("…"):在"jdbc"格式下須要傳入JDBC相應參數,url、user、password和dbtable
object Main2 {
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
val dataFrame: DataFrame = session.read.json("/home/cris/people.json")
//建立臨時表
dataFrame.createOrReplaceTempView("persons")
//SQL風格:查詢年齡在21歲以上的
val frame: DataFrame = session.sql("SELECT * FROM persons where age > 21")
frame.show()
frame.write.json("/home/cris/output")
//關閉鏈接
session.stop()
}
}
複製代碼
執行效果
能夠採用SaveMode
執行存儲操做,SaveMode
定義了對數據的處理模式。SaveMode
是一個枚舉類,其中的常量包括:
(1)Append
:當保存路徑或者表已存在時,追加內容;
(2)Overwrite
: 當保存路徑或者表已存在時,覆寫內容;
(3)ErrorIfExists
:當保存路徑或者表已存在時,報錯;
(4)Ignore
:當保存路徑或者表已存在時,忽略當前的保存操做
使用以下
df.write.mode(SaveMode.Append).save("… …")
複製代碼
記得保存選項放在 save
操做以前執行
Spark SQL
的默認數據源爲 Parquet
格式。數據源爲 Parquet
文件時,Spark SQL
能夠方便的執行全部的操做。修改配置項 spark.sql.sources.default
,可修改默認數據源格式。
val df = spark.read.load("./examples/src/main/resources/users.parquet")
複製代碼
df.select("name", " color").write.save("user.parquet")
複製代碼
Spark SQL
可以自動推測 JSON
數據集的結構,並將它加載爲一個 Dataset[Row]
. 能夠經過 SparkSession.read.json()
去加載一個 一個 JSON
文件。
注意:這個JSON文件不是一個傳統的JSON文件,每一行都得是一個JSON串。格式以下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
複製代碼
Spark-Shell
操做以下:
import spark.implicits._
複製代碼
JSON
文件val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
複製代碼
peopleDF.createOrReplaceTempView("people")
複製代碼
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
複製代碼
Spark SQL
能夠經過 JDBC
從關係型數據庫中讀取數據的方式建立 DataFrame
,經過對 DataFrame
一系列的計算後,還能夠將數據再寫回關係型數據庫中。
可在啓動 shell
時指定相關的數據庫驅動路徑,或者將相關的數據庫驅動放到 Spark
的類路徑下(推薦)。
Spark-Shell
[cris@hadoop101 spark-local]$ bin/spark-shell --master spark://hadoop101:7077 [--jars mysql-connector-java-5.1.27-bin.jar]
複製代碼
建議將 MySQL
的驅動直接放入到 Spark
的類(jars
)路徑下,就不用每次進入 Spark-Shell
帶上 --jar
參數了
JDBC
相關參數配置信息val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
複製代碼
read.jdbc
加載參數val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)
複製代碼
format
形式加載配置參數(不推薦)val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop102:3306/spark").option("dbtable", " person").option("user", "root").option("password", "000000").load()
複製代碼
write.jdbc
保存數據(可使用文件保存選項)jdbcDF2.write.mode(org.apache.spark.sql.SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark", "person", connectionProperties)
複製代碼
format
形式保存數據(不推薦)jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop102:3306/spark")
.option("dbtable", "person")
.option("user", "root")
.option("password", "000000")
.save()
複製代碼
pom.xml
導入 MySQL
驅動依賴<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
複製代碼
MySQL
表數據IDEA
操做代碼以下/** * IDEA 測試 Spark SQL 鏈接遠程的 MySQL 獲取數據和寫入數據 * * @author cris * @version 1.0 **/
object MysqlTest {
def main(args: Array[String]): Unit = {
// 獲取 SparkSession
val session: SparkSession = SparkSession.builder().appName("spark sql").master("local[*]").getOrCreate()
// 設置配置參數
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "000000")
// 從 MySQL 獲取數據,show() 方法實際調用的是 show(20),默認顯示 20 行數據
val dataFrame: DataFrame = session.read.jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties)
dataFrame.show()
// 修改並保存數據到 MySQL
dataFrame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/spark?characterEncoding=UTF-8", "person", properties)
session.stop()
}
}
複製代碼
注意:防止中文亂碼,url
加上 ?characterEncoding=UTF-8
;寫入數據最好指定保存模式 SaveMode
測試以下:
Apache Hive
是 Hadoop
上的 SQL
引擎,Spark SQL
編譯時能夠包含 Hive
支持,也能夠不包含。包含 Hive
支持的 Spark SQL
能夠支持 Hive
表訪問、UDF
(用戶自定義函數)以及 Hive
查詢語言(HQL
)等。Spark-Shell
默認是Hive
支持的;代碼中是默認不支持的,須要手動指定(加一個參數便可)。
若是要使用內嵌的 Hive
,直接用就能夠了。
指定路徑下就會生成該表的文件夾
在當前 Spark-local
路徑下,建立文件 bb
1
2
3
4
5
複製代碼
而後建立表,導入數據
查詢也沒有問題
對應目錄下也生成了 bb
表的文件夾
若是想鏈接外部已經部署好的 Hive
,須要經過如下幾個步驟:
Hive
中的 hive-site.xml
拷貝或者軟鏈接到 Spark
安裝目錄下的 conf
目錄下[cris@hadoop101 spark-local]$ cp /opt/module/hive-1.2.1/conf/hive-site.xml ./conf/
複製代碼
JDBC
的驅動包放置在 Spark
的 .jars
目錄下,啓動 Spark-Shell
[cris@hadoop101 spark-local]$ cp /opt/module/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./jars/
複製代碼
能夠經過 Hive
的客戶端建立一張表 users
hive> create table users(id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
複製代碼
並導入數據
hive> load data local inpath './user.txt' into users;
複製代碼
此時 HDFS
顯示數據導入成功
在 Spark-Shell
窗口查看
執行 Hive
的查詢語句
能夠在 Spark-Shell
執行全部的 Hive
語句,而且執行流程走的是 Spark
,而不是 MapReduce
Spark SQL CLI
能夠很方便的在本地運行 Hive
元數據服務以及從命令行執行查詢任務。在 Spark
目錄下執行以下命令啓動 Spark SQL CLI
,直接執行 SQL
語句,相似一個 Hive
窗口。
[cris@hadoop101 spark-local]$ bin/spark-sql
複製代碼
若是使用這種模式進行測試,最好將 log4j
的日誌級別設置爲 error
,不然會有不少 info
級別的日誌輸出
首先 pom.xml
引入 Hive
依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
複製代碼
而後將 Hive
的配置文件 hive-site.xml
放入 resource
路徑下
hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>000000</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop101,hadoop102,hadoop103</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
</configuration>
複製代碼
具體的配置介紹這裏再也不贅述,能夠參考個人 Hive 筆記
測試代碼以下:
/** * IDEA 測試 Spark SQL 和 Hive 的聯動 * * @author cris * @version 1.0 **/
object HiveTest {
def main(args: Array[String]): Unit = {
// 注意開啓 enableHiveSupport
val session: SparkSession = SparkSession.builder().enableHiveSupport().appName("spark sql").master("local[*]")
.getOrCreate()
session.sql("show tables").show()
// 注意關閉 session 鏈接
session.stop()
}
}
複製代碼
執行結果以下
正好就是剛纔建立的 Hive
表
Cris
的 IDEA
設置一行字數最多 120
,不然就自動換行,大大提升閱讀的溫馨感和編碼的規範性
由於 Cris
使用的是 Linux
桌面系統 Deepin
,因此常用自帶的 Terminal
鏈接遠程服務器,這裏給出快速右鍵複製 Terminal
內容的設置
由於 Cris
以前使用的是 MacBook
,輸入法搜狗會很智能的爲輸入的英文進行先後空格分割,換了 Deepin
後,自帶的雖然也是搜狗輸入法,可是沒有對英文自動空格分割的功能了,後面想一想辦法,看怎麼解決~
由於要對英文和重要內容進行突出顯示,Typora
中設置 code
的快捷鍵默認爲 Ctrl+Shift+`,比較麻煩,網上找了找自定義快捷鍵的設置,最後設置成 Ctrl+C