MapReduce: 提升MapReduce性能的七點建議【譯】

做者是經過這個網站翻譯過來的:javascript

http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/   java

    Cloudera提供給客戶的服務內容之一就是調整和優化MapReduce job執行性能。MapReduce和HDFS組成一個複雜的分佈式系統,而且它們運行着各式各樣用戶的代碼,這樣致使沒有一個快速有效的規則來實現優化代碼性能的目的。在我看來,調整cluster或job的運行更像一個醫生對待病人同樣,找出關鍵的「症狀」,對於不一樣的症狀有不一樣的診斷和處理方式。

        在醫學領域,沒有什麼能夠代替一位經驗豐富的醫生;在複雜的分佈式系統上,這個道理依然正確—有經驗的用戶和操做者在面對不少常見問題上都會有「第六感」。我曾經爲Cloudera不一樣行業的客戶解決過問題,他們面對的工做量、數據集和cluster硬件有很大區別,所以我在這方面積累了不少的經驗,而且想把這些經驗分享給諸位。

        在這篇blog裏,我會高亮那些提升MapReduce性能的建議。前面的一些建議是面向整個cluster的,這可能會對cluster 操做者和開發者有幫助。後面一部分建議是爲那些用Java編寫MapReduce job的開發者而提出。在每個建議中,我列出一些「症狀」或是「診斷測試」來講明一些針對這些問題的改進措施,可能會對你有所幫助。

        請注意,這些建議中包含不少我以往從各類不一樣場景下總結出來的直觀經驗。它們可能不太適用於你所面對的特殊的工做量、數據集或cluster,若是你想使用它,就須要測試使用前和使用後它在你的cluster環境中的表現。對於這些建議,我會展現一些對比性的數據,數據產生的環境是一個4個節點的cluster來運行40GB的Wordcount job。應用了我如下所提到的這些建議後,這個job中的每一個map task大概運行33秒,job總共執行了差很少8分30秒。

第一點  正確地配置你的Cluster
診斷結果/症狀:
1. Linux top命令的結果顯示slave節點在全部map和reduce slot都有task運行時依然很空閒。
2. top命令顯示內核的進程,如RAID(mdX_raid*)或pdflush佔去大量的CPU時間。
3. Linux的平均負載一般是系統CPU數量的2倍。
4. 即便系統正在運行job,Linux平均負載老是保持在系統CPU數量的一半的狀態。
5. 一些節點上的swap利用率超過幾MB

    優化你的MapReduce性能的第一步是確保你整個cluster的配置文件被調整過。對於新手,請參考這裏關於配置參數的一篇blog:配置參數。 除了這些配置參數 ,在你想修改job參數以期提升性能時,你應該參照下我這裏的一些你應該注意的項:

1.  確保你正在DFS和MapReduce中使用的存儲mount被設置了noatime選項。這項若是設置就不會啓動對磁盤訪問時間的記錄,會顯著提升IO的性能。

2. 避免在TaskTracker和DataNode的機器上執行RAID和LVM操做,這一般會下降性能

3. 在這兩個參數mapred.local.dirdfs.data.dir 配置的值應當是分佈在各個磁盤上目錄,這樣能夠充分利用節點的IO讀寫能力。運行 Linux sysstat包下的iostat -dx 5命令可讓每一個磁盤都顯示它的利用率。

4. 你應該有一個聰明的監控系統來監控磁盤設備的健康狀態。MapReduce job的設計是可容忍磁盤失敗,但磁盤的異常會致使一些task重複執行而使性能降低。若是你發如今某個TaskTracker被不少job中列入黑名單,那麼它就可能有問題。

5. 使用像Ganglia這樣的工具監控並繪出swap和網絡的利用率圖。若是你從監控的圖看出機器正在使用swap內存,那麼減小mapred.child.java.opts屬性所表示的內存分配。

基準測試:
    很遺憾我不能爲這個建議去生成一些測試數據,由於這須要構建整個cluster。若是你有相關的經驗,請把你的建議及結果附到下面的留言區。

