大數據(Spark-S3-SparkSQL架構及原理)

Spark SQL的發展


HDFS -> HIVE 
因爲Hadoop在企業生產中的大量使用,HDFS上積累了大量數據,爲了給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生。Hive的原理是將SQL語句翻譯成MapReduce計算。java

HIVE -> SHARK
MapReduce計算過程當中大量的中間磁盤落地過程消耗了大量的I/O,下降了運行效率,爲了提供SQL-on-Hadoop的效率,Shark出現了。mysql

Shark是伯克利AMPLab實驗室Spark生態環境的組件之一,它修改了Hive中的內存管理、物理計劃和執行三個模塊,使得SQL語句直接運行在Spark上,從而使得SQL查詢的速度獲得10-100倍的提高。
sql

SHARK -> SPARK SQL數據庫

2014年6月1日,Shark項目和SparkSQL項目的主持人Reynold Xin宣佈:中止對Shark的開發,團隊將全部資源放sparkSQL項目上,至此,Shark的發展畫上了句號。
express

隨着Spark的發展,Shark對於Hive的太多依賴制約了Spark的One Stack rule them all的方針,制約了Spark各個組件的相互集成,同時Shark也沒法利用Spark的特性進行深度優化,因此決定放棄Shark,提出了SparkSQL項目。編程

隨着Shark的結束,兩個新的項目應運而生:SparkSQL和Hive on Spark。其中SparkSQL做爲Spark生態的一員繼續發展,而再也不受限於Hive,只是兼容Hive;而Hive on Spark是一個Hive的發展計劃,該計劃將Spark做爲Hive的底層引擎之一,也就是說,Hive將再也不受限於一個引擎,能夠採用Map-Reduce、Tez、Spark等引擎。json

SparkSQL優點數組

SparkSQL擺脫了對Hive的依賴性,不管在數據兼容、性能優化、組件擴展方面都獲得了極大的方便。
一、數據兼容方面  
不但兼容Hive,還能夠從RDD、parquet文件、JSON文件中獲取數據,將來版本甚至支持獲取RDBMS數據以及cassandra等NOSQL數據;性能優化

二、性能優化方面  
除了採起In-Memory Columnar Storage、byte-code generation等優化技術外、將會引進Cost Model對查詢進行動態評估、獲取最佳物理計劃等等;
    
三、組件擴展方面  
不管是SQL的語法解析器、分析器仍是優化器均可以從新定義,進行擴展;架構

 

Spark SQL的性能優化

內存列存儲(In-Memory Columnar Storage)
對於內存列存儲來講,將全部原生數據類型的列採用原生數組來存儲,將Hive支持的複雜數據類型(如array、map等)先序列化後並接成一個字節數組來存儲。

這樣,每一個列建立一個JVM對象,從而致使能夠快速地GC和緊湊的數據存儲。

額外的,還能夠用低廉CPU開銷的高效壓縮方法來下降內存開銷。

更有趣的是,對於分析查詢中頻繁使用的聚合特定列,性能會獲得很大的提升,緣由就是這些列的數據放在一塊兒,更容易讀入內存進行計算。

字節碼生成技術(bytecode generation,即CG)

在數據庫查詢中有個昂貴的操做就是查詢語句中的表達式,主要是由JVM的內存模型引發的。如SELECT a+b FROM table,這個查詢裏若是採用通用的SQL語法途徑去處理,會先生成一個表達樹,會屢次設計虛函數的調用,這會打斷CPU的正常流水線處理,減緩執行速度。
    
spark -1.1.0在catalyst模塊的expressions增長了codegen模塊,若是使用動態字節碼生成技術,Spark SQL在執行物理計劃時,會對匹配的表達式採用特定的代碼動態編譯,而後運行。

Scala代碼的優化

Spark SQL在使用Scala語言編寫代碼時,應儘可能避免容易GC的低效代碼。儘管增長了編寫代碼的難度,但對於用戶來講,仍是使用了統一的接口,讓開發在使用上更加容易。

Spark SQL的運行架構

SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext如今只支持SQL語法解析器;hiveContext如今支持SQL語法解析器和hivesql語法解析器,默認爲hiveSQL語法解析器,用戶能夠經過配置切換成SQL語法解析器,來運行hiveSQL不支持的語法。

Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分構成:
1.Core: 負責處理數據的輸入和輸出,如獲取數據,查詢結果輸出成DataFrame等
2.Catalyst: 負責處理整個查詢過程,包括解析、綁定、優化等
3.Hive: 負責對Hive數據進行處理
4.Hive-ThriftServer: 主要用於對hive的訪問

SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext如今只支持SQL語法解析器;hiveContext如今支持SQL語法解析器和hivesql語法解析器,默認爲hiveSQL語法解析器,用戶能夠經過配置切換成SQL語法解析器,來運行hiveSQL不支持的語法。

Spark SQL語句的執行順序
1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROM、WHERE),哪些是表達式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規範;
2.將SQL語句和數據庫的數據字典(列、表、視圖等等)進行綁定(Bind),若是相關的Projection、Data Source等都是存在的話,就表示這個SQL語句是能夠執行的;
3.通常的數據庫會提供幾個執行計劃,這些計劃通常都有運行統計數據,數據庫會在這些計劃中選擇一個最優計劃(Optimize);
4.計劃執行(Execute),按Operation-->Data Source-->Result的次序來進行的,在執行過程有時候甚至不須要讀取物理表就能夠返回結果,好比從新運行剛運

 

Spark SQL的運行原理

1,使用SessionCatalog保存元數據
在解析SQL語句以前,會建立SparkSession,或者若是是2.0以前的版本初始化SQLContext,SparkSession只是封裝了SparkContext和SQLContext的建立而已。會把元數據保存在SessionCatalog中,涉及到表名,字段名稱和字段類型。建立臨時表或者視圖,其實就會往SessionCatalog註冊。

2,解析SQL使用ANTLR生成未綁定的邏輯計劃
當調用SparkSession的SQL或者SQLContext的SQL方法,咱們以2.0爲準,就會使用SparkSqlParser進行解析SQL。使用的ANTLR進行詞法解析和語法解析。它分爲2個步驟來生成Unresolved LogicalPlan:
詞法分析:Lexical Analysis, 負責將token分組成符號類。
構建一個分析樹或者語法樹AST。

3,使用分析器Analyzer綁定邏輯計劃
在該階段,Analyzer會使用Analyzer Rules,並結合SessionCatalog,對未綁定的邏輯計劃進行解析,生成已綁定的邏輯計劃。

4,使用優化器Optimizer優化邏輯計劃
優化器也是會定義一套Rules,利用這些Rule對邏輯計劃和Exepression進行迭代處理,從而使得樹的節點進行合併和優化。

5,使用SparkPlanner生成物理計劃
SparkSpanner使用Planning Strategies,對優化後的邏輯計劃進行轉換,生成能夠執行的物理計劃SparkPlan.

6,使用QueryExecution執行物理計劃
此時調用SparkPlan的execute方法,底層其實已經再觸發JOB了,而後返回RDD。

 

Spark SQL的運行架構

TreeNode
邏輯計劃、表達式等均可以用tree來表示,它只是在內存中維護,並不會進行磁盤的持久化,分析器和優化器對樹的修改只是替換已有節點。

TreeNode有2個直接子類,QueryPlan和Expression。QueryPlan下又有LogicalPlan和SparkPlan. Expression是表達式體系,不須要執行引擎計算而是能夠直接處理或者計算的節點,包括投影操做,操做符運算等

Rule & RuleExecutor
Rule就是指對邏輯計劃要應用的規則,以到達綁定和優化。他的實現類就是RuleExecutor。優化器和分析器都須要繼承RuleExecutor。

每個子類中都會定義Batch、Once、FixPoint. 其中每個Batch表明着一套規則,Once表示對樹進行一次操做,FixPoint表示對樹進行屢次的迭代操做。

RuleExecutor內部提供一個Seq[Batch]屬性,裏面定義的是RuleExecutor的處理邏輯,具體的處理邏輯由具體的Rule子類實現。
 

Catalyst優化器

SparkSQL1.1整體上由四個模塊組成:core、catalyst、hive、hive-Thriftserver:
    core處理數據的輸入輸出,從不一樣的數據源獲取數據(RDD、Parquet、json等),將查詢結果輸出成schemaRDD;
    catalyst處理查詢語句的整個處理過程,包括解析、綁定、優化、物理計劃等,說其是優化器,還不如說是查詢引擎;
    hive對hive數據的處理
    hive-ThriftServer提供CLI和JDBC/ODBC接口
在這四個模塊中,catalyst處於最核心的部分,其性能優劣將影響總體的性能。因爲發展時間尚短,還有不少不足的地方,但其插件式的設計,爲將來的發展留下了很大的空間。

