Hive默認使用的計算框架是MapReduce,在咱們使用Hive的時候經過寫SQL語句,Hive會自動將SQL語句轉化成MapReduce做業去執行,可是MapReduce的執行速度遠差與Spark。經過搭建一個Hive On Spark能夠修改Hive底層的計算引擎,將MapReduce替換成Spark,從而大幅度提高計算速度。接下來就如何搭建Hive On Spark展開描述。node
注:本人使用的是CDH5.9.1,使用的Spark版本是1.6.0,使用的集羣配置爲4個節點,每臺內存32+G,4 Core。框架
Yarn須要配置兩個參數:yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb。yarn.nodemanager.resource.cpu-vcores表明能夠爲container分配的CPU 內核的數量。yarn.nodemanager.resource.memory-mb表明可分配給容器的物理內存大小。性能
爲每一個服務分配一個core,爲操做系統預留2個core,剩餘的可用的core分配給yarn。我使用的集羣共有16個core,留出4個,剩餘的12個core分配給yarn。fetch
設置Yarn內存爲36G優化
給Yarn分配完資源後,須要配置一些Spark的參數,設置Spark可以使用的資源。包括executor和Driver的內存,分配executor和設置並行度。spa
在配置executor的內存大小的時候,須要考慮如下因素:操作系統
好比說,由於咱們有12個core可用,咱們能夠設置爲4,這樣12/4餘數爲0,設置爲5的話會剩餘兩個空閒。設置4個可以使得空閒的core儘量的少。orm
這樣配置以後咱們能夠同時運行三個executor,每一個executor最多能夠運行4個任務(每一個core一個)。blog
還有一點是要求spark.executor.memoryOverhead和spark.executor.memory的和不能超過yarn.scheduler.maximum-allocation-mb設置的值。個人scheduler請求最大內存分配的是12G。內存
Spark Driver端的配置以下:
Spark在Driver端的內存不會直接影響性能,可是在沒有足夠內存的狀況下在driver端強制運行Spark任務須要調整。
集羣的executor個數設置由集羣中每一個節點的executor個數和集羣的worker個數決定,若是集羣中有3個worker,則Hive On Spark可使用的executor最大個數是12個(3 * 4)。
Hive的性能受可用的executor的個數影響很明顯,通常狀況下,性能和executor的個數成正比,4個executor的性能大約是2個executor性能的一倍,可是性能在executor設置爲必定數量的時候會達到極值,達到這個極值以後再增長executor的個數不會增長性能,反而有可能會爲集羣增長負擔。
設置spark.executor.instances到最大值可使得Spark集羣發揮最大性能。可是這樣有個問題是當集羣有多個用戶運行Hive查詢時會有問題,應避免爲每一個用戶的會話分配固定數量的executor,由於executor分配後不能回其餘用戶的查詢使用,若是有空閒的executor,在生產環境中,計劃分配好executor能夠更充分的利用Spark集羣資源。
Spark容許動態的給Spark做業分配集羣資源,cloudera推薦開啓動態分配。
爲了更加充分的利用executor,必須同時容許足夠多的並行任務。在大多數狀況下,hive會自動決定並行度,可是有時候咱們可能會手動的調整並行度。在輸入端,map task的個數等於輸入端按照必定格式切分的生成的數目,Hive On Spark的輸入格式是CombineHiveInputFormat,能夠根據須要切分底層輸入格式。調整hive.exec.reducers.bytes.per.reducer控制每一個reducer處理多少數據。可是實際狀況下,Spark相比於MapReduce,對於指定的hive.exec.reducers.bytes.per.reducer不敏感。咱們須要足夠的任務讓可用的executor保持工做不空閒,當Hive可以生成足夠多的任務,儘量的利用空閒的executor。
Hive on Spark的配置大部分即便不使用Hive,也能夠對這些參數調優。可是hive.auto.convert.join.noconditionaltask.size這個參數是將普通的join轉化成map join的閾值,這個參數調優對於性能有很大影響。MapReduce和Spark均可以經過這個參數進行調優,可是這個參數在Hive On MR上的含義不一樣於Hive On Spark。
數據的大小由兩個統計量標識:
Hive On MapReduce使用的是totalSize,Spark使用rawDataSize。數據因爲通過一系列壓縮、序列化等操做,即便是相同的數據集,也會有很大的不一樣,對於Hive On Spark,須要設置 hive.auto.convert.join.noconditionaltask.size,將普通的join操做轉化成map join來提高性能,集羣資源充足的狀況下能夠把這個參數的值適當調大,來更多的觸發map join。可是設置過高的話,小表的數據會佔用過多的內存致使整個任務由於內存耗盡而失敗,全部這個參數須要根據集羣的資源來進行調整。
Cloudera推薦配置兩個額外的配置項:
hive.stats.fetch.column.stats=true
hive.optimize.index.filter=true
如下還整理了一些配置項用於hive調優:
hive.optimize.reducededuplication.min.reducer=4 hive.optimize.reducededuplication=true hive.merge.mapfiles=true hive.merge.mapredfiles=false hive.merge.smallfiles.avgsize=16000000 hive.merge.size.per.task=256000000 hive.merge.sparkfiles=true hive.auto.convert.join=true hive.auto.convert.join.noconditionaltask=true hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M) hive.optimize.bucketmapjoin.sortedmerge=false hive.map.aggr.hash.percentmemory=0.5 hive.map.aggr=true hive.optimize.sort.dynamic.partition=false hive.stats.autogather=true hive.stats.fetch.column.stats=true hive.compute.query.using.stats=true hive.limit.pushdown.memory.usage=0.4 (MR and Spark) hive.optimize.index.filter=true hive.exec.reducers.bytes.per.reducer=67108864 hive.smbjoin.cache.rows=10000 hive.fetch.task.conversion=more hive.fetch.task.conversion.threshold=1073741824 hive.optimize.ppd=true |
咱們使用Hive On Spark的時候,提交第一個查詢時,看到查詢結果可能會有比較長的延遲,可是再次運行相同的SQL查詢,完成速度要比第一個查詢快得多。
當Spark使用yarn管理資源調度時,Spark executor須要額外的時間來啓動和初始化,在程序運行以前,Spark不會等待全部的executor準備好以後運行,全部在任務提交到集羣以後,仍有一些executor處於啓動狀態。在Spark上運行的做業運行速度與executor個數相關,當可用的executor的個數沒有達到最大值的時候,做業達不到最大的並行性,全部Hive上提交的第一個SQL查詢會慢。
若是是在長時間會話這個應該問題影響很小,由於只有執行第一個SQL的時候會慢,問題不大,可是不少時候咱們寫的Hive腳本,須要用一些調度框架去啓動(如Oozie)。這時候咱們須要考慮進行優化。
爲了減小啓動時間,咱們能夠開啓container pre-warming機制,開啓後只有當任務請求的全部executor準備就緒,做業纔會開始運行。這樣會提高Spark做業的並行度。