第二點  使用LZO壓縮
診斷結果/症狀:
1. 對 job的中間結果數據使用壓縮是很好的想法。
2. MapReduce job的輸出數據大小是不可忽略的。
3. 在job運行時,經過linux top 和 iostat命令能夠看出slave節點的iowait利用率很高。

    幾乎每一個Hadoop job均可以經過對map task輸出的中間數據作LZO壓縮得到較好的空間效益。儘管LZO壓縮會增長一些CPU的負載,但在shuffle過程當中會減小磁盤IO的數據量,整體上老是能夠節省時間的。

    當一個job須要輸出大量數據時,應用LZO壓縮能夠提升輸出端的輸出性能。這是由於默認狀況下每一個文件的輸出都會保存3個幅本,1GB的輸出文件你將要保存3GB的磁盤數據,當採用壓縮後固然更能節省空間並提升性能。

    爲了使LZO壓縮有效,請設置參數mapred.compress.map.output值爲true。

基準測試:
    在個人cluster裏,Wordcount例子中不使用LZO壓縮的話,job的運行時間只是稍微增長。但FILE_BYTES_WRITTEN計數器卻從3.5GB增加到9.2GB,這表示壓縮會減小62%的磁盤IO。在個人cluster裏,每一個數據節點上磁盤數量對task數量的比例很高,但Wordcount job並無在整個cluster中共享,因此cluster中IO不是瓶頸,磁盤IO增加不會有什麼大的問題。但對於磁盤因不少併發活動而受限的環境來講,磁盤IO減小60%能夠大幅提升job的執行速度。

第三點  調整map和reduce task的數量到合適的值linux

本身的經驗,通常來講,不可能跑一個job, 改變整個集羣的hdfs block的大小。一般提交job時候設置參數。ios

   job.setMapOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(1);
            //設置最小分片爲512M
            FileInputFormat.setMinInputSplitSize(job, 1024*1024*512);

            FileInputFormat.addInputPath(job, new Path("/usr/keyword/input"));git

 


診斷結果/症狀:
1. 每一個map或reduce task的完成時間少於30到40秒。
2. 大型的job不能徹底利用cluster中全部空閒的slot。
3. 大多數map或reduce task被調度執行了,但有一到兩個task還在準備狀態,在其它task完成以後才單獨執行

    調整job中map和reduce task的數量是一件很重要且經常被忽略的事情。下面是我在設置這些參數時的一些直觀經驗:

1. 若是每一個task的執行時間少於30到40秒,就減小task的數量。Task的建立與調度通常耗費幾秒的時間,若是task完成的很快,咱們就是在浪費時間。同時,設置JVM重用也能夠解決這個問題。

2. 若是一個job的輸入數據大於1TB,咱們就增長block size到256或者512,這樣能夠減小task的數量。你可使用這個命令去修改已存在文件的block size: hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata  /path/to/inputdata-with/largeblocks。在執行完這個命令後,你就能夠刪除原始的輸入文件了(/path/to/inputdata)。

3. 只要每一個task運行至少30到40秒,那麼就增長map task的數量,增長到整個cluster上map slot總數的幾倍。若是你的cluster中有100個map slot,那就避免運行一個有101個map task的job — 若是運行的話,前100個map同時執行,第101個task會在reduce執行以前單獨運行。這個建議對於小型cluste和小型job是很重要的。

4. 不要調度太多的reduce task — 對於大多數job來講,咱們推薦reduce task的數量應當等於或是略小於cluster中reduce slot的數量。

基準測試:
    爲了讓Wordcount job有不少的task運行,我設置了以下的參數:Dmapred.max.split.size=$[16*1024*1024]。之前默認會產生360個map task,如今就會有2640個。當完成這個設置以後,每一個task執行耗費9秒,而且在JobTracker的Cluster Summar視圖中能夠觀看到,正在運行的map task數量在0到24之間浮動。job在17分52秒以後結束,比原來的執行要慢兩倍多。

第四點  爲job添加一個Combiner
診斷結果/症狀:
1. job在執行分類的聚合時,REDUCE_INPUT_GROUPS計數器遠小於REDUCE_INPUT_RECORDS計數器。
2. job執行一個大的shuffle任務(例如,map的輸出數據每一個節點就是好幾個GB)。
3. 從job計數器中看出,SPILLED_RECORDS遠大於MAP_OUTPUT_RECORDS。

    若是你的算法涉及到一些分類的聚合,那麼你就可使用Combiner來完成數據到達reduce端以前的初始聚合工做。MapReduce框架很明智地運用Combiner來減小寫入磁盤以及經過網絡傳輸到reduce端的數據量。

