Fetch抓取是指,Hive中對某些狀況的查詢能夠沒必要使用MapReduce計算。 例如:select * from employee;在這種狀況下,Hive能夠簡單的讀取employee對應存儲目錄下的文件, 而後輸出查詢結果到控制檯上。 在hive-default.xml.template文件中hive.fetch.task.conversion默認是more。 老版本默認是minimal,該屬性改成more之後,在全局查找,字段查找,limit查找都不走MapReduce。 <property> <name>hive.fetch.task.conversion</name> <value>more</value> <description> Expects one of [none, minimal, more]. Some select queries can be converted to single FETCH task minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins. 0. none : disable hive.fetch.task.conversion 1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only 2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns) </description> </property> 把 hive.fetch.task.conversion 設置成 none,而後執行查詢語句,都會執行 mapreduce程序。 把 hive.fetch.task.conversion 設置成 more,而後執行查詢語句,以下查詢方式都不會執行 mapreduce 程序 hive (default)> set hive.fetch.task.conversion=more; hive (default)> select * from emp; hive (default)> select ename from emp; hive (default)> select ename from emp limit 3;
大多數的 Hadoop Job 是須要 Hadoop 提供的完整的可擴展性來處理大數據集的。不過,
有時 Hive 的輸入數據量是很是小的。在這種狀況下,爲查詢觸發執行任務時消耗可能會比
實際 job 的執行時間要多的多。對於大多數這種狀況,Hive 能夠經過本地模式在單臺機器上
處理全部的任務。對於小數據集,執行時間能夠明顯被縮短。
用戶能夠經過設置 hive.exec.mode.local.auto 的值爲 true,來讓 Hive 在適當的時候自動啓動這個優化node
set hive.exec.mode.local.auto=true; //開啓本地 mr //設置 local mr 的最大輸入數據量,當輸入數據量小於這個值時採用 local mr 的方式, 默認爲 134217728,即 128M set hive.exec.mode.local.auto.inputbytes.max=50000000; //設置 local mr 的最大輸入文件個數,當輸入文件個數小於這個值時採用 local mr 的方式,默認爲 4 set hive.exec.mode.local.auto.input.files.max=10; 1)開啓本地模式,並執行查詢語句 hive (default)> set hive.exec.mode.local.auto=true; hive (default)> select * from emp cluster by deptno; Time taken: 1.328 seconds, Fetched: 14 row(s) 2)關閉本地模式,並執行查詢語句 hive (default)> set hive.exec.mode.local.auto=false; hive (default)> select * from emp cluster by deptno; Time taken: 20.09 seconds, Fetched: 14 row(s)
將 key 相對分散,而且數據量小的表放在 join 的左邊,這樣能夠有效減小內存溢出錯誤發生的概率;再進一步,可使用 Group 讓小的維度表(1000 條如下的記錄條數)先進內存。在 map 端完成 reduce。實際測試發現:新版的 hive 已經對小表 JOIN 大表和大表 JOIN 小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。web
1)空 KEY 過濾
有時 join 超時是由於某些 key 對應的數據太多,而相同 key 對應的數據都會發送到相同
的 reducer 上,從而致使內存不夠。此時咱們應該仔細分析這些異常的 key,不少狀況下,
這些 key 對應的數據是異常數據,咱們須要在 SQL 語句中進行過濾。例如 key 對應的字段
爲空,操做以下sql
(1)配置歷史服務器 配置 mapred-site.xml數據庫
<property> <name>mapreduce.jobhistory.address</name> <value>hadoop102:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>hadoop102:19888</value> </property>
啓動歷史服務器apache
sbin/mr-jobhistory-daemon.sh start historyserver
查看 jobhistory服務器
http://192.168.1.102:19888/jobhistory
(2)建立原始數據表、空 id 表、合併後數據表併發
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t'; create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t'; create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
(3)分別加載原始數據和空 id 數據到對應表中app
hive (default)> load data local inpath '/opt/module/datas/ori' into table ori; hive (default)> load data local inpath '/opt/module/datas/nullid' into table nullidtable;
(4)測試不過濾空 id負載均衡
hive (default)> insert overwrite table jointable select n.* from nullidtable n left join ori o on n.id = o.id; Time taken: 42.038 seconds
(5)測試過濾空 idwebapp
hive (default)> insert overwrite table jointable select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = o.id; Time taken: 31.725 seconds
2)空 key 轉換
有時雖然某個 key 爲空對應的數據不少,可是相應的數據不是異常數據,必需要包含在
join 的結果中,此時咱們能夠表 a 中 key 爲空的字段賦一個隨機的值,使得數據隨機均勻地
分不到不一樣的 reducer 上。例如:
不隨機分佈空 null 值:
(1)設置 5 個 reduce 個數
set mapreduce.job.reduces = 5;
(2)JOIN 兩張表
insert overwrite table jointable select n.* from nullidtable n left join ori b on n.id = b.id;
結果:能夠看出來,出現了數據傾斜,某些 reducer 的資源消耗遠大於其餘 reducer
隨機分佈空 null 值
(1)設置 5 個 reduce 個數
set mapreduce.job.reduces = 5;
(2)JOIN 兩張表
判斷若是id爲null,那麼隨機拼接字符串
insert overwrite table jointable select n.* from nullidtable n full join ori o on case when n.id is null then concat('hive', rand()) else n.id end = o.id;
結果:能夠看出來,消除了數據傾斜,負載均衡 reducer 的資源消耗
mapjoin原理圖
若是不指定 MapJoin 或者不符合 MapJoin 的條件,那麼 Hive 解析器會將 Join 操做轉換成 Common Join,即:在 Reduce 階段完成 join。容易發生數據傾斜。能夠用 MapJoin 把小表所有加載到內存在 map 端進行 join,避免 reducer 處理。
1)開啓 MapJoin 參數設置:
(1)設置自動選擇 Mapjoin
set hive.auto.convert.join = true; 默認爲 true
(2)大表小表的閥值設置(默認 25M 一下認爲是小表):
set hive.mapjoin.smalltable.filesize=25000000;
案例實操: (1)開啓 Mapjoin 功能 set hive.auto.convert.join = true; 默認爲 true (2)執行小表 JOIN 大表語句 insert overwrite table jointable select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from smalltable s join bigtable b on s.id = b.id; Time taken: 24.594 seconds (3)執行大表 JOIN 小表語句 insert overwrite table jointable select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from bigtable b join smalltable s on s.id = b.id; Time taken: 24.315 seconds
默認狀況下,Map 階段同一 Key 數據分發給一個 reduce,當一個 key 數據過大時就傾斜了。
並非全部的聚合操做都須要在 Reduce 端完成,不少聚合操做均可以先在 Map 端進行部分聚合,最後在 Reduce 端得出最終結果。
開啓 Map 端聚合參數設置 (1)是否在 Map 端進行聚合,默認爲 True hive.map.aggr = true (2)在 Map 端進行聚合操做的條目數目 hive.groupby.mapaggr.checkinterval = 100000 (3)有數據傾斜的時候進行負載均衡(默認是 false) hive.groupby.skewindata = true 當選項設定爲 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸 出結果會隨機分佈到 Reduce 中,每一個 Reduce 作部分聚合操做,並輸出結果,這樣處理的結 果是相同的 Group By Key 有可能被分發到不一樣的 Reduce 中,從而達到負載均衡的目的;第 二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠 保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操做。
數據量小的時候無所謂,數據量大的狀況下,因爲 COUNT DISTINCT 操做須要用一個 Reduce Task 來完成,這一個 Reduce 須要處理的數據量太大,就會致使整個 Job 很難完成, 通常 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替換 設置 5 個 reduce 個數 set mapreduce.job.reduces = 5; 執行去重 id 查詢 hive (default)> select count(distinct id) from bigtable; Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 7.12 sec HDFS Read: 120741990 HDFS Write: 7 SUCCESS Total MapReduce CPU Time Spent: 7 seconds 120 msec OK c0 100001 Time taken: 23.607 seconds, Fetched: 1 row(s) Time taken: 34.941 seconds, Fetched: 1 row(s) 採用 GROUP by 去重 id hive (default)> select count(id) from (select id from bigtable group by id) a; Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 17.53 sec HDFS Read: 120752703 HDFS Write: 580 SUCCESS Stage-Stage-2: Map: 3 Reduce: 1 Cumulative CPU: 4.29 sec HDFS Read: 9409 HDFS Write: 7 SUCCESS Total MapReduce CPU Time Spent: 21 seconds 820 msec OK _c0 100001 Time taken: 50.795 seconds, Fetched: 1 row(s) 雖然會多用一個 Job 來完成,但在數據量大的狀況下,這個絕對是值得的。
儘可能避免笛卡爾積,join 的時候不加 on 條件,或者無效的 on 條件,Hive 只能使用 1個 reducer 來完成笛卡爾積
列處理:在 SELECT 中,只拿須要的列,若是有,儘可能使用分區過濾,少用 SELECT *。
行處理:在分區剪裁中,當使用外關聯時,若是將副表的過濾條件寫在 Where 後面,
那麼就會先全表關聯,以後再過濾,好比:
(1)測試先關聯兩張表,再用 where 條件過濾 hive (default)> select o.id from bigtable b join ori o on o.id = b.id where o.id <= 10; Time taken: 34.406 seconds, Fetched: 100 row(s) Time taken: 26.043 seconds, Fetched: 100 row(s) (2)經過子查詢後,再關聯表 hive (default)> select b.id from bigtable b join (select id from ori where id <= 10 ) o on b.id = o.id; Time taken: 30.058 seconds, Fetched: 100 row(s) Time taken: 29.106 seconds, Fetched: 100 row(s)
關係型數據庫中,對分區表 Insert 數據時候,數據庫自動會根據分區字段的值,將數據 插入到相應的分區中,Hive 中也提供了相似的機制,即動態分區(Dynamic Partition),只不 過,使用 Hive 的動態分區,須要進行相應的配置。
(1)開啓動態分區功能(默認 true,開啓)
hive.exec.dynamic.partition=true
(2)設置爲非嚴格模式(動態分區的模式,默認 strict,表示必須指定至少一個分區爲靜態分區,nonstrict 模式表示容許全部的分區字段均可以使用動態分區。)
hive.exec.dynamic.partition.mode=nonstrict
(3)在全部執行 MR 的節點上,最大一共能夠建立多少個動態分區。
hive.exec.max.dynamic.partitions=1000
(4)在每一個執行 MR 的節點上,最大能夠建立多少個動態分區。該參數須要根據實際的數據來設定。好比:源數據中包含了一年的數據,即 day 字段有 365 個值,那麼該參數就須要設置成大於 365,若是使用默認值 100,則會報錯。
hive.exec.max.dynamic.partitions.pernode=100
(5)整個 MR Job 中,最大能夠建立多少個 HDFS 文件。
hive.exec.max.created.files=100000
(6)當有空分區生成時,是否拋出異常。通常不須要設置。
hive.error.on.empty.partition=false
需求:將 ori 中的數據按照時間(如:20111230000008),插入到目標表 ori_partitioned_target
的相應分區中。
(1)建立分區表 create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) partitioned by (p_time bigint) row format delimited fields terminated by '\t'; (2)加載數據到分區表中 hive (default)> load data local inpath '/opt/module/datas/ds1' into table ori_partitioned partition(p_time='20111230000010') ; hive (default)> load data local inpath '/opt/module/datas/ds2' into table ori_partitioned partition(p_time='20111230000011') ; (3)建立目標分區表 create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t'; (4)設置動態分區 set hive.exec.dynamic.partition = true; set hive.exec.dynamic.partition.mode = nonstrict; set hive.exec.max.dynamic.partitions = 1000; set hive.exec.max.dynamic.partitions.pernode = 100; set hive.exec.max.created.files = 100000; set hive.error.on.empty.partition = false; hive (default)> insert overwrite table ori_partitioned_target partition (p_time) select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned; (5)查看目標分區表的分區狀況 hive (default)> show partitions ori_partitioned_target;
1 )一般狀況下,做業會經過 input 的目錄產生一個或者多個 map 任務。 主要的決定因素有:input 的文件總個數,input 的文件大小,集羣設置的文件塊大小。 2 )是否是 map 數越多越好? 答案是否認的。若是一個任務有不少小文件(遠遠小於塊大小 128m),則每一個小文件 也會被當作一個塊,用一個 map 任務來完成,而一個 map 任務啓動和初始化的時間遠遠大 於邏輯處理的時間,就會形成很大的資源浪費。並且,同時可執行的 map 數是受限的。 3 )是否是保證每一個 map 處理接近 128m 的文件塊,就高枕無憂了? 答案也是不必定。好比有一個 127m的文件,正常會用一個 map 去完成,但這個文件只 有一個或者兩個小字段,卻有幾千萬的記錄,若是 map 處理的邏輯比較複雜,用一個 map 任務去作,確定也比較耗時。 針對上面的問題 2 和 3,咱們須要採起兩種方式來解決:即減小 map 數和增長 map 數;
在 map 執行前合併小文件,減小 map 數:CombineHiveInputFormat 具備對小文件進行 合併的功能(系統默認的格式)。HiveInputFormat 沒有對小文件合併功能。 set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
當 input 的文件都很大,任務邏輯複雜,map 執行很是慢的時候,能夠考慮增長 Map 數,來使得每一個 map 處理的數據量減小,從而提升任務的執行效率。 增長 map 的方法爲:根據 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,調 整 maxSize 最大值。讓 maxSize 最大值低於 blocksize 就能夠增長 map 的個數。 案例實操: (1)執行查詢 hive (default)> select count(*) from emp; Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 (2)設置最大切片值爲 100 個字節 hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100; hive (default)> select count(*) from emp; Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1
1 )調整 reduce 個數方法一 (1)每一個 Reduce 處理的數據量默認是 256MB hive.exec.reducers.bytes.per.reducer=256000000 (2)每一個任務最大的 reduce 數,默認爲 1009 hive.exec.reducers.max=1009 (3)計算 reducer 數的公式 N=min(參數 2,總輸入數據量/參數 1) 2 )調整 reduce 個數方法二 在 hadoop 的 mapred-default.xml 文件中修改 設置每一個 job 的 Reduce 個數 set mapreduce.job.reduces = 15; 3 )reduce 個數並非越多越好 1)過多的啓動和初始化 reduce 也會消耗時間和資源; 2)另外,有多少個 reduce,就會有多少個輸出文件,若是生成了不少個小文件,那麼 若是這些小文件做爲下一個任務的輸入,則也會出現小文件過多的問題; 在設置 reduce 個數的時候也須要考慮這兩個原則:處理大數據量利用合適的 reduce 數; 使單個 reduce 任務處理數據量大小要合適;
Hive 會將一個查詢轉化成一個或者多個階段。這樣的階段能夠是 MapReduce 階段、抽
樣階段、合併階段、limit 階段。或者 Hive 執行過程當中可能須要的其餘階段。默認狀況下,
Hive 一次只會執行一個階段。不過,某個特定的 job 可能包含衆多的階段,而這些階段可能
並不是徹底互相依賴的,也就是說有些階段是能夠並行執行的,這樣可能使得整個 job 的執行
時間縮短。不過,若是有更多的階段能夠並行執行,那麼 job 可能就越快完成。
經過設置參數 hive.exec.parallel 值爲 true,就能夠開啓併發執行。不過,在共享集羣中,
須要注意下,若是 job 中並行階段增多,那麼集羣利用率就會增長。
set hive.exec.parallel=true; //打開任務並行執行 set hive.exec.parallel.thread.number=16; //同一個 sql 容許最大並行度,默認爲 8。
固然,得是在系統資源比較空閒的時候纔有優點,不然,沒資源,並行也起不來。
Hive 提供了一個嚴格模式,能夠防止用戶執行那些可能意向不到的很差的影響的查詢。
經過設置屬性 hive.mapred.mode 值爲默認是非嚴格模式 nonstrict 。開啓嚴格模式須要
修改 hive.mapred.mode 值爲 strict,開啓嚴格模式能夠禁止 3 種類型的查詢。
<property> <name>hive.mapred.mode</name> <value>strict</value> <description> The mode in which the Hive operations are being performed. In strict mode, some risky queries are not allowed to run. They include: Cartesian Product. No partition being picked up for a query. Comparing bigints and strings. Comparing bigints and doubles. Orderby without limit. </description> </property>
1)對於分區表,除非 where 語句中含有分區字段過濾條件來限制範圍,不然不容許執行。
換句話說,就是用戶不容許掃描全部分區。進行這個限制的緣由是,一般分區表都擁有很是
大的數據集,並且數據增長迅速。沒有進行分區限制的查詢可能會消耗使人不可接受的巨大
資源來處理這個表。
2)對於使用了 order by 語句的查詢,要求必須使用 limit 語句。由於 order by 爲了執行排序
過程會將全部的結果數據分發到同一個 Reducer 中進行處理,強制要求用戶增長這個 LIMIT
語句能夠防止 Reducer 額外執行很長一段時間。
3)限制笛卡爾積的查詢。對關係型數據庫很是瞭解的用戶可能指望在執行 JOIN 查詢的時
候不使用 ON 語句而是使用 where 語句,這樣關係數據庫的執行優化器就能夠高效地將
WHERE 語句轉化成那個 ON 語句。不幸的是,Hive 並不會執行這種優化,所以,若是表足
夠大,那麼這個查詢就會出現不可控的狀況。
JVM 重用是 Hadoop 調優參數的內容,其對 Hive 的性能具備很是大的影響,特別是對
於很難避免小文件的場景或 task 特別多的場景,這類場景大多數執行時間都很短。
Hadoop 的默認配置一般是使用派生 JVM 來執行 map 和 Reduce 任務的。這時 JVM 的
啓動過程可能會形成至關大的開銷,尤爲是執行的job包含有成百上千task任務的狀況。JVM
重用可使得 JVM 實例在同一個 job 中從新使用 N 次。N 的值能夠在 Hadoop 的
mapred-site.xml 文件中進行配置。一般在 10-20 之間,具體多少須要根據具體業務場景測試
得出。
<property> <name>mapreduce.job.jvm.numtasks</name> <value>10</value> <description>How many tasks to run per jvm. If set to -1, there is no limit. </description> </property>
這個功能的缺點是,開啓 JVM 重用將一直佔用使用到的 task 插槽,以便進行重用,直
到任務完成後才能釋放。若是某個「不平衡的」job 中有某幾個 reduce task 執行的時間要比其餘 Reduce task 消耗的時間多的多的話,那麼保留的插槽就會一直空閒着卻沒法被其餘的 job
使用,直到全部的 task 都結束了纔會釋放。
在分佈式集羣環境下,由於程序 Bug(包括 Hadoop 自己的 bug),負載不均衡或者資源
分佈不均等緣由,會形成同一個做業的多個任務之間運行速度不一致,有些任務的運行速度
可能明顯慢於其餘任務(好比一個做業的某個任務進度只有 50%,而其餘全部任務已經運
行完畢),則這些任務會拖慢做業的總體執行進度。爲了不這種狀況發生,Hadoop 採用
了推測執行(Speculative Execution)機制,它根據必定的法則推測出「拖後腿」的任務,併爲
這樣的任務啓動一個備份任務,讓該任務與原始任務同時處理同一份數據,並最終選用最早
成功運行完成任務的計算結果做爲最終結果。
設置開啓推測執行參數:Hadoop 的 mapred-site.xml 文件中進行配置
<property> <name>mapreduce.map.speculative</name> <value>true</value> <description>If true, then multiple instances of some map tasks may be executed in parallel.</description> </property> <property> <name>mapreduce.reduce.speculative</name> <value>true</value> <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description> </property>
不過 hive 自己也提供了配置項來控制 reduce-side 的推測執行:
<property> <name>hive.mapred.reduce.tasks.speculative.execution</name> <value>true</value> <description>Whether speculative execution for reducers should be turned on. </description> </property>
關於調優這些推測執行變量,還很難給一個具體的建議。若是用戶對於運行時的誤差非
常敏感的話,那麼能夠將這些功能關閉掉。若是用戶由於輸入數據量很大而須要執行長時間
的 map 或者 Reduce task 的話,那麼啓動推測執行形成的浪費是很是巨大大。
1)基本語法 EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query 2)案例實操 (1)查看下面這條語句的執行計劃 hive (default)> explain select * from emp; hive (default)> explain select deptno, avg(sal) avg_sal from emp group by deptno; (2)查看詳細執行計劃 hive (default)> explain extended select * from emp; hive (default)> explain extended select deptno, avg(sal) avg_sal from emp group by deptno;