SQL on Hadoop中用到的主要技術——MPP vs Runtime Framework

轉載聲明

本文轉載自盤點SQL on Hadoop中用到的主要技術,我的以爲該文章對於諸如Impala這樣的MPP架構的SQL引擎和Runtime Framework架構的Hive/Spark SQL進行對比,感受總結的特別好,而且和本人最近的公司相近,學習轉載之。node

自hive出現以後,通過幾年的發展,SQL on Hadoop相關的系統已經百花齊放,速度愈來愈快,功能也愈來愈齊全。本文並非要去比較所謂「交互式查詢哪家強」,而是試圖梳理出一個統一的視角,來看看各家系統有哪些技術上相通之處。mysql

考慮到系統使用的普遍程度與成熟度,在具體舉例時通常會拿Hive和Impala爲例,固然在調研的過程當中也會涉及到一些其餘系統,如Spark SQL,Presto,TAJO等。而對於hawq這樣的商業產品和apache drill這樣成熟度還不是很高的開源方案就不作過多瞭解了。git

系統架構

runtime framework v.s. mpp

在SQL on Hadoop系統中,有兩種架構,一種是基於某個運行時框架來構建查詢引擎,典型案例是Hive;另外一種是仿照過去關係數據庫的MPP架構。前者現有運行時框架,而後套上sql層,後者則是從頭打造一個一體化的查詢引擎。有時咱們能聽到一種聲音,說後者的架構優於前者,至少在性能上。那麼是否果然如此?github

通常來講,對於SQL on Hadoop系統很重要的一個評價指標就是:快。後面提到的全部內容也大可能是爲了查詢速度更快。在Hive逐漸普及以後,就逐漸有了所謂交互式查詢的需求,由於不管是BI系統,仍是adhoc,都不能按照離線那種節奏玩。這時候不管是有實力的大公司(好比Facebook),仍是專業的供應商(好比Cloudera),都試圖去解決這個問題。短時間能夠靠商業方案或者關係數據庫去支撐一下,可是長遠的解決方案就是參考過去的MPP數據庫架構打造一個專門的系統,因而就有了Impala,Presto等等。從任務執行的角度說,這類引擎的任務執行其實跟DAG模型是相似的,當時也有Spark這個DAG模型的計算框架了,但這終究是別人家的孩子,並且往Spark上套sql又是Hive的那種玩法了。因而在Impala問世以後就強調本身「計算所有在內存中完成」,性能也是各類碾壓當時還只有MR做爲計算模型的Hive。那麼Hive所表明的「基於已有的計算模型」方式是否真的不行?算法

不能否認,按照這種方式去比較,那麼類MPP模式確實有不少優點:sql

  • DAG v.s. MR:最主要的優點,中間結果不寫磁盤(除非內存不夠),一鼓作氣。
  • 流水線計算:上游stage一出結果立刻推送或者拉到下一個stage處理,好比多表join時前兩個表有結果直接給第三個表,不像MR要等兩個表徹底join完再給第三個表join。
  • 高效的IO:本地查詢沒有多餘的消耗,充分利用磁盤。這個後面細說。
  • 線程級別的併發:相比之下MR每一個task要啓動JVM,自己就有很大延遲,佔用資源也多。

固然MPP模式也有其劣勢,一個是擴展性不是很高,這在關係數據庫時代就已經有過結論;另外一個是容錯性差,對於Impala來講一旦運行過程當中出點問題,整個查詢就掛了。數據庫

可是,通過不斷的發展,Hive也能跑在DAG框架上了,不只有Tez,還有Spark。上面提到的一些劣勢,其實大都也能夠在計算模型中解決,只不過考慮到計算模型的通用性和自己的設計目標,不會去專門知足(因此若是從這個角度分類,Impala屬於「專用系統」,Spark則屬於「通用系統」)。在最近Cloudera作的benchmark中,雖然Impala仍然一路領先,可是基於Spark的Spark SQL徹底不遜色於Presto,基於Tez的Hive也不算不好,至少在多用戶併發模式下能超過Presto,足見MPP模式並非絕對佔上風的。因此這種架構上的區別在我看來並非制勝的關鍵,至少不是惟一的因素,真正要作到快速查詢,各個方面的細節都要有所把握。後面說的都是這些細節。express

核心組件