基準測試:
    我刪去Wordcount例子中對setCombinerClass方法的調用。僅這個修改就讓map task的平均運行時間由33秒增加到48秒,shuffle的數據量也從1GB提升到1.4GB。整個job的運行時間由原來的8分30秒變成15分42秒,差很少慢了兩倍。此次測試過程當中開啓了map輸出結果的壓縮功能,若是沒有開啓這個壓縮功能的話,那麼Combiner的影響就會變得更加明顯。

第五點  爲你的數據使用最合適和簡潔的Writable類型
診斷/症狀:
1. Text 對象在非文本或混合數據中使用。
2. 大部分的輸出值很小的時候使用IntWritable 或 LongWritable對象。

    當一個開發者是初次編寫MapReduce,或是從開發Hadoop Streaming轉到Java MapReduce,他們會常常在沒必要要的時候使用Text 對象。儘管Text對象使用起來很方便,但它在由數值轉換到文本或是由UTF8字符串轉換到文本時都是低效的,且會消耗大量的CPU時間。當處理那些非文本的數據時,可使用二進制的Writable類型,如IntWritable, FloatWritable等。

    除了避免文件轉換的消耗外,二進制Writable類型做爲中間結果時會佔用更少的空間。當磁盤IO和網絡傳輸成爲大型job所遇到的瓶頸時,減小些中間結果的大小能夠得到更好的性能。在處理整形數值時,有時使用VIntWritable或VLongWritable類型可能會更快些—這些實現了變長整形編碼的類型在序列化小數值時會更節省空間。例如,整數4會被序列化成單字節,而整數10000會被序列化成兩個字節。這些變長類型用在統計等任務時更加有效,在這些任務中咱們只要確保大部分的記錄都是一個很小的值,這樣值就能夠匹配一或兩個字節。

    若是Hadoop自帶的Writable類型不能知足你的需求,你能夠開發本身的Writable類型。這應該是挺簡單的,可能會在處理文本方面更快些。若是你編寫了本身的Writable類型,請務必提供一個RawComparator類—你能夠之內置的Writable類型作爲例子。

基準測試:
    對於Wordcount例子,我修改了它在map計數時的中間變量,由IntWritable改成Text。而且在reduce統計最終和時使用Integer.parseString(value.toString)來轉換出真正的數值。這個版本比原始版本要慢近10%—整個job完成差很少超過9分鐘,且每一個map task要運行36秒,比以前的33秒要慢。儘可能看起來整形轉換仍是挺快的,但這不說明什麼狀況。在正常狀況下,我曾經看到過選用合適的Writable類型能夠有2到3倍的性能提高的例子。

第六點  重用Writable類型
診斷/症狀:
1. 在mapred.child.java.opts參數上增長-verbose:gc -XX:+PriintGCDetails,而後查看一些task的日誌。若是垃圾回收頻繁工做且消耗一些時間,你須要注意那些無用的對象。
2. 在你的代碼中搜索"new Text" 或"new IntWritable"。若是它們出如今一個內部循環或是map/reduce方法的內部時,這條建議可能會頗有用。
3. 這條建議在task內存受限的狀況下特別有用。

    不少MapReduce用戶常犯的一個錯誤是,在一個map/reduce方法中爲每一個輸出都建立Writable對象。例如,你的Wordcout mapper方法可能這樣寫:github

Java代碼算法

 收藏代碼

  1. public void map(...) {  
  2.       …  
  3.       for (String word : words) {  
  4.               output.collect(new Text(word), new IntWritable(1));  
  5.       }  
  6. }  



    這樣會致使程序分配出成千上萬個短週期的對象。Java垃圾收集器就要爲此作不少的工做。更有效的寫法是:網絡

