SparkSQL簡介及入門

SparkSQL簡介及入門

1、概述

    Spark爲結構化數據處理引入了一個稱爲Spark SQL的編程模塊。它提供了一個稱爲DataFrame(數據框)的編程抽象,DF的底層仍然是RDD,而且能夠充當分佈式SQL查詢引擎。java

一、SparkSQL的由來

    SparkSQL的前身是Shark。在Hadoop發展過程當中,爲了給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生,是當時惟一運行在hadoop上的SQL-on-Hadoop工具。可是,MapReduce計算過程當中大量的中間磁盤落地過程消耗了大量的I/O,運行效率較低。mysql

    後來,爲了提升SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產生,其中表現較爲突出的是:es6

    1)MapR的Drill算法

    2)Cloudera的Impalasql

    3)Shark數據庫

    其中Shark是伯克利實驗室Spark生態環境的組件之一,它基於Hive實施了一些改進,好比引入緩存管理,改進和優化執行器等,並使之能運行在Spark引擎上,從而使得SQL查詢的速度獲得10-100倍的提高。apache

 

    可是,隨着Spark的發展,對於野心勃勃的Spark團隊來講,Shark對於hive的太多依賴(如採用hive的語法解析器、查詢優化器等等),制約了Spark的One Stack rule them all的既定方針,制約了spark各個組件的相互集成,因此提出了sparkSQL項目。編程

    SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優勢,如內存列存儲(In-Memory Columnar Storage)、Hive兼容性等,從新開發了SparkSQL代碼。json

    因爲擺脫了對hive的依賴性,SparkSQL不管在數據兼容、性能優化、組件擴展方面都獲得了極大的方便。數組

    2014年6月1日,Shark項目和SparkSQL項目的主持人Reynold Xin宣佈:中止對Shark的開發,團隊將全部資源放SparkSQL項目上,至此,Shark的發展畫上了句話。

二、SparkSql特色

    1)引入了新的RDD類型SchemaRDD,能夠像傳統數據庫定義表同樣來定義SchemaRDD。

    2)在應用程序中能夠混合使用不一樣來源的數據,如能夠未來自HiveQL的數據和來自SQL的數據進行Join操做。

    3)內嵌了查詢優化框架,在把SQL解析成邏輯執行計劃以後,最後變成RDD的計算。

2、列存儲相關

    爲何sparkSQL的性能會獲得怎麼大的提高呢?

    主要sparkSQL在下面幾點作了優化:

一、內存列存儲(In-Memory Columnar Storage)

    SparkSQL的表數據在內存中存儲不是採用原生態的JVM對象存儲方式,而是採用內存列存儲,以下圖所示。

    該存儲方式不管在空間佔用量和讀取吞吐率上都佔有很大優點。

    對於原生態的JVM對象存儲方式,每一個對象一般要增長12-16字節的額外開銷(toString、hashcode等方法),如對於一個270MB的電商的商品表數據,使用這種方式讀入內存,要使用970MB左右的內存空間(一般是2~5倍於原生數據空間)。

    另外,使用這種方式,每一個數據記錄產生一個JVM對象,若是是大小爲200GB的數據記錄,堆棧將產生1.6億個對象,這麼多的對象,對於GC來講,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數量呈線性相關。顯然這種內存存儲方式對於基於內存計算的spark來講,很昂貴也負擔不起)

二、SparkSql的存儲方式

    對於內存列存儲來講,將全部原生數據類型的列採用原生數組來存儲,將Hive支持的複雜數據類型(如array、map等)先序化後並接成一個字節數組來存儲。

    此外,基於列存儲,每列數據都是同質的,因此能夠數據類型轉換的CPU消耗。此外,能夠採用高效的壓縮算法來壓縮,是的數據更少。好比針對二元數據列,能夠用字節編碼壓縮來實現(010101)

    這樣,每一個列建立一個JVM對象,從而能夠快速的GC和緊湊的數據存儲;額外的,還可使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)下降內存開銷;更有趣的是,對於分析查詢中頻繁使用的聚合特定列,性能會獲得很大的提升,緣由就是這些列的數據放在一塊兒,更容易讀入內存進行計算。

三、行存儲VS列存儲

    目前大數據存儲有兩種方案可供選擇:行存儲(Row-Based)和列存儲(Column-Based)。 業界對兩種存儲方案有不少爭持,集中焦點是:誰可以更有效地處理海量數據,且兼顧安全、可靠、完整性。從目前發展狀況看,關係數據庫已經不適應這種巨大的存儲量和計算要求,基本是淘汰出局。在已知的幾種大數據處理軟件中,Hadoop的HBase採用列存儲,MongoDB是文檔型的行存儲,Lexst是二進制型的行存儲。

1.列存儲

    什麼是列存儲?

    列式存儲(column-based)是相對於傳統關係型數據庫的行式存儲(Row-basedstorage)來講的。簡單來講二者的區別就是如何組織表:

    Row-based storage stores atable in a sequence of rows.

    Column-based storage storesa table in a sequence of columns.

 

    從上圖能夠很清楚地看到,行式存儲下一張表的數據都是放在一塊兒的,但列式存儲下都被分開保存了。因此它們就有了以下這些優缺點對比:

1>在數據寫入上的對比

    1)行存儲的寫入是一次完成。若是這種寫入創建在操做系統的文件系統上,能夠保證寫入過程的成功或者失敗,數據的完整性所以能夠肯定。

    2)列存儲因爲須要把一行記錄拆分紅單列保存,寫入次數明顯比行存儲多(意味着磁頭調度次數多,而磁頭調度是須要時間的,通常在1ms~10ms),再加上磁頭須要在盤片上移動和定位花費的時間,實際時間消耗會更大。因此,行存儲在寫入上佔有很大的優點。

    3)還有數據修改,這實際也是一次寫入過程。不一樣的是,數據修改是對磁盤上的記錄作刪除標記。行存儲是在指定位置寫入一次,列存儲是將磁盤定位到多個列上分別寫入,這個過程還是行存儲的列數倍。因此,數據修改也是以行存儲佔優。

2>在數據讀取上的對比

    1)數據讀取時,行存儲一般將一行數據徹底讀出,若是隻須要其中幾列數據的狀況,就會存在冗餘列,出於縮短處理時間的考量,消除冗餘列的過程一般是在內存中進行的。

    2)列存儲每次讀取的數據是集合的一段或者所有,不存在冗餘性問題。

    3) 兩種存儲的數據分佈。因爲列存儲的每一列數據類型是同質的,不存在二義性問題。好比說某列數據類型爲整型(int),那麼它的數據集合必定是整型數據。這種狀況使數據解析變得十分容易。相比之下,行存儲則要複雜得多,由於在一行記錄中保存了多種類型的數據,數據解析須要在多種數據類型之間頻繁轉換,這個操做很消耗CPU,增長了解析的時間。因此,列存儲的解析過程更有利於分析大數據。

    4)從數據的壓縮以及更性能的讀取來對比

2.優缺點

    顯而易見,兩種存儲格式都有各自的優缺點:

    1)行存儲的寫入是一次性完成,消耗的時間比列存儲少,而且可以保證數據的完整性,缺點是數據讀取過程當中會產生冗餘數據,若是隻有少許數據,此影響能夠忽略;數量大可能會影響到數據的處理效率。

    2)列存儲在寫入效率、保證數據完整性上都不如行存儲,它的優點是在讀取過程,不會產生冗餘數據,這對數據完整性要求不高的大數據處理領域,好比互聯網,猶爲重要。

兩種存儲格式各自的特性都決定了它們的使用場景。

四、列存儲的適用場景

    1)通常來講,一個OLAP類型的查詢可能須要訪問幾百萬甚至幾十億個數據行,且該查詢每每只關心少數幾個數據列。例如,查詢今年銷量最高的前20個商品,這個查詢只關心三個數據列:時間(date)、商品(item)以及銷售量(sales amount)。商品的其餘數據列,例如商品URL、商品描述、商品所屬店鋪,等等,對這個查詢都是沒有意義的。

    而列式數據庫只須要讀取存儲着「時間、商品、銷量」的數據列,而行式數據庫須要讀取全部的數據列。所以,列式數據庫大大地提升了OLAP大數據量查詢的效率

    OLTP    OnLine Transaction Processor 在線聯機事務處理系統(好比Mysql,Oracle等產品)

    OLAP    OnLine Analaysier Processor  在線聯機分析處理系統(好比Hive  Hbase等)

    2)不少列式數據庫還支持列族(column group,Bigtable系統中稱爲locality group),即將多個常常一塊兒訪問的數據列的各個值存放在一塊兒。若是讀取的數據列屬於相同的列族,列式數據庫能夠從相同的地方一次性讀取多個數據列的值,避免了多個數據列的合併。列族是一種行列混合存儲模式,這種模式可以同時知足OLTP和OLAP的查詢需求。

    3)此外,因爲同一個數據列的數據重複度很高,所以,列式數據庫壓縮時有很大的優點。

    例如,Google Bigtable列式數據庫對網頁庫壓縮能夠達到15倍以上的壓縮率。另外,能夠針對列式存儲作專門的索引優化。好比,性別列只有兩個值,「男」和「女」,能夠對這一列創建位圖索引:

    以下圖所示

    「男」對應的位圖爲100101,表示第一、四、6行值爲「男」

    「女」對應的位圖爲011010,表示第二、三、5行值爲「女」

    若是須要查找男性或者女性的個數,只須要統計相應的位圖中1出現的次數便可。另外,創建位圖索引後0和1的重複度高,能夠採用專門的編碼方式對其進行壓縮。

    固然,若是每次查詢涉及的數據量較小或者大部分查詢都須要整行的數據,列式數據庫並不適用。

五、總結

1.行存儲特性

    傳統行式數據庫的特性以下:

    ①數據是按行存儲的。

    ②沒有索引的查詢使用大量I/O。好比通常的數據庫表都會創建索引,經過索引加快查詢效率。

    ③創建索引和物化視圖須要花費大量的時間和資源。

    ④面對查詢需求,數據庫必須被大量膨脹才能知足需求。

