========== Spark SQL ==========
一、Spark SQL 是 Spark 的一個模塊,能夠和 RDD 進行混合編程、支持標準的數據源、能夠集成和替代 Hive、能夠提供 JDBC、ODBC 服務器功能。java
二、Spark SQL 的特色:
(1)和 Spark Core 的無縫集成,能夠在寫整個 RDD 應用的時候,配合 Spark SQL 來實現邏輯。
(2)統一的數據訪問方式,Spark SQL 提供標準化的 SQL 查詢。
(3)Hive 的集成,Spark SQL 經過內嵌的 Hive 或者鏈接外部已經部署好的 Hive 實例,實現了對 Hive 語法的集成和操做。
(4)標準化的鏈接方式,Spark SQL 能夠經過啓動 thrift Server 來支持 JDBC、ODBC 的訪問,即將本身做爲一個 BI Server 來使用。mysql
三、Spark SQL 能夠執行 SQL 語句,也能夠執行 HQL 語句,將運行的結果做爲 Dataset 和 DataFrame(將查詢出來的結果轉換成 RDD,相似於 hive 將 sql 語句轉換成 mapreduce)。算法
四、Spark SQL 的計算速度(Spark sql 比 Hive 快了至少一個數量級,尤爲是在 Tungsten 成熟之後會更加無可匹敵),Spark SQL 推出的 DataFrame 可讓數據倉庫直接使用機器學習、圖計算等複雜的算法庫來對數據倉庫進行復雜深度數據價值的挖掘。sql
五、老版本中使用 hivecontext,如今使用 sparkSession。shell
========== Spark SQL 的數據抽象 ==========
0、RDD(Spark1.0)-> DataFrame(Spark1.3)-> DataSet(Spark1.6)
一、Spark SQL 提供了 DataFrame 和 DataSet 數據抽象。
二、DataFrame 就是 RDD + Schema,能夠認爲是一張二維表格。DataFrame 也是懶執行的、不可變的。DataFrame 性能上比 RDD 要高。
三、DataFrame 是一個弱類型的數據對象,DataFrame 的劣勢是在編譯期不進行表格中的字段的類型檢查。在運行期進行檢查。相似於 java.sql.ResultSet 類,只能經過 getString 這種方式來獲取具體數據。
四、DataSet 是 Spark 最新的數據抽象,Spark 的發展會逐步將 DataSet 做爲主要的數據抽象,弱化 RDD 和 DataFrame。DataSet 包含了 DataFrame 全部的優化機制。除此以外提供了以樣例類爲 Schema 模型的強類型。
五、type DataFrame = Dataset[Row]
六、DataFrame 和 DataSet 都有可控的內存管理機制,全部數據都保存在非堆內存
上,節省了大量空間以外,還擺脫了GC的限制。都使用了 catalyst 進行 SQL 的優化。可使得不太會使用 RDD 的工程師寫出相對高效的代碼。
七、RDD 和 DataFrame 和 DataSet 之間能夠進行數據轉換。數據庫
========== Spark SQL 的初探 -- 客戶端查詢 ==========
一、你能夠經過 spark-shell 或者 spark-sql 來操做 Spark SQL,注意:spark 做爲 SparkSession 的變量名,sc 做爲 SparkContext 的變量名。
二、你能夠經過 Spark 提供的方法讀取 JSON 文件,將 JSON 文件轉換成 DataFrame。
三、你能夠經過 DataFrame 提供的 API 來操做 DataFrame 裏面的數據。
四、你能夠經過將 DataFrame 註冊成爲一個臨時表的方式,來經過 Spark.sql 方法運行標準的 SQL 語句來查詢。express
小細節:
show() --> 表格
collect() --> RDD 打印apache
========== IDEA 建立 Spark SQL 程序 ==========
一、Spark SQL 讀取 json 須要 json 文件中一行是一個 json 對象。
二、經過建立 SparkSession 來使用 SparkSQL:
示例代碼以下:編程
package com.atguigu.sparksql
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object HelloWorld {
val logger = LoggerFactory.getLogger(HelloWorld.getClass)
def main(args: Array[String]) {
// 建立 SparkSession 並設置 App 名稱
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// 經過隱式轉換將 RDD 操做添加到 DataFrame 上(將 RDD 轉成 DataFrame)
import spark.implicits._
// 經過 spark.read 操做讀取 JSON 數據
val df = spark.read.json("examples/src/main/resources/people.json")
// show 操做相似於 Action,將 DataFrame 直接打印到 Console 上
df.show()
// DSL 風格的使用方式:屬性的獲取方法 $
df.filter($"age" > 21).show()
//將 DataFrame 註冊爲表
df.createOrReplaceTempView("persons")
// 執行 Spark SQL 查詢操做
spark.sql("select * from perosns where age > 21").show()
// 關閉資源
spark.stop()
}
}
========== DataFrame 查詢方式 ==========
一、DataFrame 支持兩種查詢方式:一種是 DSL 風格,另一種是 SQL 風格。
DSL 風格:
(1)你須要引入 import spark.implicit._ 這個隱式轉換,能夠將 DataFrame 隱式轉換成 RDD。
示例:
df.select("name").show()
df.filter($"age" > 25).show()json
SQL 風格:
(1)你須要將 DataFrame 註冊成一張表格,若是你經過 createOrReplaceTempView 這種方式來建立,那麼該表當前 Session 有效,若是你經過 createGlobalTempView 來建立,那麼該表跨 Session 有效,可是 SQL 語句訪問該表的時候須要加上前綴 global_temp.xxx。
(2)你須要經過 sparkSession.sql 方法來運行你的 SQL 語句。
示例:
一個 SparkContext 能夠屢次建立 SparkSession。
// Session 內可訪問,一個 SparkSession 結束後,表自動刪除。
df.createOrReplaceTempView("persons") // 使用表名不須要任何前綴
// 應用級別內可訪問,一個 SparkContext 結束後,表自動刪除。
df.createGlobalTempView("persons") // 使用表名須要加上「global_temp.」 前綴,好比:global_temp.persons
========== DataSet 建立方式 ==========
一、定義一個 DataSet,首先你須要先定義一個 case 類。
========== RDD、DataFrame、DataSet 之間的轉換總結 ==========
一、RDD -> DataFrame : rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元組 -> toDF()(注意:這是第一種方式)
二、DataFrame -> RDD : df.rdd 注意輸出類型
:res2: Array[org.apache.spark.sql.Row] = Array([Michael,29], [Andy,30], [Justin,19])
一、 RDD -> DataSet : rdd.map(para => Person(para(0).trim(), para(1).trim().toInt)).toDS() // 須要先定義樣例類 -> toDS()
二、 DataSet -> RDD : ds.rdd注意輸出類型
:res5: Array[Person] = Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))
一、 DataFrame -> DataSet : df.as[Person] // 傳入類型
二、 DataSet -> DataFrame : ds.toDF()
========== DataFrame 的 Schema 的獲取方式 ==========
RDD -> DataFram 的三種方式:
// 將沒有包含 case 類的 RDD 轉換成 DataFrame
rdd.map(para => (para(0).trim(), para(1).trim().toInt)).toDF("name", "age") // RDD -> 元組 -> toDF()(注意:這是第一種方式)
// 將包含有 case 類的 RDD 轉換成 DataFrame,注意:須要咱們先定義 case 類
// 經過反射的方式來設置 Schema 信息,適合於編譯期能肯定列的狀況
rdd.map(attributes => Person(attributes(0), attributes(1).trim().toInt)).toDF() // 樣例類-> RDD -> toDF()(注意:這是第二種方式)
// 經過編程的方式來設置 Schema 信息,適合於編譯期不能肯定列的狀況(注意:這是第三種方式)
val schemaString = "name age" // 實際開發中 schemaString 是動態生成的
val fields = schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rdd[Row] = rdd.map(attributes => Row(attributes(0.trim), attributes(1).trim))
val peopeDF = spark.createDataFrame(rdd[Row], schema)
========== 對於 DataFrame Row 對象的訪問方式 ==========
一、由 DataFrame = Dataset[Row] 可知, DataFrame 裏面每一行都是 Row 對象。
二、若是須要訪問 Row 對象中的每個元素,能夠經過索引 row(0);也能夠經過列名 row.getAsString 或者索引 row.getAsInt。
========== 應用 UDF 函數(用戶自定義函數) ==========
一、經過 spark.udf.register(funcName, func) 來註冊一個 UDF 函數,name 是 UDF 調用時的標識符,即函數名,fun 是一個函數,用於處理字段。
二、你須要將一個 DF 或者 DS 註冊爲一個臨時表。
三、經過 spark.sql 去運行一個 SQL 語句,在 SQL 語句中能夠經過 funcName(列名) 方式來應用 UDF 函數。
示例代碼以下:
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
scala> spark.udf.register("addName", (x: String) => "Name:" + x)
res5: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select addName(name), age from people").show()
scala> spark.sql("select addName(name) as newName, age from people").show()
========== 應用 UDAF 函數(用戶自定義聚合函數) ==========
一、弱類型用戶自定義聚合函數
步驟以下:
(1)新建一個 Class 繼承UserDefinedAggregateFunction,而後複寫方法:
// 聚合函數須要輸入參數的數據類型
override def inputSchema: StructType = ???
// 聚合緩衝區中值的數據類型
override def bufferSchema: StructType = ???
// 返回值的數據類型
override def dataType: DataType = ???
// 對於相同的輸入一直有相同的輸出
override def deterministic: Boolean = true
// 用於初始化你的數據結構
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
// 相同 Execute 間的數據合併(同一分區)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
// 不一樣 Execute 間的數據合併(不一樣分區)
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
// 計算最終結果
override def evaluate(buffer: Row): Any = ???
(2)你須要經過 spark.udf.resigter 去註冊你的 UDAF 函數。
(3)須要經過 spark.sql 去運行你的 SQL 語句,能夠經過 select UDAF(列名) 來應用你的用戶自定義聚合函數。
二、強類型的用戶自定義聚合函數
步驟以下:
(1)新建一個class,繼承Aggregator[Employee, Average, Double]
其中 Employee 是在應用聚合函數的時候傳入的對象,Average 是聚合函數在運行的時候內部須要的數據結構,Double 是聚合函數最終須要輸出的類型。這些能夠根據本身的業務需求去調整。
複寫相對應的方法:
// 用於定義一個聚合函數內部須要的數據結構
override def zero: Average = ???
// 針對每一個分區內部每個輸入來更新你的數據結構
override def reduce(b: Average, a: Employee): Average = ???
// 用於對於不一樣分區的結構進行聚合
override def merge(b1: Average, b2: Average): Average = ???
// 計算輸出
override def finish(reduction: Average): Double = ???
// 設定之間值類型的編碼器,要轉換成 case 類
// Encoders.product 是進行 scala 元組和 case 類轉換的編碼器
override def bufferEncoder: Encoder[Average] = ???
// 設定最終輸出值的編碼器
override def outputEncoder: Encoder[Double] = ???
二、新建一個 UDAF 實例,經過 DF 或者 DS 的 DSL 風格語法去應用。
========== Spark SQL 的輸入和輸出 ==========
一、對於 Spark SQL 的輸入須要使用 sparkSession.read 方法
(1)通用模式 sparkSession.read.format("json").load("path") 支持的類型有:parquet、json、text、csv、orc、jdbc、......
(2)專業模式 sparkSession.read.json("path") 或 csv 或 ... 即直接指定類型
二、對於 Spark SQL 的輸出須要使用 sparkSession.write 方法
(1)通用模式 dataFrame.write.format("json").save("path") 支持的類型有:parquet、json、text、csv、orc、jdbc、......
(2)專業模式 dataFrame.write.csv("path") 或 json 或 ... 即直接指定類型
三、若是使用通用模式,則 spark 默認的 parquet 是默認格式,那麼 sparkSession.read.load 它加載的默認是 parquet 格式;dataFrame.write.save 也是默認保存成 parquet 格式。
四、注意
:若是須要保存成一個 text 文件,那麼須要 dataFrame 裏面只有一列數據。
========== Spark SQL 與 Hive 的集成 ==========
內置 Hive
一、Spark 內置有 Hive,Spark 2.1.1 內置的 Hive 是 1.2.1。
二、若是要使用內嵌的 Hive,什麼都不用作,直接用就能夠了。可是呢,此時的咱們只能建立表,若是查詢表的話會報錯,緣由是:本地有 spark-warehouse 目錄,而其餘機器節點沒有 spark-warehouse 目錄。解決辦法以下:
三、須要將 core-site.xml 和 hdfs-site.xml 拷貝到 spark 的 conf 目錄下,而後分發至其餘機器節點。若是 spark 路徑下發現有 metastore_db 和 spark-warehouse,刪除掉。而後重啓集羣。
四、在第一次啓動建立 metastore 的時候,須要指定 spark.sql.warehouse.dir 這個參數,
好比:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://hadoop102:9000/spark_warehouse
五、注意
:若是在 load 數據的時候,須要先將數據放到 HDFS 上。
外部 Hive
一、須要將 hive-site.xml 拷貝到 spark 的 conf 目錄下,而後分發至其餘機器節點。
二、若是 hive 的 metestore 使用的是 mysql 數據庫,那麼須要將 mysql 的 jdbc 驅動包放到 spark 的 jars 目錄下。
三、能夠經過 spark-sql 或者 spark-shell 來進行 sql 的查詢,完成和 hive 的鏈接。
hive、spark、hdfs 關係: spark 文件中有兩個文件夾:spark-warehouse、metastore_db,當咱們拷貝 hive-site.xml 文件到 spark 的 conf 目錄後,會讀取 Hive 中的 warehouse 文件,獲取到 hive 中的表格數據。