MapReduce中map並行度優化及源碼分析

mapTask並行度的決定機制

  一個job的map階段並行度由客戶端在提交job時決定,而客戶端對map階段並行度的規劃的基本邏輯爲:將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分紅邏輯上的多個split),而後每個split分配一個mapTask並行實例處理。html

FileInputFormat切片機制

原文和做者一塊兒討論:http://www.cnblogs.com/intsmaze/p/6733968.htmlapache

1默認切片定義在InputFormat類中的getSplit()方法

二、FileInputFormat中默認的切片機制:

a) 簡單地按照文件的內容長度進行切片併發

b) 切片大小,默認等於hdfs的block大小app

c) 切片時不考慮數據集總體,而是逐個針對每個文件單獨切片jvm

好比待處理數據有兩個文件:oop

file1.txt    260M
file2.txt    10M

通過FileInputFormat的切片機制運算後,造成的切片信息以下:  源碼分析

file1.txt.split1--  0~128
file1.txt.split2--  128~260 //若是剩餘的文件長度/切片長度<=1.1則會將剩餘文件的長度並未一個切片
file2.txt.split1--  0~10M

三、FileInputFormat中切片的大小的參數配置

經過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定。字體

minsize:默認值:1  
   配置參數: mapreduce.input.fileinputformat.split.minsize    

maxsize:默認值:Long.MAXValue  
    配置參數:mapreduce.input.fileinputformat.split.maxsize

blocksize:值爲hdfs的對應文件的blocksize

配置讀取目錄下文件數量的線程數:public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";

所以,默認狀況下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksizespa

maxsize(切片最大值):參數若是調得比blocksize小,則會讓切片變小。pwa

minsize(切片最小值):參數調的比blockSize大,則可讓切片變得比blocksize還大。

選擇併發數的影響因素:

一、運算節點的硬件配置

二、運算任務的類型:CPU密集型仍是IO密集型

三、運算任務的數據量

三、hadoop2.6.4源碼解析

org.apache.hadoop.mapreduce.JobSubmitter類

   //獲得job的map任務的並行數量
   private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
  
  @SuppressWarnings("unchecked")
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
     ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

 

切片計算邏輯,關注紅色字體代碼便可。

public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job);
   //遍歷文件,對每個文件進行以下處理:得到文件的blocksize,獲取文件的長度,獲得切片信息(spilt 文件路徑,切片編號,偏移量範圍)
for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits; }

 

 public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
  
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
    
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    
  //保證切分的文件長度最小不得小於1字節
  protected long getFormatMinSplitSize() {
    return 1;
  }
  
  //若是沒有在conf中設置SPLIT_MINSIZE參數,則取默認值1字節。
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }
  
  //獲得切片文件的最大長度
  long maxSize = getMaxSplitSize(job);
  
  //若是沒有在conf中設置SPLIT_MAXSIZE參數,則去默認值Long.MAX_VALUE字節。
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  
   //讀取指定目錄下的全部文件的信息
   List<FileStatus> files = listStatus(job);
   //若是沒有指定開啓幾個線程讀取,則默認一個線程去讀文件信息,由於存在目錄下有上億個文件的狀況,因此有須要開啓多個線程加快讀取。
   int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
   public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
  
  //計算切片文件的邏輯大小
  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
  
  private static final double SPLIT_SLOP = 1.1;   // 10% slop
  //判斷剩餘文件與切片大小的比是否爲1.1.
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
    }

map並行度

  若是job的每一個map或者reduce的task的運行時間都只有30-40秒鐘(最好每一個map的執行時間最少不低於一分鐘),那麼就減小該job的map或者reduce數。每個task的啓動和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,因此若是每一個task都很是快就跑完了,就會在task的開始和結束的時候浪費太多的時間。

  配置task的JVM重用能夠改善該問題:
  (mapred.job.reuse.jvm.num.tasks,默認是1,表示一個JVM上最多能夠順序執行的task數目(屬於同一個Job)是1。也就是說一個task啓一個JVM)。

小文件的場景下,默認的切片機制會形成大量的maptask處理不多量的數據,效率低下:

解決方案:

  推薦:把小文件存入hdfs以前進行預處理,先合併爲大文件後再上傳。

  折中:寫程序對hdfs上小文件進行合併再跑job處理。

  補救措施:若是大量的小文件已經存在hdfs上了,使用combineInputFormate組件,它能夠將衆多的小文件從邏輯上規劃到一個切片中,這樣多個小文件就能夠交給一個maptask操做了。

相關文章
相關標籤/搜索