2.列存儲特性

    列式數據庫的特性以下:

    ①數據按列存儲,即每一列單獨存放。

    ②數據即索引。

    ③只訪問查詢涉及的列,能夠大量下降系統I/O。

    ④每一列由一個線程來處理,即查詢的併發處理性能高。

    ⑤數據類型一致,數據特徵類似,能夠高效壓縮。好比有增量壓縮、前綴壓縮算法都是基於列存儲的類型定製的,因此能夠大幅度提升壓縮比,有利於存儲和網絡輸出數據帶寬的消耗。

3、SparkSQL入門

    SparkSql將RDD封裝成一個DataFrame對象,這個對象相似於關係型數據庫中的表。

一、建立DataFrame對象

    DataFrame就至關於數據庫的一張表。它是個只讀的表,不能在運算過程再往裏加元素。

    RDD.toDF(「列名」)

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> rdd.toDF("id")
res0: org.apache.spark.sql.DataFrame = [id: int]
scala> res0.show#默認只顯示20條數據
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
+---+
scala> res0.printSchema #查看列的類型等屬性
root
|-- id: integer (nullable = true)

    建立多列DataFrame對象

    DataFrame就至關於數據庫的一張表。

scala> sc.parallelize(List( (1,"beijing"),(2,"shanghai") ) )
res3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:22
scala> res3.toDF("id","name")
res4: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> res4.show
+---+--------+
| id| name|
+---+--------+
|  1| beijing|
|  2|shanghai|
+---+--------+

    例如3列的

scala> sc.parallelize(List( (1,"beijing",100780),(2,"shanghai",560090),(3,"xi'an",600329)))
res6: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22
scala> res6.toDF("id","name","postcode")
res7: org.apache.spark.sql.DataFrame = [id: int, name: string, postcode: int]
scala> res7.show
+---+--------+--------+
| id|    name|postcode|
+---+--------+--------+
|  1| beijing|  100780|
|  2|shanghai|  560090|
|  3|   xi'an|  600329|
+---+--------+--------+

    能夠看出,須要構建幾列,tuple就有幾個內容。

二、由外部文件構造DataFrame對象

1.讀取txt文件

    txt文件不能直接轉換成,先利用RDD轉換爲tuple。而後toDF()轉換爲DataFrame。

scala> val rdd = sc.textFile("/root/words.txt")
.map( x => (x,1) )
.reduceByKey( (x,y) => x+y )
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:21
 
scala> rdd.toDF("word","count")
res9: org.apache.spark.sql.DataFrame = [word: string, count: int]
 
scala> res9.show
+------+-----+
|  word|count|
+------+-----+
| spark|    3|
|  hive|    1|
|hadoop|    2|
|   big|    2|
|  scla|    1|
|  data|    1|
+------+-----+

2.讀取json文件

    文件代碼:

{"id":1, "name":"leo", "age":18}
{"id":2, "name":"jack", "age":19}
{"id":3, "name":"marry", "age":17}

    實現:

import org.apache.spark.sql.SQLContext
scala>val sqc=new SQLContext(sc)
scala> val tb4=sqc.read.json("/home/software/people.json")
scala> tb4.show

3.讀取parquet文件

    格式以下:

1>Parquet數據格式

    Parquet是一種列式存儲格式,能夠被多種查詢引擎支持(Hive、Impala、Drill等),而且它是語言和平臺無關的。

    Parquet文件下載後是否能夠直接讀取和修改呢?

    Parquet文件是以二進制方式存儲的,是不能夠直接讀取和修改的。Parquet文件是自解析的,文件中包括該文件的數據和元數據。

    列式存儲和行式存儲相比有哪些優點呢?

    能夠只讀取須要的數據,下降IO數據量;

    壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼進一步節約存儲空間。

    參考連接:

http://blog.csdn.net/yu616568/article/details/51868447 講解了parquet文件格式

http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format 講解了parquet列式存儲。

    實現:

scala>val tb5=sqc.read.parquet("/home/software/users.parquet")
scala> tb5.show

 

4.jdbc讀取

    實現步驟:

    1)將mysql 的驅動jar上傳到spark的jars目錄下

    2)重啓spark服務

    3)進入spark客戶端

    4)執行代碼,好比在Mysql數據庫下,有一個test庫,在test庫下有一張表爲tabx

    執行代碼:

import org.apache.spark.sql.SQLContext
scala> val sqc = new SQLContext(sc);
scala> val prop = new java.util.Properties
scala> prop.put("user","root")
scala> prop.put("password","root")
scala>val tabx=sqc.read.jdbc("jdbc:mysql://hadoop01:3306/test","tabx",prop)
scala> tabx.show
+---+----+
| id|name|
+---+----+
|  1| aaa|
|  2| bbb|
|  3| ccc|
|  1| ddd|
|  2| eee|
|  3| fff|
+---+----+

    注:若是報權限不足,則進入mysql,執行:

grant all privileges on *.* to 'root'@'hadoop01' identified by 'root' with grant option;

    而後執行:

flush privileges;

上一篇:Spark Shuffle

下一篇:SparkSQL語法及API

相關文章
相關標籤/搜索