Hive是基於Hadoop平臺的數據倉庫,最初由Facebook開發,在通過多年發展以後,已經成爲Hadoop事實上的SQL引擎標準。相較於其餘諸如Impala、Shark(SparkSQL的前身)等引擎而言,Hive擁有更爲普遍的用戶基礎以及對SQL語法更全面的支持。Hive最初的計算引擎爲MapReduce,受限於其自身的Map+Reduce計算模式,以及不夠充分的大內利用,MapReduce的性能難以獲得提高。
Hortonworks於2013年提出將Tez做爲另外一個計算引擎以提升Hive的性能。Spark則是最初由加州大學伯克利分校開發的分佈式計算引擎,藉助於其靈活的DAG執行模式、對內存的充分利用,以及RDD所能表達的豐富語義,Spark受到了Hadoop社區的普遍關注。在成爲Apache頂級項目以後,Spark更是集成了流處理、圖計算、機器學習等功能,是業界公認最具潛力的下一代通用計算框架。鑑於此,Hive社區於2014年推出了Hive on Spark項目(HIVE-7292),將Spark做爲繼MapReduce和Tez以後Hive的第三個計算引擎。該項目由Cloudera、Intel和MapR等幾家公司共同開發,並受到了來自Hive和Spark兩個社區的共同關注。目前Hive onSpark的功能開發已基本完成,並於2015年1月初合併回trunk,預計會在Hive下一個版本中發佈。本文將介紹Hive onSpark的設計架構,包括如何在Spark上執行Hive查詢,以及如何藉助Spark來提升Hive的性能等。另外本文還將介紹Hive onSpark的進度和計劃,以及初步的性能測試數據。
背景
Hive on Spark是由Cloudera發起,由Intel、MapR等公司共同參與的開源項目,其目的是把Spark做爲Hive的一個計算引擎,將Hive的查詢做爲Spark的任務提交到Spark集羣上進行計算。尚學堂陳老師提示經過該項目,能夠提升Hive查詢的性能,同時爲已經部署了Hive或者Spark的用戶提供了更加靈活的選擇,從而進一步提升Hive和Spark的普及率。
在介紹Hive on Spark的具體設計以前,先簡單介紹一下Hive的工做原理,以便於你們理解如何把Spark做爲新的計算引擎供給Hive使用。
在Hive中, 一條SQL語句從用戶提交到計算並返回結果,大體流程以下圖所示(Hive 0.14中引入了基於開銷的優化器(Cost BasedOptimizer,CBO),優化的流程會略有不一樣。
語法分析階段,Hive利用Antlr將用戶提交的SQL語句解析成一棵抽象語法樹(Abstract Syntax Tree,AST)。
生成邏輯計劃包括經過Metastore獲取相關的元數據,以及對AST進行語義分析。獲得的邏輯計劃爲一棵由Hive操做符組成的樹,Hive操做符即Hive對錶數據的處理邏輯,好比對錶進行掃描的TableScanOperator,對錶作Group的GroupByOperator等。
邏輯優化即對Operator Tree進行優化,與以後的物理優化的區別主要有兩點:一是在操做符級別進行調整;二是這些優化不針對特定的計算引擎。好比謂詞下推(Predicate Pushdown)就是一個邏輯優化:儘早的對底層數據進行過濾以減小後續須要處理的數據量,這對於不一樣的計算引擎都是有優化效果的。
生成物理計劃即針對不一樣的引擎,將Operator Tree劃分爲若干個Task,並按照依賴關係生成一棵Task的樹(在生成物理計劃以前,各計算引擎還能夠針對自身需求,對Operator Tree再進行一輪邏輯優化)。好比,對於MapReduce,一個GROUP BY+ORDER BY的查詢會被轉化成兩個MapReduce的Task,第一個進行Group,第二個進行排序。
物理優化則是各計算引擎根據自身的特色,對Task Tree進行優化。好比對於MapReduce,Runtime Skew Join的優化就是在原始的Join Task以後加入一個Conditional Task來處理可能出現傾斜的數據。
最後按照依賴關係,依次執行Task Tree中的各個Task,並將結果返回給用戶。每一個Task按照不一樣的實現,會把任務提交到不一樣的計算引擎上執行。
整體設計
Hive on Spark整體的設計思路是,儘量重用Hive邏輯層面的功能;從生成物理計劃開始,提供一整套針對Spark的實現,好比SparkCompiler、SparkTask等,這樣Hive的查詢就能夠做爲Spark的任務來執行了。如下是幾點主要的設計原則。
儘量減小對Hive原有代碼的修改。這是和以前的Shark設計思路最大的不一樣。Shark對Hive的改動太大以致於沒法被Hive社區接受,Hive on Spark儘量少改動Hive的代碼,從而不影響Hive目前對MapReduce和Tez的支持。同時,Hive on Spark保證對現有的MapReduce和Tez模式在功能和性能方面不會有任何影響。
對於選擇Spark的用戶,應使其可以自動的獲取Hive現有的和將來新增的功能。
儘量下降維護成本,保持對Spark依賴的鬆耦合。
基於以上思路和原則,具體的一些設計架構以下。
新的計算引擎
Hive的用戶能夠經過hive.execution.engine來設置計算引擎,目前該參數可選的值爲mr和tez。爲了實現Hive onSpark,咱們將spark做爲該參數的第三個選項。要開啓Hive on Spark模式,用戶僅需將這個參數設置爲spark便可。
以Hive的表做爲RDD
Spark以分佈式可靠數據集(Resilient Distributed Dataset,RDD)做爲其數據抽象,所以咱們須要將Hive的錶轉化爲RDD以便Spark處理。本質上,Hive的表和Spark的HadoopRDD都是HDFS上的一組文件,經過InputFormat和RecordReader讀取其中的數據,所以這個轉化是天然而然的。
使用Hive原語
這裏主要是指使用Hive的操做符對數據進行處理。Spark爲RDD提供了一系列的轉換(Transformation),其中有些轉換也是面向SQL的,如groupByKey、join等。但若是使用這些轉換(就如Shark所作的那樣),就意味着咱們要從新實現一些Hive已有的功能;並且當Hive增長新的功能時,咱們須要相應地修改Hive on Spark模式。有鑑於此,咱們選擇將Hive的操做符包裝爲Function,而後應用到RDD上。這樣,咱們只須要依賴較少的幾種RDD的轉換,而主要的計算邏輯仍由Hive提供。
因爲使用了Hive的原語,所以咱們須要顯式地調用一些Transformation來實現Shuffle的功能。下表中列舉了Hive onSpark使用的全部轉換。
架構