Spark爲結構化數據處理引入了一個稱爲Spark SQL的編程模塊。它提供了一個稱爲DataFrame(數據框)的編程抽象,DF的底層仍然是RDD,而且能夠充當分佈式SQL查詢引擎。java
SparkSQL的前身是Shark。在Hadoop發展過程當中,爲了給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生,是當時惟一運行在hadoop上的SQL-on-Hadoop工具。可是,MapReduce計算過程當中大量的中間磁盤落地過程消耗了大量的I/O,運行效率較低。mysql
後來,爲了提升SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產生,其中表現較爲突出的是:es6
1)MapR的Drill算法
2)Cloudera的Impalasql
3)Shark數據庫
其中Shark是伯克利實驗室Spark生態環境的組件之一,它基於Hive實施了一些改進,好比引入緩存管理,改進和優化執行器等,並使之能運行在Spark引擎上,從而使得SQL查詢的速度獲得10-100倍的提高。apache
可是,隨着Spark的發展,對於野心勃勃的Spark團隊來講,Shark對於hive的太多依賴(如採用hive的語法解析器、查詢優化器等等),制約了Spark的One Stack rule them all的既定方針,制約了spark各個組件的相互集成,因此提出了sparkSQL項目。編程
SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優勢,如內存列存儲(In-Memory Columnar Storage)、Hive兼容性等,從新開發了SparkSQL代碼。json
因爲擺脫了對hive的依賴性,SparkSQL不管在數據兼容、性能優化、組件擴展方面都獲得了極大的方便。數組
2014年6月1日,Shark項目和SparkSQL項目的主持人Reynold Xin宣佈:中止對Shark的開發,團隊將全部資源放SparkSQL項目上,至此,Shark的發展畫上了句話。
1)引入了新的RDD類型SchemaRDD,能夠像傳統數據庫定義表同樣來定義SchemaRDD。
2)在應用程序中能夠混合使用不一樣來源的數據,如能夠未來自HiveQL的數據和來自SQL的數據進行Join操做。
3)內嵌了查詢優化框架,在把SQL解析成邏輯執行計劃以後,最後變成RDD的計算。
爲何sparkSQL的性能會獲得怎麼大的提高呢?
主要sparkSQL在下面幾點作了優化:
SparkSQL的表數據在內存中存儲不是採用原生態的JVM對象存儲方式,而是採用內存列存儲,以下圖所示。
該存儲方式不管在空間佔用量和讀取吞吐率上都佔有很大優點。
對於原生態的JVM對象存儲方式,每一個對象一般要增長12-16字節的額外開銷(toString、hashcode等方法),如對於一個270MB的電商的商品表數據,使用這種方式讀入內存,要使用970MB左右的內存空間(一般是2~5倍於原生數據空間)。
另外,使用這種方式,每一個數據記錄產生一個JVM對象,若是是大小爲200GB的數據記錄,堆棧將產生1.6億個對象,這麼多的對象,對於GC來講,可能要消耗幾分鐘的時間來處理(JVM的垃圾收集時間與堆棧中的對象數量呈線性相關。顯然這種內存存儲方式對於基於內存計算的spark來講,很昂貴也負擔不起)
對於內存列存儲來講,將全部原生數據類型的列採用原生數組來存儲,將Hive支持的複雜數據類型(如array、map等)先序化後並接成一個字節數組來存儲。
此外,基於列存儲,每列數據都是同質的,因此能夠數據類型轉換的CPU消耗。此外,能夠採用高效的壓縮算法來壓縮,是的數據更少。好比針對二元數據列,能夠用字節編碼壓縮來實現(010101)
這樣,每一個列建立一個JVM對象,從而能夠快速的GC和緊湊的數據存儲;額外的,還可使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)下降內存開銷;更有趣的是,對於分析查詢中頻繁使用的聚合特定列,性能會獲得很大的提升,緣由就是這些列的數據放在一塊兒,更容易讀入內存進行計算。
目前大數據存儲有兩種方案可供選擇:行存儲(Row-Based)和列存儲(Column-Based)。 業界對兩種存儲方案有不少爭持,集中焦點是:誰可以更有效地處理海量數據,且兼顧安全、可靠、完整性。從目前發展狀況看,關係數據庫已經不適應這種巨大的存儲量和計算要求,基本是淘汰出局。在已知的幾種大數據處理軟件中,Hadoop的HBase採用列存儲,MongoDB是文檔型的行存儲,Lexst是二進制型的行存儲。
什麼是列存儲?
列式存儲(column-based)是相對於傳統關係型數據庫的行式存儲(Row-basedstorage)來講的。簡單來講二者的區別就是如何組織表:
Row-based storage stores atable in a sequence of rows.
Column-based storage storesa table in a sequence of columns.
從上圖能夠很清楚地看到,行式存儲下一張表的數據都是放在一塊兒的,但列式存儲下都被分開保存了。因此它們就有了以下這些優缺點對比:
1)行存儲的寫入是一次完成。若是這種寫入創建在操做系統的文件系統上,能夠保證寫入過程的成功或者失敗,數據的完整性所以能夠肯定。
2)列存儲因爲須要把一行記錄拆分紅單列保存,寫入次數明顯比行存儲多(意味着磁頭調度次數多,而磁頭調度是須要時間的,通常在1ms~10ms),再加上磁頭須要在盤片上移動和定位花費的時間,實際時間消耗會更大。因此,行存儲在寫入上佔有很大的優點。
3)還有數據修改,這實際也是一次寫入過程。不一樣的是,數據修改是對磁盤上的記錄作刪除標記。行存儲是在指定位置寫入一次,列存儲是將磁盤定位到多個列上分別寫入,這個過程還是行存儲的列數倍。因此,數據修改也是以行存儲佔優。
1)數據讀取時,行存儲一般將一行數據徹底讀出,若是隻須要其中幾列數據的狀況,就會存在冗餘列,出於縮短處理時間的考量,消除冗餘列的過程一般是在內存中進行的。
2)列存儲每次讀取的數據是集合的一段或者所有,不存在冗餘性問題。
3) 兩種存儲的數據分佈。因爲列存儲的每一列數據類型是同質的,不存在二義性問題。好比說某列數據類型爲整型(int),那麼它的數據集合必定是整型數據。這種狀況使數據解析變得十分容易。相比之下,行存儲則要複雜得多,由於在一行記錄中保存了多種類型的數據,數據解析須要在多種數據類型之間頻繁轉換,這個操做很消耗CPU,增長了解析的時間。因此,列存儲的解析過程更有利於分析大數據。
4)從數據的壓縮以及更性能的讀取來對比
顯而易見,兩種存儲格式都有各自的優缺點:
1)行存儲的寫入是一次性完成,消耗的時間比列存儲少,而且可以保證數據的完整性,缺點是數據讀取過程當中會產生冗餘數據,若是隻有少許數據,此影響能夠忽略;數量大可能會影響到數據的處理效率。
2)列存儲在寫入效率、保證數據完整性上都不如行存儲,它的優點是在讀取過程,不會產生冗餘數據,這對數據完整性要求不高的大數據處理領域,好比互聯網,猶爲重要。
兩種存儲格式各自的特性都決定了它們的使用場景。
1)通常來講,一個OLAP類型的查詢可能須要訪問幾百萬甚至幾十億個數據行,且該查詢每每只關心少數幾個數據列。例如,查詢今年銷量最高的前20個商品,這個查詢只關心三個數據列:時間(date)、商品(item)以及銷售量(sales amount)。商品的其餘數據列,例如商品URL、商品描述、商品所屬店鋪,等等,對這個查詢都是沒有意義的。
而列式數據庫只須要讀取存儲着「時間、商品、銷量」的數據列,而行式數據庫須要讀取全部的數據列。所以,列式數據庫大大地提升了OLAP大數據量查詢的效率
OLTP OnLine Transaction Processor 在線聯機事務處理系統(好比Mysql,Oracle等產品)
OLAP OnLine Analaysier Processor 在線聯機分析處理系統(好比Hive Hbase等)
2)不少列式數據庫還支持列族(column group,Bigtable系統中稱爲locality group),即將多個常常一塊兒訪問的數據列的各個值存放在一塊兒。若是讀取的數據列屬於相同的列族,列式數據庫能夠從相同的地方一次性讀取多個數據列的值,避免了多個數據列的合併。列族是一種行列混合存儲模式,這種模式可以同時知足OLTP和OLAP的查詢需求。
3)此外,因爲同一個數據列的數據重複度很高,所以,列式數據庫壓縮時有很大的優點。
例如,Google Bigtable列式數據庫對網頁庫壓縮能夠達到15倍以上的壓縮率。另外,能夠針對列式存儲作專門的索引優化。好比,性別列只有兩個值,「男」和「女」,能夠對這一列創建位圖索引:
以下圖所示
「男」對應的位圖爲100101,表示第一、四、6行值爲「男」
「女」對應的位圖爲011010,表示第二、三、5行值爲「女」
若是須要查找男性或者女性的個數,只須要統計相應的位圖中1出現的次數便可。另外,創建位圖索引後0和1的重複度高,能夠採用專門的編碼方式對其進行壓縮。
固然,若是每次查詢涉及的數據量較小或者大部分查詢都須要整行的數據,列式數據庫並不適用。
傳統行式數據庫的特性以下:
①數據是按行存儲的。
②沒有索引的查詢使用大量I/O。好比通常的數據庫表都會創建索引,經過索引加快查詢效率。
③創建索引和物化視圖須要花費大量的時間和資源。
④面對查詢需求,數據庫必須被大量膨脹才能知足需求。
列式數據庫的特性以下:
①數據按列存儲,即每一列單獨存放。
②數據即索引。
③只訪問查詢涉及的列,能夠大量下降系統I/O。
④每一列由一個線程來處理,即查詢的併發處理性能高。
⑤數據類型一致,數據特徵類似,能夠高效壓縮。好比有增量壓縮、前綴壓縮算法都是基於列存儲的類型定製的,因此能夠大幅度提升壓縮比,有利於存儲和網絡輸出數據帶寬的消耗。
SparkSql將RDD封裝成一個DataFrame對象,這個對象相似於關係型數據庫中的表。
DataFrame就至關於數據庫的一張表。它是個只讀的表,不能在運算過程再往裏加元素。
RDD.toDF(「列名」)
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 scala> rdd.toDF("id") res0: org.apache.spark.sql.DataFrame = [id: int] scala> res0.show#默認只顯示20條數據 +---+ | id| +---+ | 1| | 2| | 3| | 4| | 5| | 6| +---+ scala> res0.printSchema #查看列的類型等屬性 root |-- id: integer (nullable = true)
建立多列DataFrame對象
DataFrame就至關於數據庫的一張表。
scala> sc.parallelize(List( (1,"beijing"),(2,"shanghai") ) ) res3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:22 scala> res3.toDF("id","name") res4: org.apache.spark.sql.DataFrame = [id: int, name: string] scala> res4.show +---+--------+ | id| name| +---+--------+ | 1| beijing| | 2|shanghai| +---+--------+
例如3列的
scala> sc.parallelize(List( (1,"beijing",100780),(2,"shanghai",560090),(3,"xi'an",600329))) res6: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22 scala> res6.toDF("id","name","postcode") res7: org.apache.spark.sql.DataFrame = [id: int, name: string, postcode: int] scala> res7.show +---+--------+--------+ | id| name|postcode| +---+--------+--------+ | 1| beijing| 100780| | 2|shanghai| 560090| | 3| xi'an| 600329| +---+--------+--------+
能夠看出,須要構建幾列,tuple就有幾個內容。
txt文件不能直接轉換成,先利用RDD轉換爲tuple。而後toDF()轉換爲DataFrame。
scala> val rdd = sc.textFile("/root/words.txt") .map( x => (x,1) ) .reduceByKey( (x,y) => x+y ) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:21 scala> rdd.toDF("word","count") res9: org.apache.spark.sql.DataFrame = [word: string, count: int] scala> res9.show +------+-----+ | word|count| +------+-----+ | spark| 3| | hive| 1| |hadoop| 2| | big| 2| | scla| 1| | data| 1| +------+-----+
文件代碼:
{"id":1, "name":"leo", "age":18} {"id":2, "name":"jack", "age":19} {"id":3, "name":"marry", "age":17}
實現:
import org.apache.spark.sql.SQLContext scala>val sqc=new SQLContext(sc) scala> val tb4=sqc.read.json("/home/software/people.json") scala> tb4.show
格式以下:
Parquet是一種列式存儲格式,能夠被多種查詢引擎支持(Hive、Impala、Drill等),而且它是語言和平臺無關的。
Parquet文件下載後是否能夠直接讀取和修改呢?
Parquet文件是以二進制方式存儲的,是不能夠直接讀取和修改的。Parquet文件是自解析的,文件中包括該文件的數據和元數據。
列式存儲和行式存儲相比有哪些優點呢?
能夠只讀取須要的數據,下降IO數據量;
壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼進一步節約存儲空間。
參考連接:
http://blog.csdn.net/yu616568/article/details/51868447 講解了parquet文件格式
http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format 講解了parquet列式存儲。
實現:
scala>val tb5=sqc.read.parquet("/home/software/users.parquet") scala> tb5.show
實現步驟:
1)將mysql 的驅動jar上傳到spark的jars目錄下
2)重啓spark服務
3)進入spark客戶端
4)執行代碼,好比在Mysql數據庫下,有一個test庫,在test庫下有一張表爲tabx
執行代碼:
import org.apache.spark.sql.SQLContext scala> val sqc = new SQLContext(sc); scala> val prop = new java.util.Properties scala> prop.put("user","root") scala> prop.put("password","root") scala>val tabx=sqc.read.jdbc("jdbc:mysql://hadoop01:3306/test","tabx",prop) scala> tabx.show +---+----+ | id|name| +---+----+ | 1| aaa| | 2| bbb| | 3| ccc| | 1| ddd| | 2| eee| | 3| fff| +---+----+
注:若是報權限不足,則進入mysql,執行:
grant all privileges on *.* to 'root'@'hadoop01' identified by 'root' with grant option;
而後執行:
flush privileges;
上一篇:Spark Shuffle
下一篇:SparkSQL語法及API