Java代碼併發

 收藏代碼

  1. class MyMapper … {  
  2.    Text wordText = new Text();  
  3.    IntWritable one = new IntWritable(1);  
  4.    public void map(...) {  
  5.          for (String word: words) {  
  6.                wordText.set(word);  
  7.                output.collect(wordText, one);  
  8.           }  
  9.        }  
  10. }  


基準測試:
    當我以上面的描述修改了Wordcount例子後,起初我發現job運行時與修改以前沒有任何不一樣。這是由於在個人cluster中默認爲每一個task都分配一個1GB的堆大小 ,因此垃圾回收機制沒有啓動。當我從新設置參數,爲每一個task只分配200MB的堆時,沒有重用Writable對象的這個版本執行出現了很嚴重的減緩 —job的執行時間由之前的大概8分30秒變成如今的超過17分鐘。原始的那個重用Writable的版本,在設置更小的堆時仍是保持相同的執行速度。所以重用Writable是一個很簡單的問題修正,我推薦你們老是這樣作。它可能不會在每一個job的執行中得到很好的性能,但當你的task有內存限制時就會有至關大的區別。

第七點  使用簡易的剖析方式查看task的運行
    這是我在查看MapReduce job性能問題時經常使用的一個小技巧。那些不但願這些作的人就會反對說這樣是行不通的,可是事實是擺在面前。

    爲了實現簡易的剖析,能夠當job中一些task運行很慢時,用ssh工具鏈接上task所在的那臺task tracker機器。執行5到10次這個簡單的命令 sudo killall -QUIT java(每次執行間隔幾秒)。別擔憂,不要被命令的名字嚇着,它不會致使任何東西退出。而後使用JobTracker的界面跳轉到那臺機器上某個task的stdout 文件上,或者查看正在運行的機器上/var/log/hadoop/userlogs/目錄中那個task的stdout文件。你就能夠看到當你執行那段命令時,命令發送到JVM的SIGQUIT信號而產生的棧追蹤信息的dump文件。([]在JobTracker的界面上有Cluster Summary的表格,進入Nodes連接,選中你執行上面命令的server,在界面的最下方有Local Logs,點擊LOG進入,而後選擇userlogs目錄,這裏能夠看到以server執行過的jobID命名的幾個目錄,無論進入哪一個目錄均可以看到不少task的列表,每一個task的log中有個stdout文件,若是這個文件不爲空,那麼這個文件就是做者所說的棧信息文件)

    解析處理這個輸出文件須要一點點以經驗,這裏我介紹下平時是怎樣處理的:
對於棧信息中的每一個線程,很快地查找你的java包的名字(假如是com.mycompany.mrjobs)。若是你當前線程的棧信息中沒有找到任何與你的代碼有關的信息,那麼跳到另外的線程再看。

    若是你在某些棧信息中看到你查找的代碼,很快地查閱並大概記下它在作什麼事。假如你看到一些與NumberFormat相關的信息,那麼此時你須要記下它,暫時不須要關注它是代碼的哪些行。

    轉到日誌中的下一個dump,而後也花一些時間作相似的事情而後記下些你關注的內容。

    在查閱了4到5個棧信息後,你可能會意識到在每次查閱時都會有一些似曾相識的東西。若是這些你意識到的問題是阻礙你的程序變快的緣由,那麼你可能就找到了程序真正的問題。假如你取到10個線程的棧信息,而後從5個裏面看到過NumberFormat相似的信息,那麼可能意味着你將50%的CPU浪費在數據格式轉換的事情上了。

    固然,這沒有你使用真正的分析程序那麼科學。但我發現這是一種有效的方法,能夠在不須要引入其它工具的時候發現那些明顯的CPU瓶頸。更重要的是,這是一種讓你會變的更強的技術,你會在實踐中知道一個正常的和有問題的dump是啥樣子。

    經過這項技術我發現了一些一般出如今性能調優方面的誤解,列出在下面。
1. NumberFormat 至關慢,儘可能避免使用它。
2. String.split—無論是編碼或是解碼UTF8的字符串都是慢的超出你的想像— 參照上面提到的建議,使用合適的Writable類型。
3. 使用StringBuffer.append來鏈接字符串

    上面只是一些提升MapReduce性能的建議。作基準測試的那些代碼我放在了這裏:performance blog codeapp

相關文章
相關標籤/搜索