一、數據量大不是問題,數據傾斜是個問題node
二、jobs 數比較多的做業運行效率相對比較低,好比即便有幾百行的表,若是屢次關聯屢次 彙總,產生十幾個 jobs,耗時很長。緣由是 map reduce 做業初始化的時間是比較長的算法
三、sum,count,max,min 等 UDAF,不怕數據傾斜問題,hadoop 在 map 端的彙總合併優化,使 數據傾斜不成問題sql
四、count(distinct userid),在數據量大的狀況下,效率較低,若是是多 count(distinct userid,month)效率更低,由於 count(distinct)是按 group by 字段分組,按 distinct 字段排序, 通常這種分佈方式是很apache
傾斜的,好比 PV 數據,淘寶一天 30 億的 pv,若是按性別分組,分 配 2 個 reduce,每一個 reduce 指望處理 15 億數據,但現實一定是男少女多編程
一、好的模型設計事半功倍緩存
二、解決數據傾斜問題服務器
三、減小 job 數網絡
四、設置合理的 MapReduce 的 task 數,能有效提高性能。(好比,10w+級別的計算,用 160個 reduce,那是至關的浪費,1 個足夠)併發
五、瞭解數據分佈,本身動手解決數據傾斜問題是個不錯的選擇。這是通用的算法優化,但 算法優化有時不能適應特定業務背景,開發人員瞭解業務,瞭解數據,能夠經過業務邏輯精 確有效的解決數據傾斜問題負載均衡
六、數據量較大的狀況下,慎用 count(distinct),group by 容易產生傾斜問題
七、對小文件進行合併,是行之有效的提升調度效率的方法,假如全部的做業設置合理的文 件數,對雲梯的總體調度效率也會產生積極的正向影響
八、優化時把握總體,單個做業最優不如總體最優
cluster by:對同一字段分桶並排序,不能和 sort by 連用
distribute by + sort by:分桶,保證同一字段值只存在一個結果文件當中,結合 sort by 保證 每一個 reduceTask 結果有序
sort by:單機排序,單個 reduce 結果有序
order by:全局排序,缺陷是隻能使用一個 reduce
必定要區分這四種排序的使用方式和適用場景
當 Hive 設定爲嚴格模式(hive.mapred.mode=strict)時,不容許在 HQL 語句中出現笛卡爾積, 這實際說明了 Hive 對笛卡爾積支持較弱。由於找不到 Join key,Hive 只能使用 1 個 reducer 來完成笛卡爾積。
固然也可使用 limit 的辦法來減小某個表參與 join 的數據量,但對於須要笛卡爾積語義的 需求來講,常常是一個大表和一個小表的 Join 操做,結果仍然很大(以致於沒法用單機處 理),這時 MapJoin纔是最好的解決辦法。MapJoin,顧名思義,會在 Map 端完成 Join 操做。 這須要將 Join 操做的一個或多個表徹底讀入內存。
PS:MapJoin 在子查詢中可能出現未知 BUG。在大表和小表作笛卡爾積時,規避笛卡爾積的 方法是,給 Join 添加一個 Join key,原理很簡單:將小表擴充一列 join key,並將小表的條 目複製數倍,join key 各不相同;將大表擴充一列 join key 爲隨機數。
精髓就在於複製幾倍,最後就有幾個 reduce 來作,並且大表的數據是前面小表擴張 key 值 範圍裏面隨機出來的,因此複製了幾倍 n,就至關於這個隨機範圍就有多大 n,那麼相應的, 大表的數據就被隨機的分爲了 n 份。而且最後處理所用的 reduce 數量也是 n,並且也不會 出現數據傾斜。
雖然通過測驗,hive1.2.1 也支持 in/exists 操做,但仍是推薦使用 hive 的一個高效替代方案:left semi join
好比說:
select a.id, a.name from a where a.id in (select b.id from b); select a.id, a.name from a where exists (select id from b where a.id = b.id);
應該轉換成:
select a.id, a.name from a left semi join b on a.id = b.id;
Map 數過大
Map 階段輸出文件過小,產生大量小文件
初始化和建立 Map 的開銷很大
Map 數過小
文件處理或查詢併發度小,Job 執行時間過長
大量做業時,容易堵塞集羣
在 MapReduce 的編程案例中,咱們得知,一個MR Job的 MapTask 數量是由輸入分片 InputSplit 決定的。而輸入分片是由 FileInputFormat.getSplit()決定的。一個輸入分片對應一個 MapTask, 而輸入分片是由三個參數決定的:
輸入分片大小的計算是這麼計算出來的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
默認狀況下,輸入分片大小和 HDFS 集羣默認數據塊大小一致,也就是默認一個數據塊,啓 用一個 MapTask 進行處理,這樣作的好處是避免了服務器節點之間的數據傳輸,提升 job 處 理效率
兩種經典的控制 MapTask 的個數方案:減小 MapTask 數或者增長 MapTask 數
一、 減小 MapTask 數是經過合併小文件來實現,這一點主要是針對數據源
二、 增長 MapTask 數能夠經過控制上一個 job 的 reduceTask 個數
由於 Hive 語句最終要轉換爲一系列的 MapReduce Job 的,而每個 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 組成的,默認狀況下, MapReduce 中一個 MapTask 或者一個 ReduceTask 就會啓動一個 JVM 進程,一個 Task 執行完畢後, JVM 進程就退出。這樣若是任 務花費時間很短,又要屢次啓動 JVM 的狀況下,JVM 的啓動時間會變成一個比較大的消耗, 這個時候,就能夠經過重用 JVM 來解決:
set mapred.job.reuse.jvm.num.tasks=5
文件數目過多,會給 HDFS 帶來壓力,而且會影響處理效率,能夠經過合併 Map 和 Reduce 的 結果文件來消除這樣的影響:
set hive.merge.mapfiles = true ##在 map only 的任務結束時合併小文件
set hive.merge.mapredfiles = false ## true 時在 MapReduce 的任務結束時合併小文件
set hive.merge.size.per.task = 256*1000*1000 ##合併文件的大小
set mapred.max.split.size=256000000; ##每一個 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一個節點上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##執行 Map 前進行小文件合併
Hadoop MapReduce 程序中,reducer 個數的設定極大影響執行效率,這使得 Hive 怎樣決定 reducer 個數成爲一個關鍵問題。遺憾的是 Hive 的估計機制很弱,不指定 reducer 個數的情 況下,Hive 會猜想肯定一個 reducer 個數,基於如下兩個設定:
一、hive.exec.reducers.bytes.per.reducer(默認爲 256000000)
二、hive.exec.reducers.max(默認爲 1009)
三、mapreduce.job.reduces=-1(設置一個常量 reducetask 數量)
計算 reducer 數的公式很簡單: N=min(參數 2,總輸入數據量/參數 1) 一般狀況下,有必要手動指定 reducer 個數。考慮到 map 階段的輸出數據量一般會比輸入有 大幅減小,所以即便不設定 reducer 個數,重設參數 2 仍是必要的。
依據 Hadoop 的經驗,能夠將參數 2 設定爲 0.95*(集羣中 datanode 個數)。
Multi-group by 是 Hive 的一個很是好的特性,它使得 Hive 中利用中間結果變得很是方便。 例如:
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid = b.userid and a.ds='2009-03-20' ) ) subq1 INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20') SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20') SELECT subq1.school, COUNT(1) GROUP BY subq1.school
上述查詢語句使用了 multi-group by 特性連續 group by 了 2 次數據,使用不一樣的 group by key。 這一特性能夠減小一次 MapReduce 操做
Bucket 是指將數據以指定列的值爲 key 進行 hash,hash 到指定數目的桶中。這樣就能夠支 持高效採樣了。以下例就是以 userid 這一列爲 bucket 的依據,共設置 32 個 buckets
CREATE TABLE page_view(viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') COMMENT 'This is the page view table' PARTITIONED BY(dt STRING, country STRING) CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' COLLECTION ITEMS TERMINATED BY '2' MAP KEYS TERMINATED BY '3' STORED AS SEQUENCEFILE;
一般狀況下,Sampling 在全體數據上進行採樣,這樣效率天然就低,它要去訪問全部數據。 而若是一個表已經對某一列製做了 bucket,就能夠採樣全部桶中指定序號的某個桶,這就 減小了訪問量。
以下例所示就是採樣了 page_view 中 32 個桶中的第三個桶的所有數據:
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);
以下例所示就是採樣了 page_view 中 32 個桶中的第三個桶的一半數據:
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);
Partition 就是分區。分區經過在建立表時啓用 partitioned by 實現,用來 partition 的維度並不 是實際數據的某一列,具體分區的標誌是由插入內容時給定的。當要查詢某一分區的內容時 能夠採用 where 語句,形似 where tablename.partition_column = a 來實現。
建立含分區的表
CREATE TABLE page_view(viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') PARTITIONED BY(date STRING, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' STORED AS TEXTFILE;
載入內容,並指定分區標誌
load data local inpath '/home/hadoop/pv_2008-06-08_us.txt' into table page_view partition(date='2008-06-08', country='US');
查詢指定標誌的分區內容
SELECT page_views.* FROM page_views WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND page_views.referrer_url like '%xyz.com';
整體原則:
一、 優先過濾後再進行 Join 操做,最大限度的減小參與 join 的數據量
二、 小表 join 大表,最好啓動 mapjoin
三、 Join on 的條件相同的話,最好放入同一個 job,而且 join 表的排列順序從小到大
在使用寫有 Join 操做的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操做 符的左邊。緣由是在 Join 操做的 Reduce 階段,位於 Join 操做符左邊的表的內容會被加 載進內存,將條目少的表放在左邊,能夠有效減小發生 OOM 錯誤的概率。對於一條語句 中有多個 Join 的狀況,若是 Join 的條件相同,好比查詢
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x ON (u.userid = x.userid);
若是 Join 的 key 相同,無論有多少個表,都會則會合併爲一個 Map-Reduce 任務,而不 是」n」個,在作 OUTER JOIN 的時候也是同樣
若是 join 的條件不相同,好比:
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x on (u.age = x.age);
Map-Reduce 的任務數目和 Join 操做的數目是對應的,上述查詢和如下查詢是等價的
--先 page_view 表和 user 表作連接 INSERT OVERWRITE TABLE tmptable SELECT * FROM page_view p JOIN user u ON (pv.userid = u.userid); -- 而後結果表 temptable 和 newuser 表作連接 INSERT OVERWRITE TABLE pv_users SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age);
在編寫 Join 查詢語句時,若是肯定是因爲 join 出現的數據傾斜,那麼請作以下設置:
set hive.skewjoin.key=100000; // 這個是 join 的鍵對應的記錄條數超過這個值則會進行 分拆,值根據具體數據量設置
set hive.optimize.skewjoin=true; // 若是是 join 過程出現傾斜應該設置爲 true
並非全部的聚合操做都須要在 Reduce 端完成,不少聚合操做均可以先在 Map 端進 行部分聚合,最後在 Reduce 端得出最終結果。
MapReduce 的 combiner 組件參數包括:
set hive.map.aggr = true 是否在 Map 端進行聚合,默認爲 True
set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操做的條目數目
set hive.groupby.skewindata = true
當 sql 語句使用 groupby 時數據出現傾斜時,若是該變量設置爲 true,那麼 Hive 會自動進行 負載均衡。策略就是把 MR 任務拆分紅兩個:第一個先作預彙總,第二個再作最終彙總
在 MR 的第一個階段中,Map 的輸出結果集合會緩存到 maptaks 中,每一個 Reduce 作部分聚 合操做,並輸出結果,這樣處理的結果是相同 Group By Key 有可能被分發到不一樣的 Reduce 中, 從而達到負載均衡的目的;第二個階段 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成 最終的聚合操做。
建立表時,儘可能使用 orc、parquet 這些列式存儲格式,由於列式存儲的表,每一列的數據在 物理上是存儲在一塊兒的,Hive 查詢時會只遍歷須要列數據,大大減小處理的數據量。
Hive 在集羣上查詢時,默認是在集羣上 N 臺機器上運行, 須要多個機器進行協調運行,這 個方式很好地解決了大數據量的查詢問題。可是當 Hive 查詢處理的數據量比較小時,其實 沒有必要啓動分佈式模式去執行,由於以分佈式方式執行就涉及到跨網絡傳輸、多節點協調 等,而且消耗資源。這個時間能夠只使用本地模式來執行 mapreduce job,只在一臺機器上 執行,速度會很快。啓動本地模式涉及到三個參數:
set hive.exec.mode.local.auto=true 是打開 hive 自動判斷是否啓動本地模式的開關,可是隻 是打開這個參數並不能保證啓動本地模式,要當 map 任務數不超過
hive.exec.mode.local.auto.input.files.max 的個數而且 map 輸入文件大小不超過
hive.exec.mode.local.auto.inputbytes.max 所指定的大小時,才能啓動本地模式。
一個 hive sql 語句可能會轉爲多個 mapreduce Job,每個 job 就是一個 stage,這些 job 順序 執行,這個在 cli 的運行日誌中也能夠看到。可是有時候這些任務之間並非是相互依賴的, 若是集羣資源容許的話,可讓多個並不相互依賴 stage 併發執行,這樣就節約了時間,提 高了執行速度,可是若是集羣資源匱乏時,啓用並行化反卻是會致使各個 job 相互搶佔資源 而致使總體執行性能的降低。啓用並行化:
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; //同一個 sql 容許並行任務的最大線程數
Hive 最終是轉爲 MapReduce 程序來執行的,而 MapReduce 的性能瓶頸在於網絡 IO 和 磁盤 IO,要解決性能瓶頸,最主要的是減小數據量,對數據進行壓縮是個好的方式。壓縮 雖然是減小了數據量,可是壓縮過程要消耗 CPU 的,可是在 Hadoop 中, 每每性能瓶頸不 在於 CPU,CPU 壓力並不大,因此壓縮充分利用了比較空閒的 CPU
各個壓縮方式所對應的 Class 類:
壓縮比率
壓縮解壓縮速度
是否支持 Split
Job 輸出文件按照 block 以 GZip 的方式進行壓縮:
set mapreduce.output.fileoutputformat.compress=true // 默認值是 false
set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默認值是 Record
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec
Map 輸出結果也以 Gzip 進行壓縮:
set mapred.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec
對 Hive 輸出結果和中間都進行壓縮:
set hive.exec.compress.output=true // 默認值是 false,不壓縮
set hive.exec.compress.intermediate=true // 默認值是 false,爲 true 時 MR 設置的壓縮才啓用