從上圖看,catalyst主要的實現組件有:
    1.sqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器;
    2.Analyzer,主要完成綁定工做,將不一樣來源的Unresolved LogicalPlan和數據元數據(如hive metastore、Schema catalog)進行綁定,生成resolved LogicalPlan;
    3.optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan;
    4.Planner將LogicalPlan轉換成PhysicalPlan;、
    5.CostModel,主要根據過去的性能統計數據,選擇最佳的物理執行計劃

這些組件的基本實現方法:
    1.先將sql語句經過解析生成Tree,而後在不一樣階段使用不一樣的Rule應用到Tree上,經過轉換完成各個組件的功能。
    2.Analyzer使用Analysis Rules,配合數據元數據(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉換成resolved LogicalPlan;
    3.optimizer使用Optimization Rules,對resolved LogicalPlan進行合併、列裁剪、過濾器下推等優化做業而轉換成optimized LogicalPlan;
    4.Planner使用Planning Strategies,對optimized LogicalPlan

 

執行優化

爲了說明查詢優化,咱們來看下圖展現的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join以後又作了一次filter操做。若是原封不動地執行這個執行計劃,最終的執行效率是不高的。由於join是一個代價較大的操做,也可能會產生一個較大的數據集。若是咱們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,即可以有效縮短執行時間。而Spark SQL的查詢優化器正是這樣作的。簡而言之,邏輯查詢計劃優化就是一個利用基於關係代數的等價變換,將高成本的操做替換爲低成本操做的過程。

獲得的優化執行計劃在轉換成物 理執行計劃的過程當中,還能夠根據具體的數據源的特性將過濾條件下推至數據源內。最右側的物理執行計劃中Filter之因此消失不見,就是由於溶入了用於執行最終的讀取操做的表掃描節點內。

 

DataFrame初探

在Spark中,DataFrame是一種以RDD爲基礎的分佈式數據集,相似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背後的數據源以及做用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提高運行時效率的目標。反觀RDD,因爲無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

DataFrame的特性

可以將單個節點集羣上的大小爲Kilobytes到Petabytes的數據處理爲大型集羣。
支持不一樣的數據格式(Avro,csv,彈性搜索和Cassandra)和存儲系統(HDFS,HIVE表,mysql等)。
經過Spark SQL Catalyst優化器(樹變換框架)的最早進的優化和代碼生成。
能夠經過Spark-Core輕鬆地與全部大數據工具和框架集成。
提供用於Python,Java,Scala和R編程的API。

建立DataFrame
在Spark SQL中,開發者能夠很是便捷地將各類內、外部的單機、分佈式數據轉換爲DataFrame。

# 從Hive中的users表構造
DataFrame users = sqlContext.table("users") 
# 加載S3上的JSON文件 
logs = sqlContext.load("s3n://path/to/data.json", "json") 
# 加載HDFS上的Parquet文件 
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet") 
# 經過JDBC訪問MySQL comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user") 
# 將普通RDD轉變爲

DataFrame rdd = sparkContext.textFile("article.txt") \ 
                   .flatMap(lambda line: line.split()) \ 
                   .map(lambda word: (word, 1)) \ 
                   .reduceByKey(lambda a, b: a + b) \ 
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"]) 

# 將本地數據容器轉變爲

DataFrame data = [("Alice", 21), ("Bob", 24)] 
people = sqlContext.createDataFrame(data, ["name", "age"]) 

使用DataFrame

和R、Pandas相似,Spark DataFrame也提供了一整套用於操縱數據的DSL。這些DSL在語義上與SQL關係查詢很是相近(這也是Spark SQL可以爲DataFrame提供無縫支持的重要緣由之一) 。

# 建立一個只包含"年輕"用戶的DataFrame
df = users.filter(users.age < 21)

# 也可使用Pandas風格的語法
df = users[users.age < 21]

# 將全部人的年齡加1
df.select(young.name, young.age + 1)

# 統計年輕用戶中各性別人數
df.groupBy("gender").count()

# 將全部年輕用戶與另外一個名爲logs的DataFrame聯接起來
df.join(logs, logs.userId == users.userId, "left_outer")

保存DataFrame
當數據分析邏輯編寫完畢後,咱們即可以將最終結果保存下來或展示出來。

# 追加至HDFS上的Parquet文件

df.save(path="hdfs://path/to/data.parquet",
           source="parquet",
           mode="append")

# 覆寫S3上的JSON文件

df.save(path="s3n://path/to/data.json",
           source="json",
           mode="append")

# 保存爲SQL表
df.saveAsTable(tableName="young", source="parquet" mode="overwrite")

# 轉換爲Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()

# 以表格形式打印輸出 df.show()  

相關文章
相關標籤/搜索