優化前咱們須要知道hadoop適合幹什麼活,適合什麼場景,在工做中,咱們要知道業務是怎樣的,能才結合平臺資源達到最有優化。除了這些咱們固然還要知道mapreduce的執行過程,好比從文件的讀取,map處理,shuffle過程,reduce處理,文件的輸出或者存儲。在工做中,每每平臺的參數都是固定的,不可能爲了某一個做業去修改整個平臺的參數,因此在做業的執行過程當中,須要對做業進行單獨的設定,這樣既不會對其餘做業產生影響,也能很好的提升做業的性能,提升優化的靈活性。html
如今回顧下hadoop的優點(適用場景):
一、可構建在廉價機器上,設備成本相對低
二、高容錯性,HDFS將數據自動保存多個副本,副本丟失後,自動恢復,防止數據丟失或損壞
三、適合批處理,HDFS適合一次寫入、屢次查詢(讀取)的狀況,適合在已有的數據進行屢次分析,穩定性好
四、適合存儲大文件,其中的大表示能夠存儲單個大文件,由於是分塊存儲,以及表示存儲大量的數據算法
從概述中咱們知道,很明顯hadoop適合大文件的處理和存儲,那爲何不適合小文件呢?sql
一、從存儲方面來講:hadoop的存儲每一個文件都會在NameNode上記錄元數據,若是一樣大小的文件,文件很小的話,就會產生不少文件,形成NameNode的壓力。
二、從讀取方面來講:一樣大小的文件分爲不少小文件的話,會增長磁盤尋址次數,下降性能
三、從計算方面來講:咱們知道一個map默認處理一個分片或者一個小文件,若是map的啓動時間都比數據處理的時間還要長,那麼就會形成性能低,並且在map端溢寫磁盤的時候每個map最終會產生reduce數量個數的中間結果,若是map數量特別多,就會形成臨時文件不少,並且在reduce拉取數據的時候增長磁盤的IO。apache
好,咱們明白小文件形成的弊端以後,那咱們應該怎麼處理這些小文件呢?網絡
一、從源頭幹掉,也就是在hdfs上咱們不存儲小文件,也就是數據上傳hdfs的時候咱們就合併小文件
二、在FileInputFormat讀取入數據的時候咱們使用實現類CombineFileInputFormat讀取數據,在讀取數據的時候進行合併。數據結構
咱們都知道mapreduce是一個並行處理,那麼處理的時間確定是做業中全部任務最慢的那個了,可謂木桶效應?爲何會這樣呢?app
一、數據傾斜,每一個reduce處理的數據量不是同一個級別的,全部致使有些已經跑完了,而有些跑的很慢。
二、還有可能就是某些做業所在的NodeManager有問題或者container有問題,致使做業執行緩慢。函數
那麼爲何會產生數據傾斜呢?oop
數據自己就不平衡,因此在默認的hashpartition時形成分區數據不一致問題,還有就是代碼設計不合理等。post
那如何解決數據傾斜的問題呢?
一、既然默認的是hash算法進行分區,那咱們自定義分區,修改分區實現邏輯,結合業務特色,使得每一個分區數據基本平衡
二、既然有默認的分區算法,那麼咱們能夠修改分區的鍵,讓其符合hash分區,而且使得最後的分區平衡,好比在key前加隨機數n-key。
三、既然reduce處理慢,咱們能夠增長reduce的內存和vcore呀,這樣挺高性能就快了,雖然沒從根本上解決問題,可是還有效果
四、既然一個reduce處理慢,那咱們能夠增長reduce的個數來分攤一些壓力呀,也不能根本解決問題,仍是有必定的效果。
那麼若是不是數據傾斜帶來的問題,而是節點服務有問題形成某些map和reduce執行緩慢呢?
那麼咱們可使用推測執行呀,你跑的慢,咱們能夠找個其餘的節點重啓同樣的任務競爭,誰快誰爲準。推測執行時以空間換時間的優化。會帶來集羣資源的浪費,會給集羣增長壓力,因此我司集羣的推測執行都是關閉的。其實在做業執行的時候能夠偷偷開啓的呀
推測執行參數控制:
mapreduce.map.speculative
mapreduce.reduce.speculative
上面咱們從hadoop的特性場景等聊了下mapreduce的優化,接下來咱們從mapreduce的執行過程進行優化。
好吧,咱們就從源頭開始說,從數據的讀取以及map數的肯定:
在前面咱們聊太小文件的問題,因此在數據的讀取這裏也能夠作優化,因此選擇一個合適數據的文件的讀取類(FIleInputFormat的實現類)也很重要咱們在做業提交的過程當中,會把jar,分片信息,資源信息提交到hdfs的臨時目錄,默認會有10個複本,經過參數mapreduce.client.submit.file.replication控制後期做業執行都會去下載這些東西到本地,中間會產生磁盤IO,因此若是集羣很大的時候,能夠增長該值,提升下載的效率。
分片的計算公式:
計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize))
minSize的默認值是1,而maxSize的默認值是long類型的最大值,便可得切片的默認大小是blockSize(128M)
maxSize參數若是調得比blocksize小,則會讓切片變小,並且就等於配置的這個參數的值
minSize參數調的比blockSize大,則可讓切片變得比blocksize還大
由於map數沒有具體的參數指定,因此咱們能夠經過如上的公式調整切片的大小,這樣咱們就能夠設置map數了,那麼問題來了,map數該如何設置呢?
這些東西必定要結合業務,map數太多,會產生不少中間結果,致使reduce拉取數據變慢,太少,每一個map處理的時間又很長,結合數據的需求,能夠把map的執行時間調至到一分鐘左右比較合適,那若是數據量就是很大呢,咱們有時候仍是須要控制map的數量,這個時候每一個map的執行時間就比較長了,那麼咱們能夠調整每一個map的資源來提高map的處理能力呀,我司就調整了mapreduce.map.memory.mb=3G(默認1G)mapreduce.map.cpu.vcores=1(默認也是1)
從源頭上咱們肯定好map以後。那麼接下來看map的具體執行過程咯。
首先寫環形換衝區,那爲啥要寫環形換衝區呢,而不是直接寫磁盤呢?這樣的目的主要是爲了減小磁盤i/o。
每一個Map任務不斷地將鍵值對輸出到在內存中構造的一個環形數據結構中。使用環形數據結構是爲了更有效地使用內存空間,在內存中放置儘量多的數據。執行流程是,該緩衝默認100M(mapreduce.task.io.sort.mb參數控制),當到達80%(mapreduce.map.sort.spill.percent參數控制)時就會溢寫磁盤。每達到80%都會重寫溢寫到一個新的文件。那麼,咱們徹底能夠根據機器的配置和數據來兩種這兩個參數,當內存足夠,咱們增大mapreduce.task.io.sort.mb徹底會提升溢寫的過程,並且會減小中間結果的文件數量。我司調整mapreduce.task.io.sort.mb=512。當文件溢寫完後,會對這些文件進行合併,默認每次合併10(mapreduce.task.io.sort.factor參數控制)個溢寫的文件,我司調整mapreduce.task.io.sort.factor=64。這樣能夠提升合併的並行度,減小合併的次數,下降對磁盤操做的次數。
mapreduce.shuffle.max.threads(默認爲0,表示可用處理器的兩倍),該參數表示每一個節點管理器的工做線程,用於map輸出到reduce。
那麼map算是完整了,在reduce拉取數據以前,咱們徹底還能夠combiner呀(不影響最終結果的狀況下),此時會根據Combiner定義的函數對map的結果進行合併這樣就能夠減小數據的傳輸,下降磁盤io,提升性能了。
終於走到了map到reduce的數據傳輸過程了:
這中間主要的影響無非就是磁盤IO,網絡IO,數據量的大小了(是否壓縮),其實減小數據量的大小,就能夠作到優化了,因此咱們能夠選擇性壓縮數據,這樣在傳輸的過程當中
就能夠下降磁盤IO,網絡IO等。能夠經過mapreduce.map.output.compress(default:false)設置爲true進行壓縮,數據會被壓縮寫入磁盤,讀數據讀的是壓縮數據須要解壓,在實際經驗中Hive在Hadoop的運行的瓶頸通常都是IO而不是CPU,壓縮通常能夠10倍的減小IO操做,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)參數設置。我司使用org.apache.hadoop.io.compress.SnappyCodec算法,但這個過程會消耗CPU,適合IO瓶頸比較大。
mapreduce.task.io.sort.mb #排序map輸出所須要使用內存緩衝的大小,以兆爲單位, 默認爲100 mapreduce.map.sort.spill.percent #map輸出緩衝和用來磁盤溢寫過程的記錄邊界索引,這二者使用的閾值,默認0.8 mapreduce.task.io.sort.factor #排序文件時,一次最多合併的文件數,默認10 mapreduce.map.output.compress #在map溢寫磁盤的過程是否使用壓縮,默認false org.apache.hadoop.io.compress.SnappyCodec #map溢寫磁盤的壓縮算法,默認org.apache.hadoop.io.compress.DefaultCodec mapreduce.shuffle.max.threads #該參數表示每一個節點管理器的工做線程,用於map輸出到reduce,默認爲0,表示可用處理器的兩倍
接下來就是reduce了,首先咱們能夠經過參數設置合理的reduce個數(mapreduce.job.reduces參數控制),以及經過參數設置每一個reduce的資源,mapreduce.reduce.memory.mb=5G(默認1G)
mapreduce.reduce.cpu.vcores=1(默認爲1)。
reduce在copy的過程當中默認使用5(mapreduce.reduce.shuffle.parallelcopies參數控制)個並行度進行復制數據,我司調了mapreduce.reduce.shuffle.parallelcopies=100.reduce的每個下載線程在下載某個map數據的時候,有可能由於那個map中間結果所在機器發生錯誤,或者中間結果的文件丟失,或者網絡瞬斷等等狀況,這樣reduce的下載就有可能失敗,因此reduce的下載線程並不會無休止的等待下去,當必定時間後下載仍然失敗,那麼下載線程就會放棄此次下載,並在隨後嘗試從另外的地方下載(由於這段時間map可能重跑)。reduce下載線程的這個最大的下載時間段是能夠經過mapreduce.reduce.shuffle.read.timeout(default180000秒)調整的。
Copy過來的數據會先放入內存緩衝區中,而後當使用內存達到必定量的時候才spill磁盤。這裏的緩衝區大小要比map端的更爲靈活,它基於JVM的heap size設置。這個內存大小的控制就不像map同樣能夠經過io.sort.mb來設定了,而是經過另一個參數 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7)控制的。意思是說,shuffile在reduce內存中的數據最多使用內存量爲:0.7 × maxHeap of reduce task,內存到磁盤merge的啓動門限能夠經過mapreduce.reduce.shuffle.merge.percent(default0.66)配置。
copy完成後,reduce進入歸併排序階段,合併因子默認爲10(mapreduce.task.io.sort.factor參數控制),若是map輸出不少,則須要合併不少趟,因此能夠提升此參數來減小合併次數。
mapreduce.reduce.shuffle.parallelcopies #把map輸出複製到reduce的線程數,默認5 mapreduce.task.io.sort.factor #排序文件時一次最多合併文件的個數 mapreduce.reduce.shuffle.input.buffer.percent #在shuffle的複製階段,分配給map輸出緩衝區佔堆內存的百分比,默認0.7 mapreduce.reduce.shuffle.merge.percent #map輸出緩衝區的閾值,用於啓動合併輸出和磁盤溢寫的過程
更多hadoop生態文章見: hadoop生態系列