Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫作DataFrame而且做爲分佈式SQL查詢引擎的做用。
http://spark.apache.org/sql/html
爲何要學習Spark SQL?咱們已經學習了Hive,它是將Hive SQL轉換成MapReduce而後提交到集羣上執行,大大簡化了編寫MapReduce的程序的複雜性,因爲MapReduce這種計算模型執行效率比較慢。因此Spark SQL的應運而生,它是將Spark SQL轉換成RDD,而後提交到集羣執行,執行效率很是快!同時Spark SQL也支持從Hive中讀取數據。java
Spark SQL的特色:mysql
1.容易整合(集成) sql
2.統一的數據訪問方式shell
3.兼容Hive 數據庫
4.標準的數據鏈接apache
DataFrame編程
DataFrame是組織成命名列的數據集。它在概念上等同於關係數據庫中的表,但在底層具備更豐富的優化。DataFrames能夠從各類來源構建,json
例如:api
結構化數據文件
hive中的表
外部數據庫或現有RDDs
DataFrame API支持的語言有Scala,Java,Python和R。
從上圖能夠看出,DataFrame多了數據的結構信息,即schema。RDD是分佈式的 Java對象的集合。DataFrame是分佈式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子之外,更重要的特色是提高執行效率、減小數據讀取以及執行計劃的優化
Datasets
Dataset是數據的分佈式集合。Dataset是在Spark 1.6中添加的一個新接口,是DataFrame之上更高一級的抽象。它提供了RDD的優勢(強類型化,使用強大的lambda函數的能力)以及Spark SQL優化後的執行引擎的優勢。一個Dataset 能夠從JVM對象構造,而後使用函數轉換(map, flatMap,filter等)去操做。 Dataset API 支持Scala和Java。 Python不支持Dataset API。
使用員工表的數據,並已經將其保存到了HDFS上。
emp.csv
dept.csv
(*)經過Case Class建立DataFrames
① 定義case class(至關於表的結構:Schema)
注意:因爲mgr和comm列中包含null值,簡單起見,將對應的case class類型定義爲String
② 將HDFS上的數據讀入RDD,並將RDD與case Class關聯
③ 將RDD轉換成DataFrames
④ 經過DataFrames查詢數據
(*)使用SparkSession
① 什麼是SparkSession
Apache Spark 2.0引入了SparkSession,其爲用戶提供了一個統一的切入點來使用Spark的各項功能,而且容許用戶經過它調用DataFrame和Dataset相關API來編寫Spark程序。最重要的是,它減小了用戶須要瞭解的一些概念,使得咱們能夠很容易地與Spark交互。
在2.0版本以前,與Spark交互以前必須先建立SparkConf和SparkContext。然而在Spark 2.0中,咱們能夠經過SparkSession來實現一樣的功能,而不須要顯式地建立SparkConf, SparkContext 以及 SQLContext,由於這些對象已經封裝在SparkSession中。
② 建立StructType,來定義Schema結構信息
注意,須要:import org.apache.spark.sql.types._
③ 讀入數據而且切分數據
④ 將RDD中的數據映射成Row
注意,須要:import org.apache.spark.sql.Row
⑤ 建立DataFrames
val df = spark.createDataFrame(rowRDD,myschema)
再舉一個例子,使用JSon文件來建立DataFame
① 源文件:$SPARK_HOME/examples/src/main/resources/people.json
② val df = spark.read.json("源文件")
③ 查看數據和Schema信息
DataFrame操做也稱爲無類型的Dataset操做
(*)查詢全部的員工姓名
(*)查詢全部的員工姓名和薪水,並給薪水加100塊錢
(*)查詢工資大於2000的員工
(*)求每一個部門的員工人數
完整的例子,請參考:
http://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Dataset
(*)在DataFrame中使用SQL語句
① 將DataFrame註冊成表(視圖):df.createOrReplaceTempView("emp")
② 執行查詢:spark.sql("select * from emp").show
spark.sql("select * from emp where deptno=10").show
spark.sql("select deptno,sum(sal) from emp group by deptno").show
上面使用的是一個在Session生命週期中的臨時views。在Spark SQL中,若是你想擁有一個臨時的view,並想在不一樣的Session中共享,並且在application的運行週期內可用,那麼就須要建立一個全局的臨時view。並記得使用的時候加上global_temp做爲前綴來引用它,由於全局的臨時view是綁定到系統保留的數據庫global_temp上。
① 建立一個普通的view和一個全局的view
df.createOrReplaceTempView("emp1")
df.createGlobalTempView("emp2")
② 在當前會話中執行查詢,都可查詢出結果。
spark.sql("select * from emp1").show
spark.sql("select * from global_temp.emp2").show
③ 開啓一個新的會話,執行一樣的查詢
spark.newSession.sql("select * from emp1").show (運行出錯)
spark.newSession.sql("select * from global_temp.emp2").show
DataFrame的引入,可讓Spark更好的處理結構數據的計算,但其中一個主要的問題是:缺少編譯時類型安全。爲了解決這個問題,Spark採用新的Dataset API (DataFrame API的類型擴展)。
Dataset是一個分佈式的數據收集器。這是在Spark1.6以後新加的一個接口,兼顧了RDD的優勢(強類型,可使用功能強大的lambda)以及Spark SQL的執行器高效性的優勢。因此能夠把DataFrames當作是一種特殊的Datasets,即:Dataset(Row)
(*)建立DataSet,方式一:使用序列
1、定義case class
case class MyData(a:Int,b:String)
2、生成序列,並建立DataSet
val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
3、查看結果
(*)建立DataSet,方式二:使用JSON數據
1、定義case class
case class Person(name: String, gender: String)
2、經過JSON數據生成DataFrame
val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""" :: Nil))
3、將DataFrame轉成DataSet
df.as[Person].show
df.as[Person].collect
(*)建立DataSet,方式三:使用HDFS數據
1、讀取HDFS數據,並建立DataSet
val linesDS = spark.read.text("hdfs://hadoop111:9000/data/data.txt").as[String]
2、對DataSet進行操做:分詞後,查詢長度大於3的單詞
val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
3、執行WordCount程序
val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count
result.show
排序:result.orderBy($"value").show
(*)使用emp.json 生成DataFrame
val empDF = spark.read.json("/root/resources/emp.json")
查詢工資大於3000的員工
empDF.where($"sal" >= 3000).show
(*)建立case class
case class Emp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)
(*)生成DataSets,並查詢數據
val empDS = empDF.as[Emp]
查詢工資大於3000的員工
empDS.filter(_.sal > 3000).show
查看10號部門的員工
empDS.filter(_.deptno == 10).show
(*)多表查詢
1、建立部門表
val deptRDD=sc.textFile("/root/temp/dept.csv").map(_.split(","))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
2、建立員工表
case class Emp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)
val empRDD = sc.textFile("/root/temp/emp.csv").map(_.split(","))
val empDS = empRDD.map(x => Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS
3、執行多表查詢:等值連接
val result = deptDS.join(empDS,"deptno")
另外一種寫法:注意有三個等號
val result = deptDS.joinWith(empDS,deptDS("deptno")=== empDS("deptno"))
joinWith和join的區別是鏈接後的新Dataset的schema會不同
(*)查看執行計劃:result.explain
(*)什麼是parquet文件?
Parquet是列式存儲格式的一種文件類型,列式存儲有如下的核心:
l 能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。
l 壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
l 只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。
l Parquet格式是Spark SQL的默認數據源,可經過spark.sql.sources.default配置
(*)通用的Load/Save函數
l 讀取Parquet文件
val usersDF = spark.read.load("/root/resources/users.parquet")
l 查詢Schema和數據
l 查詢用戶的name和喜好顏色,並保存
usersDF.select($"name",$"favorite_color").write.save("/root/result/parquet")
l 驗證結果
(*)顯式指定文件格式:加載json格式
l 直接加載:val usersDF = spark.read.load("/root/resources/people.json")
會出錯
l val usersDF = spark.read.format("json").load("/root/resources/people.json")
(*)存儲模式(Save Modes)
能夠採用SaveMode執行存儲操做,SaveMode定義了對數據的處理模式。須要注意的是,這些保存模式不使用任何鎖定,不是原子操做。此外,當使用Overwrite方式執行時,在輸出新數據以前原數據就已經被刪除。SaveMode詳細介紹以下表:
Demo:
usersDF.select($"name").write.save("/root/result/parquet1")
--> 出錯:由於/root/result/parquet1已經存在
usersDF.select($"name").write.mode("overwrite").save("/root/result/parquet1")
(*)將結果保存爲表
usersDF.select($"name").write.saveAsTable("table1")
也能夠進行分區、分桶等操做:partitionBy、bucketBy
Parquet是一個列格式並且用於多個數據處理系統中。Spark SQL提供支持對於Parquet文件的讀寫,也就是自動保存原始數據的schema。當寫Parquet文件時,全部的列被自動轉化爲nullable,由於兼容性的緣故。
(*)案例:
讀入json格式的數據,將其轉換成parquet格式,並建立相應的表來使用SQL進行查詢。
(*)Schema的合併:
Parquet支持Schema evolution(Schema演變,即:合併)。用戶能夠先定義一個簡單的Schema,而後逐漸的向Schema中增長列描述。經過這種方式,用戶能夠獲取多個有不一樣Schema但相互兼容的Parquet文件。
Demo:
Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集爲DataFrame格式。讀取JSON數據集方法爲SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換爲DataFrame。
須要注意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自知足有效的JSON對象。若是用多行描述一個JSON對象,會致使讀取出錯。讀取JSON數據集示例以下:
(*)Demo1:使用Spark自帶的示例文件 --> people.json 文件
定義路徑:
val path ="/root/resources/people.json"
讀取Json文件,生成DataFrame:
val peopleDF = spark.read.json(path)
打印Schema結構信息:
peopleDF.printSchema()
建立臨時視圖:
peopleDF.createOrReplaceTempView("people")
執行查詢
spark.sql("SELECT name FROM people WHERE age=19").show
Spark SQL一樣支持經過JDBC讀取其餘數據庫的數據做爲數據源。
Demo演示:使用Spark SQL讀取Oracle數據庫中的表。
l 啓動Spark Shell的時候,指定Oracle數據庫的驅動
spark-shell --master spark://spark81:7077 \\
--jars /root/temp/ojdbc6.jar \\
--driver-class-path /root/temp/ojdbc6.jar
讀取Oracle數據庫中的數據
(*)方式一:
val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","tiger").
load
(*)方式二:
導入須要的類:
import java.util.Properties
定義屬性:
val oracleprops = new Properties()
oracleprops.setProperty("user","scott")
oracleprops.setProperty("password","tiger")
讀取數據:
val oracleEmpDF =
spark.read.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com",
"scott.emp",oracleprops)
注意:下面是讀取Oracle 10g(Windows 上)的步驟
l 首先,搭建好Hive的環境(須要Hadoop)
l 配置Spark SQL支持Hive
只須要將如下文件拷貝到$SPARK_HOME/conf的目錄下,便可
$HIVE_HOME/conf/hive-site.xml
$HADOOP_CONF_DIR/core-site.xml
$HADOOP_CONF_DIR/hdfs-site.xml
l 使用Spark Shell操做Hive
啓動Spark Shell的時候,須要使用--jars指定mysql的驅動程序
建立表
spark.sql("create table src (key INT, value STRING) row format delimited fields terminated by ','")
導入數據
spark.sql("load data local path '/root/temp/data.txt' into table src")
查詢數據
spark.sql("select * from src").show
l 使用spark-sql操做Hive
啓動spark-sql的時候,須要使用--jars指定mysql的驅動程序
操做Hive
show tables;
select * from 表;
性能調優主要是將數據放入內存中操做。經過spark.cacheTable("tableName")或者dataFrame.cache()。使用spark.uncacheTable("tableName")來從內存中去除table。
Demo案例:
(*)從Oracle數據庫中讀取數據,生成DataFrame
val oracleDF = spark.read.format("jdbc")
.option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")
.option("dbtable","scott.emp")
.option("user","scott")
.option("password","tiger").load
(*)將DataFrame註冊成表: oracleDF.registerTempTable("emp")
(*)執行查詢,並經過Web Console監控執行的時間
spark.sql("select * from emp").show
(*)將表進行緩存,並查詢兩次,並經過Web Console監控執行的時間
spark.sqlContext.cacheTable("emp")
(*)清空緩存:
spark.sqlContext.cacheTable("emp")
spark.sqlContext.clearCache
1.將數據緩存到內存中的相關優化參數
(1)spark.sql.inMemoryColumnarStorage.compressed
默認爲 true
Spark SQL 將會基於統計信息自動地爲每一列選擇一種壓縮編碼方式。
(2)spark.sql.inMemoryColumnarStorage.batchSize
默認值:10000
緩存批處理大小。緩存數據時, 較大的批處理大小能夠提升內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
2.其餘性能相關的配置選項(不過不推薦手動修改,可能在後續版本自動的自適應修改)
(1)spark.sql.files.maxPartitionBytes
默認值:128 MB
讀取文件時單個分區可容納的最大字節數
(2)spark.sql.files.openCostInBytes
默認值:4M
打開文件的估算成本, 按照同一時間可以掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。
(3)spark.sql.autoBroadcastJoinThreshold
默認值:10M
用於配置一個表在執行 join 操做時可以廣播給全部 worker 節點的最大字節大小。經過將這個值設置爲 -1 能夠禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。
(4)spark.sql.shuffle.partitions
默認值:200
用於配置 join 或聚合操做混洗(shuffle)數據時使用的分區數。
package sparksql import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.{SparkConf, SparkContext} object SpecifyingSchema { def main(args: Array[String]) { //建立Spark Session對象 val spark = SparkSession.builder().master("local").appName("UnderstandingSparkSession").getOrCreate() //從指定的地址建立RDD val personRDD = spark.sparkContext.textFile("D:\\temp\\student.txt").map(_.split(" ")) //經過StructType直接指定每一個字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //將RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //將schema信息應用到rowRDD上 val personDataFrame = spark.createDataFrame(rowRDD, schema) //註冊表 personDataFrame.createOrReplaceTempView("t_person") //執行SQL val df = spark.sql("select * from t_person order by age desc limit 4") //顯示結果 df.show() //中止Spark Context spark.stop() } }
package demo import org.apache.spark.sql.SparkSession //使用case class object Demo2 { def main(args: Array[String]): Unit = { //建立SparkSession val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate() //從指定的文件中讀取數據,生成對應的RDD val lineRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" ")) //將RDD和case class 關聯 val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt)) //生成 DataFrame,經過RDD 生成DF,導入隱式轉換 import spark.sqlContext.implicits._ val studentDF = studentRDD.toDF //註冊表 視圖 studentDF.createOrReplaceTempView("student") //執行SQL spark.sql("select * from student").show() spark.stop() } } //case class 必定放在外面 case class Student(stuID:Int,stuName:String,stuAge:Int)
package demo import java.util.Properties import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} //做用:讀取本地一個文件, 生成對應 DataFrame,註冊表 object Demo1 { def main(args: Array[String]): Unit = { //建立SparkSession val spark = SparkSession.builder().master("local").appName("My Demo 1").getOrCreate() //從指定的文件中讀取數據,生成對應的RDD val personRDD = spark.sparkContext.textFile("d:\\temp\\student.txt").map(_.split(" ")) //建立schema ,經過StructType val schema = StructType( List( StructField("personID",IntegerType,true), StructField("personName",StringType,true), StructField("personAge",IntegerType,true) ) ) //將RDD映射到Row RDD 行的數據上 val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).toInt)) //生成DataFrame val personDF = spark.createDataFrame(rowRDD,schema) //將DF註冊成表 personDF.createOrReplaceTempView("myperson") //執行SQL val result = spark.sql("select * from myperson") //顯示 //result.show() //將結果保存到oracle中 val props = new Properties() props.setProperty("user","scott") props.setProperty("password","tiger") result.write.jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props) //若是表已經存在,append的方式數據 //result.write.mode("append").jdbc("jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com","scott.myperson",props) //中止spark context spark.stop() } }