無論是上面提到的那種架構,一個SQL on Hadoop系統通常都會有一些通用的核心組件,這些組件根據設計者的考慮放在不一樣的節點角色中,在物理上節點都按照master/worker的方式去作,若是master壓力太大,一些原本適合放在master上的組件能夠放到一個輔助master上。apache

  • UI層負責提供用戶輸入查詢的接口。通常有Web/GUI,命令行,編程方式3類。
  • QL層負責把用戶提交的查詢解析成能夠運行的執行計劃(好比MR Job)。這部分在後面會專門提到。
  • 執行層就是運行具體的Job。通常會有一個master負責query的運行管理,好比申請資源,觀察進度等等,同時master也負責最終聚合局部結果到全局結果。而每一個節點上會有相應的worker作本地計算。
  • IO層提供與存儲層交互的接口。對於HDFS來講,須要根據I/O Format把文件轉換成K/V,Serde再完成K/V到數據行的映射。對於非HDFS存儲來講就須要一些專門的handler/connector。
  • 存儲層通常是HDFS,但也有能夠查詢NoSQL,或者關係數據庫的。
  • 系統另外還須要一個元數據管理服務,管理表結構等。

這裏寫圖片描述

執行計劃

編譯流程

從SQL到執行計劃,大體分爲5步。編程

  • 第一步將SQL轉換成抽象語法樹AST。這一步通常都有第三方工具庫能夠完成,好比antlr。
  • 第二步對AST進行語義分析,好比表是否存在,字段是否存在,SQL語義是否有誤(好比select中被斷定爲聚合的字段在group by中有沒有出現)。
  • 第三步生成邏輯執行計劃,這是一個由邏輯操做符組成的DAG。好比對於Hive來講掃表會產生TableScanOperator,聚合會產生GroupByOperator。對於類MPP系統來講,狀況稍微有點不一樣。邏輯操做符的種類仍是差很少,可是會先生成單機版本,而後生成多機版本。多機版本主要是把aggregate,join,還有top n這幾個操做並行化,好比aggregate會分紅相似MR那樣的本地aggregate,shuffle和全局aggregate三步。
  • 第四步作邏輯執行計劃作優化,這步在下面單獨介紹。
  • 第五步把邏輯執行計劃轉換成能夠在機器上運行的物理計劃。對於Hive來講,就是MR/Tez Job等;對於Impala來講,就是plan fragment。其餘類MPP系統也是相似的概念。物理計劃中的一個計算單元(或者說Job),有「輸入,處理,輸出」三要素組成,而邏輯執行計劃中的operator相對粒度更細,一個邏輯操做符通常處於這三要素之一的角色。

下面分別舉兩個例子,直觀的認識下sql、邏輯計劃、物理計劃之間的關係,具體解釋各個operator的話會比較細碎,就不展開了。

Hive on MR

select count(1) from status_updates where ds = '2009-08-01'
  • 1
  • 2

這裏寫圖片描述

Presto

引用自美團技術團隊,其中SubPlan就是物理計劃的一個計算單元

select c1.rank, count(*) 
from dim.city c1 join dim.city c2 on c1.id = c2.id 
where c1.id > 10 group by c1.rank limit 10;
  • 1
  • 2
  • 3
  • 4

這裏寫圖片描述

優化器

關於執行計劃的優化,雖然不必定是整個編譯流程中最難的部分,但倒是最有看點的部分,並且目前還在不斷髮展中。Spark系之因此放棄Shark另起爐竈作Spark SQL,很大一部分緣由是想本身作優化策略,避免受Hive的限制,爲此還專門獨立出優化器組件Catalyst(固然Spark SQL目前仍是很是新,其將來發展給人很多想象空間)。總之這部分工做能夠不斷的創新,優化器越智能,越傻瓜化,用戶就越能解放出來解決業務問題。

早期在Hive中只有一些簡單的規則優化,好比謂詞下推(把過濾條件儘量的放在table scan以後就完成),操做合併(連續的filter用and合併成一個operator,連續的projection也能夠合併)。後來逐漸增長了一些略複雜的規則,好比相同key的join + group by合併爲1個MR,還有star schema join。在Hive 0.12引入的相關性優化(correlation optimizer)算是規則優化的一個高峯,他可以減小數據的重複掃描,具體來講,若是查詢的兩個部分用到了相同的數據,而且各自作group by / join的時候用到了相同的key,這個時候因爲數據源和shuffle的key是同樣的,因此能夠把原來須要兩個job分別處理的地方合成一個job處理。 
好比下面這個sql:

SELECT 
    sum(l_extendedprice) / 7.0 as avg_yearly 
FROM 
     (SELECT l_partkey, l_quantity, l_extendedprice 
      FROM lineitem JOIN part ON (p_partkey=l_partkey) 
      WHERE p_brand='Brand#35' AND p_container = 'MED PKG')touter 
