深度分析如何在Hadoop中控制Map的數量

深度分析如何在Hadoop中控制Map的數量

guibin.beijing@gmail.comhtml


不少文檔中描述,Mapper的數量在默認狀況下不可直接控制干預,由於Mapper的數量由輸入的大小和個數決定。在默認狀況下,最終input 佔據了多少block,就應該啓動多少個Mapper。若是輸入的文件數量巨大,可是每一個文件的size都小於HDFS的blockSize,那麼會形成 啓動的Mapper等於文件的數量(即每一個文件都佔據了一個block),那麼極可能形成啓動的Mapper數量超出限制而致使崩潰。這些邏輯確實是正確 的,但都是在默認狀況下的邏輯。其實若是進行一些客戶化的設置,就能夠控制了。
java

在Hadoop中,設置Map task的數量不像設置Reduce task數量那樣直接,即:不可以經過API直接精確的告訴Hadoop應該啓動多少個Map task。node

你也許奇怪了,在API中不是提供了接口 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)嗎?這個值難道不能夠設置Map task的數量嗎?這個API的確沒錯,在文檔上解釋」Note: This is only a hint to the framework.「,即這個值對Hadoop的框架來講僅僅是個提示,不起決定性的做用。也就是說,即使你設置了,也不必定獲得你想要的效果。數據庫


1. InputFormat介紹

在具體設置Map task數量以前,很是有必要了解一下與Map-Reduce輸入相關的基礎知識。apache

這個接口(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的輸入規格說明(input-specification),它將全部的輸入文件分割成邏輯上的InputSplit,每個InputSplit將會分給一個單獨的mapper;它還提供RecordReader的具體實現,這個Reader從邏輯的InputSplit上獲取input records並傳給Mapper處理。app

InputFormat有多種具體實現,諸如FileInputFormat(處理基於文件的輸入的基礎抽象類), DBInputFormat(處理基於數據庫的輸入,數據來自於一個能用SQL查詢的表),KeyValueTextInputFormat(特 殊的FineInputFormat,處理Plain Text File,文件由回車或者回車換行符分割成行,每一行由key.value.separator.in.input.line分割成Key和 Value),CompositeInputFormat,DelegatingInputFormat等。在絕大多數應用場景中都會使用 FileInputFormat及其子類型。框架

經過以上的簡單介紹,咱們知道InputFormat決定着InputSplit,每一個InputSplit會分配給一個單獨的Mapper,所以InputFormat決定了具體的Map task數量svn


2. FileInputFormat中影響Map數量的因素

在平常使用中,FileInputFormat是最經常使用的InputFormat,它有不少具體的實現。如下分析的影響Map數量的因素僅對 FileInputFormat及其子類有效,其餘非FileInputFormat能夠去查看相應的 getSplits(JobConf job, int numSplits) 具體實現便可。函數

請看以下代碼段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0源代碼):oop

[java] view plain copy
  1. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
  2. long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize);  
  3.   
  4. for (FileStatus file: files) {  
  5.   Path path = file.getPath();  
  6.   FileSystem fs = path.getFileSystem(job);  
  7.   if ((length != 0) && isSplitable(fs, path)) {   
  8.     long blockSize = file.getBlockSize();  
  9.     long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
  10.       
  11.     long bytesRemaining = length;  
  12.     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
  13.       String[] splitHosts = getSplitHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap);  
  14.       splits.add(new FileSplit(path, length-bytesRemaining, splitSize, splitHosts));  
  15.       bytesRemaining -= splitSize;  
  16.     }  
  17.   
  18.     if (bytesRemaining != 0) {  
  19.       splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));  
  20.     }  
  21.   } else if (length != 0) {  
  22.     String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);  
  23.     splits.add(new FileSplit(path, 0, length, splitHosts));  
  24.   } else {   
  25.     //Create empty hosts array for zero length files  
  26.     splits.add(new FileSplit(path, 0, length, new String[0]));  
  27.   }  
  28. }  
  29.   
  30. return splits.toArray(new FileSplit[splits.size()]);  
  31.   
  32. protected long computeSplitSize(long goalSize, long minSize, long blockSize) {  
  33.     return Math.max(minSize, Math.min(goalSize, blockSize));  
  34. }  

totalSize:是整個Map-Reduce job全部輸入的總大小。

numSplits:來自job.getNumMapTasks(),即在job啓動時用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值,給M-R框架的Map數量的提示。

goalSize:是輸入總大小與提示Map task數量的比值,即指望每一個Mapper處理多少的數據,僅僅是指望,具體處理的數據數由下面的computeSplitSize決定。

minSplitSize:默認爲1,可由子類複寫函數protected void setMinSplitSize(long minSplitSize) 從新設置。通常狀況下,都爲1,特殊狀況除外

minSize:取的1和mapred.min.split.size中較大的一個。

blockSize:HDFS的塊大小,默認爲64M,通常大的HDFS都設置成128M。

splitSize:就是最終每一個Split的大小,那麼Map的數量基本上就是totalSize/splitSize。

接下來看看computeSplitSize的邏輯:首先在goalSize(指望每一個Mapper處理的數據量)和HDFS的block size中取較小的,而後與mapred.min.split.size相比取較大的


3. 如何調整Map的數量

有了2的分析,下面調整Map的數量就很容易了。


3.1 減少Map-Reduce job 啓動時建立的Mapper數量

當處理大批量的大數據時,一種常見的狀況是job啓動的mapper數量太多而超出了系統限制,致使Hadoop拋出異常終止執行。解決這種異常的思路是減小mapper的數量。具體以下:

3.1.1 輸入文件size巨大,但不是小文件

這種狀況能夠經過增大每一個mapper的input size,即增大minSize或者增大blockSize來減小所需的mapper的數量。增大blockSize一般不可行,由於當HDFS被 hadoop namenode -format以後,blockSize就已經肯定了(由格式化時dfs.block.size決定),若是要更改blockSize,須要從新格式化 HDFS,這樣固然會丟失已有的數據。因此一般狀況下只能經過增大minSize,即增大mapred.min.split.size的值。


3.1.2 輸入文件數量巨大,且都是小文件

所謂小文件,就是單個文件的size小於blockSize。這種狀況經過增大mapred.min.split.size不可行,須要使用 FileInputFormat衍生的CombineFileInputFormat將多個input path合併成一個InputSplit送給mapper處理,從而減小mapper的數量。具體細節稍後會更新並展開。


3.2 增長Map-Reduce job 啓動時建立的Mapper數量

增長mapper的數量,能夠經過減少每一個mapper的輸入作到,即減少blockSize或者減少mapred.min.split.size的值。


參考資料

http://yaseminavcular.blogspot.com/2011/06/how-to-set-number-of-maps-with-hadoop.html

http://svn.apache.org/repos/asf/hadoop/common/tags/release-0.20.205.0

相關文章
相關標籤/搜索