JOIN 
     (SELECT l_partkey as lp, 0.2 * avg(l_quantity) as lq 
      FROM lineitem GROUP BY l_partkey) tinner 
ON (touter.l_partkey = tinnter.lp) 
WHERE touter.l_quantity < tinner.lq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

這個查詢中兩次出現lineitem表,group by和兩處join用的都是l_partkey,因此原本兩個子查詢和一個join用到三個job,如今只須要用到一個job就能夠完成。

這裏寫圖片描述

可是,基於規則的優化(RBO)不能解決全部問題。在關係數據庫中早有另外一種優化方式,也就是基於代價的優化CBO。CBO經過收集表的數據信息(好比字段的基數,數據分佈直方圖等等)來對一些問題做出解答,其中最主要的問題就是肯定多表join的順序。CBO經過搜索join順序的全部解空間(表太多的狀況下能夠用有限深度的貪婪算法),而且算出對應的代價,能夠找到最好的順序。這些都已經在關係數據庫中獲得了實踐。

目前Hive已經啓動專門的項目,也就是Apache Optiq來作這個事情,而其餘系統也沒有作的很好的CBO,因此這塊內容還有很大的進步空間。

執行效率

即便有了高效的執行計劃,若是在運行過程自己效率較低,那麼再好的執行計劃也會大打折扣。這裏主要關注CPU和IO方面的執行效率。

CPU

在具體的計算執行過程當中,低效的cpu會致使系統的瓶頸落在CPU上,致使IO沒法充分利用。在一項針對Impala和Hive的對比時發現,Hive在某些簡單查詢上(TPC-H Query 1)也比Impala慢主要是由於Hive運行時徹底處於CPU bound的狀態中,磁盤IO只有20%,而Impala的IO至少在85%。

在SQL on Hadoop中出現CPU bound的主要緣由有如下幾種:

  • 大量虛函數調用:這個問題在多處出現,好比對於a + 2 * b之類的表達式計算,解釋器會構造一個expression tree,解釋的過程就是遞歸調用子節點作evaluation的過程。又好比以DAG形式的operator/task在執行的過程當中,上游節點會層層調用下游節點來獲取產生的數據。這些都會產生大量的調用。
  • 類型裝箱:因爲表達式解釋器須要對不一樣數據類型的變量作解釋,因此在Java中須要把這些原本是primitive的變量包裝成Object,累積起來也消耗很多資源。這算是上面一個問題附帶出來的。
  • branch instruction: 如今的CPU都是有並行流水線的,可是若是出現條件判斷會致使沒法並行。這種狀況可能出如今判斷數據的類型(是string仍是int),或者在判斷某一列是否由於其餘字段的過濾條件致使本行不須要被讀取(列存儲狀況下)。
  • cache miss:每次處理一行數據的方式致使cpu cache命中率不高。(這麼說已經暗示瞭解決方案)

針對上面的問題,目前大多數系統中已經加入瞭如下兩個解決辦法中至少一個。

一個方法是動態代碼生成,也就是不使用解釋性的統一代碼。好比a + 2 * b這個表達式就會生成對應的執行語言的代碼,並且能夠直接用primitive type,而不是用固定的解釋性代碼。具體實現來講,JVM系的如Spark SQL,Presto能夠用反射,C++系的Impala則使用了llvm生成中間碼。對於判斷數據類型形成的分支判斷,動態代碼的效果能夠消除這些類型判斷,還能夠展開循環,能夠對比下面這段代碼,左邊是解釋性代碼,右邊是動態生成代碼。

這裏寫圖片描述

另外一個方法是vectorization(向量化),基本思路是放棄每次處理一行的模式,改用每次處理一小批數據(好比1k行),固然前提條件是使用列存儲格式。這樣一來,這一小批連續的數據能夠放進cache裏面,cpu不只減小了branch instruction,甚至能夠用SIMD加快處理速度。具體的實現參考下面的代碼,對一個long型的字段增長一個常量。經過把數據表示成數組,過濾條件也用selVec裝進數組,造成了很緊湊的循環:

add(int vecNum, long[] result, long[] col1, int[] col2, int[] selVec) 
{   
  if (selVec == null)   
     for (int i = 0; i < vecNum; i++) 
         result[i] = col1[i] + col2[i];
  else 
     for (int i = 0; i < vecNum; i++) 
     {
         int selIdx = selVec[i];
         result[selIdx] = col1[selIdx] + col2[selIdx];
     }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

IO

因爲SQL on Hadoop存儲數據都是在HDFS上,因此IO層的優化其實大多數都是HDFS的事情,各大查詢引擎則提出需求去進行推進。要作到高效IO,一方面要低延遲,屏蔽沒必要要的消耗;另外一方面要高吞吐,充分利用每一塊磁盤。目前與這方面有關的特性有:

  • short-circuit local reads:當發現讀取的數據是本地數據時,不走DataNode(由於要走一次socket鏈接),而是用DFS Client直接讀本地的block replica。HDFS參數是dfs.client.read.shortcircuit和dfs.domain.socket.path。
  • zero copy:避免數據在內核buffer和用戶buffer之間反覆copy,在早期的HDFS中已經有這個默認實現。
  • disk-aware scheduling:經過知道每一個block所在磁盤,能夠在調度cpu資源時讓不一樣的cpu讀不一樣的磁盤,避免查詢內和查詢間的IO競爭。HDFS參數是dfs.datanode.hdfs-blocks-metadata.enabled。

存儲格式

對於分析類型的workload來講,最好的存儲格式天然是列存儲,這已經在關係數據庫時代獲得了證實。目前hadoop生態中有兩大列存儲格式,一個是由Hortonworks和Microsoft開發的ORCFile,另外一個是由Cloudera和Twitter開發的Parquet。

ORCFile顧名思義,是在RCFile的基礎之上改造的。RCFile雖然號稱列存儲,可是隻是「按列存儲」而已,將數據先劃分紅row group,而後row group內部按照列進行存儲。這其中沒有列存儲的一些關鍵特性,而這些特性在之前的列式數據庫中(好比我之前用過的Infobright)早已用到。好在ORCFile已經彌補了這些特性,包括:

  • 塊過濾與塊統計:每一列按照固定行數或大小進一步切分,對於切分出來的每個數據單元,預先計算好這些單元的min/max/sum/count/null值,min/max用於在過濾數據的時候直接跳過數據單元,而全部這些統計值則能夠在作聚合操做的時候直接採用,而沒必要解開這個數據單元作進一步的計算。
  • 更高效的編碼方式:RCFile中沒有標註每一列的類型,事實上當知道數據類型時,能夠採起特定的編碼方式,自己就能很大程度上進行數據的壓縮。常見的針對列存儲的編碼方式有RLE(大量重複數據),字典(字符串),位圖(數字且基數不大),級差(排序過的數據,好比日誌中用戶訪問時間)等等。

ORCFile的結構以下圖,數據先按照默認256M分爲row group,也叫strip。每一個strip配一個index,存放每一個數據單元(默認10000行)的min/max值用於過濾;數據按照上面提到的編碼方式序列化成stream,而後再進行snappy或gz壓縮。footer提供讀取stream的位置信息,以及更多的統計值如sum/count等。尾部的file footer和post script提供全局信息,如每一個strip的行數,各列數據類型,壓縮參數等。

這裏寫圖片描述

Parquet的設計原理跟ORC相似,不過它有兩個特色:

  • 通用性:相比ORCFile專門給Hive使用而言,Parquet不只僅是給Impala使用,還能夠給其餘查詢工具使用,如Hive、Pig,進一步還能對接avro/thrift/pb等序列化格式。
  • 基於Dremel思想的嵌套格式存儲:關係數據庫設計模式中反對存儲複雜格式(違反第一範式),可是如今的大數據計算不只出現了這種需求(半結構化數據),也可以高效的實現存儲和查詢效率,在語法上也有相應的支持(各類UDF,Hive的lateral view等)。Google Dremel就在實現層面作出了範例,Parquet則徹底仿照了Dremel。

對嵌套格式作列存儲的難點在於,存儲時須要標記某個數據對應於哪個存儲結構,或者說是哪條記錄,因此須要用數據清楚的進行標記。 在Dremel中提出用definition level和repetition level來進行標記。definition level指的是,這條記錄在嵌套結構中所處於第幾層,而repetition level指的是,這條記錄相對上一條記錄,在第幾層重複。好比下圖是一個二級嵌套數組。圖中的e跟f在都屬於第二層的重複記錄(同一個level2),因此f的r值爲2,而c跟d則是不一樣的level2,但屬於同一個level1,因此d的r值爲1。對於頂層而言(新的一個嵌套結構),r值就爲0。

這裏寫圖片描述

可是僅僅這樣還不夠。上圖說明了r值的做用,可是尚未說明d值的做用,由於按照字面解釋,d值對於每個字段都是能夠根據schema獲得的,那爲何還要從行記錄級別標記?這是由於記錄中會插入一些null值,這些null值表明着他們「能夠存在」可是由於是repeated或者是optional因此沒有值的狀況,null值是用來佔位的(或者說是「想象」出來的),因此他們的值須要單獨計算。null的d值就是說這個結構往上追溯到哪一層(不包括平級)就不是null(不是想象)了。在dremel paper中有完整的例子,例子中country的第一個null在code = en所在的結構裏面,那麼language不是null(不考慮code,他跟country平級),他就是第二層;又好比country的第二個null在url = http://B 所在的結構裏面,那麼name不是null(不考慮url,由於他跟原本就是null的language平級),因此就是第一層。

這裏寫圖片描述

這裏寫圖片描述

經過這種方式,就對一個樹狀的嵌套格式完成了存儲。在讀取的時候能夠經過構造一個狀態機進行遍歷。

有意思的是,雖然parquet支持嵌套格式,可是Impala尚未來得及像Hive那樣增長array,map,struct等複雜格式,固然這項功能已經被列入roadmap了,相信不久就會出現。

在最近咱們作的Impala2.0測試中,順便測試了存儲格式的影響。parquet相比sequencefile在壓縮比上達到1:5,查詢性能也相差5-10倍,足見列存儲一項就給查詢引擎帶來的提高。

資源控制

運行時資源調整

對於一個MR Job,reduce task的數量一直是須要人爲估算的一個麻煩事,基於MR的Hive也只是根據數據源大小粗略的作估計,不考慮具體的Job邏輯。可是在以後的框架中考慮到了這個狀況,增長了運行時調整資源分配的功能。Tez中引入了vertex manager,能夠根據運行時收集到的數據智能的判斷reduce動做須要的task。相似的功能在TAJO中也有提到,叫progressive query optimization,並且TAJO不只能作到動態調整task數量,還能動態調整join順序。

資源集成

在Hadoop已經進入2.x的時代,全部想要獲得普遍應用的SQL on Hadoop系統勢必要能與YARN進行集成。雖然這是一個有利於資源合理利用的好事,可是因爲加入了YARN這一層,卻給系統的性能帶來了必定的障礙,由於啓動AppMaster和申請container也會佔用很多時間,尤爲是前者,並且container的供應若是時斷時續,那麼會極大的影響時效性。在Tez和Impala中對這些問題給出了相應的解決辦法:

  • AppMaster啓動延遲的問題,採起long lived app master,AppMaster啓動後長期駐守,而非像是MR那樣one AM per Job。具體實現時,能夠給fair scheduler或capacity scheduler配置的每一個隊列配上一個AM池,有必定量的AM爲提交給這個隊列的任務服務。
  • container供應的問題,在Tez中採起了container複用的方式,有點像jvm複用,即container用完之後不立刻釋放,等一段時間,實在是沒合適的task來接班了再釋放,這樣不只減小container斷供的可能,並且能夠把上一個task留下的結果cache住給下一個task複用,好比作map join;Impala則採起比較激進的方式,一次性等全部的container分配到位了纔開始執行查詢,這種方式也能讓它的流水線式的計算不至於阻塞。

其餘

到這裏爲止,已經從上到下順了一遍各個層面用到的技術,固然SQL on Hadoop自己就至關複雜,涉及到方方面面,時間精力有限不可能一一去琢磨。好比其餘一些具備技術複雜度的功能有:

  • 多數據源查詢:Presto支持從mysql,cassandra,甚至kafka中去讀取數據,這就大大減小了數據整合時間,不須要放到HDFS裏才能查詢。Impala和Hive也支持查詢hbase。Spark SQL也在1.2版本開始支持External Datasource。國內也有相似的工做,如秒針改造Impala使之能查詢postgres。
  • 近似查詢:count distinct(基數估計)一直是sql性能殺手之一,若是能接受必定偏差的話能夠採用近似算法。Impala中已經實現了近似算法(ndv),Presto則是請blinkDB合做完成。二者都是採用了HyperLogLog Counting。固然,不只僅是count distinct可使用近似算法,其餘的如取中位數之類的也能夠用。

結束語

儘管如今相關係統已經不少,也通過了幾年的發展,可是目前各家系統仍然在不斷的進行完善,好比:

  • 增長分析函數,複雜數據類型,SQL語法集的擴展。
  • 對於已經成形的技術也在不斷的改進,如列存儲還能夠增長更多的encoding方式。
  • 甚至對於像CBO這樣的領域,開源界拿出來的東西還算是剛剛起步,相比HAWQ中的ORCA這種商業系統提供的優化器還差的不少。

畢竟相比已經比較成熟的關係數據庫,分佈式環境下須要解決的問題更多,將來必定還會出現不少精彩的技術實踐,讓咱們在海量數據中更快更方便的查到想要的數據。

相關文章
相關標籤/搜索