MapReduce 源碼簡析 - Client端

前言

文章主要研究 Client 端具體作的哪些事情, 以及計算向數據移動具體是如何實現的java

輸出來源:拉勾教育大數據訓練營apache

代碼入口

咱們在編寫 MapReduce 業務邏輯時, 最後基本都是經過 job.waitForCompletion(true) 來提交 Job ,能夠進入該方法研究一下具體的實現緩存

爲了方便閱讀, 刪除了部分代碼, 重點關注在代碼的邏輯流程app

// org\apache\hadoop\mapreduce\Job.java
private synchronized void connect(){
	//...
	return new Cluster(getConfiguration());
	//...
}

public boolean waitForCompletion() {
  if (state == JobState.DEFINE) 
    submit(); //*
  return isSuccessful();
}

public void submit(){
  //...
  connect();//*
  final JobSubmitter submitter = 
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); //*
  status = ugi.doAs((PrivilegedExceptionAction) () -> {
    return submitter.submitJobInternal(Job.this, cluster); //*
  });
  //...
}

public JobSubmitter getJobSubmitter(FileSystem fs, ClientProtocol submitClient) {
  return new JobSubmitter(fs, submitClient);
}
複製代碼

在 Job.java 的 submit() 中能夠看到經過 connect() 方法 cluster 對象獲得了項目的配置信息, 又經過這些配置信息獲得了具體的 FileSystem 和 Client 並建立了用於提交 Job 的 submitter 對象分佈式

submitter 使用 submitJobInternal 方法開始提交做業, 在該方法處能夠看到如下詳盡的註釋ide

The job submission process involves:oop

  1. Checking the input and output specifications of the job.學習

    //檢查這次 Job 的輸入輸出規範性大數據

  2. Computing the InputSplits for the job.優化

    //計算這次 Job 的切片, 表明着確認多少個 MapTask

  3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.

    //大概意思是, 若是須要的話對這次 Job 進行分佈式緩存的優化

  4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.

    //將 Job 的 jar 和配置文件複製到 HDFS

  5. Submitting the job to the JobTracker and optionally monitoring it's status.

    //提交 Job 到 JobTracker 並監控, 這裏的 JobTracker 是 hadoop 1.x 的實現, 如今用 Yarn 的話應該是提交 ResourceManager

經過以上註釋已經明確接下來的代碼能夠看到 MapTask 並行度如何肯定以及切片的具體機制, 那進入 JobSubmitter 源碼好好分析一下

// org\apache\hadoop\mapreduce\JobSubmitter.java
JobStatus submitJobInternal(Job job, Cluster cluster) {
  //...
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  copyAndConfigureFiles(job, submitJobDir);
  int maps = writeSplits(job, submitJobDir);//* 這裏計算 map 的數量
  //...
}

private int writeSplits(JobContext job, Path jobSubmitDir) {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    // hadoop 2.x
    maps = writeNewSplits(job, jobSubmitDir);//*
  } else {
    // hadoop 1.x
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input = //* 經過反射獲得 Input
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//*

  List<InputSplit> splits = input.getSplits(job);//*
  //...
  return array.length;
}

public JobContextImpl(Configuration conf, JobID jobId) {
  //...
  public Class<? extends InputFormat<?,?>> getInputFormatClass() {
    // INPUT_FORMAT_CLASS_ATTR對象表明着配置文件中的 mapreduce.job.inputformat.class
    return conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }
  //...
}
複製代碼

進入 submitJobInternal() 後, 看到 writeSplits(job, submitJobDir) 計算返回 MapTask 的數量, writeSplits() 方法中調用 writeNewSplits(job, jobSubmitDir)

writeNewSplits() 裏的 input 對象, 經過 ReflectionUtils 名字能夠看出來是反射獲得的 Input 具體格式, Hadoop 很多地方都是使用反射獲取類型, 經過 getInputFormatClass() 方法得知, InputFormatClass 是用戶能夠指定的, 若是沒有指定就設置成 TextInputFormat.class

代碼中的 input.getSplits(job) 獲取全部的 split 是 client 最核心的功能, 當點進去發現 InputFormat 是個抽象類, 大致的繼承關係以下圖

image-20200528223046989

TextInputFormat 中沒有 getSplits() 的實現, 往上找具體實現, 看來是在 FileInputFormat 中了

計算 split

// org\apache\hadoop\mapreduce\lib\input\FileInputFormat.java
public List<InputSplit> getSplits(JobContext job) {
  StopWatch sw = new StopWatch().start();
  // 默認狀況 minSize = 1, 或者修改 mapreduce.input.fileinputformat.split.minsize 屬性
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // 默認狀況 maxSize 很是大, 是Long.max
  long maxSize = getMaxSplitSize(job);

  
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  // 1.
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen(); // length 是當前文件的實際大小
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
        
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize); //*
        // 2.
        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //*
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));// 緩存優化
          bytesRemaining -= splitSize;
        }
      }
    }
    
  return splits;
}

protected long getFormatMinSplitSize() {
  return 1;
}

public static long getMinSplitSize(JobContext job) {
  //SPLIT_MINSIZE 是配置 mapreduce.input.fileinputformat.split.minsize 屬性
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

public static long getMaxSplitSize(JobContext context) {
  //SPLIT_MAXSIZE 是配置 mapreduce.input.fileinputformat.split.maxsize 屬性
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}

protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

protected int getBlockIndex(BlockLocation[] blkLocations,long offset) {
  for (int i = 0 ; i < blkLocations.length; i++) {
    // is the offset inside this block?
    if ((blkLocations[i].getOffset() <= offset) &&
      (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
      return i;
    }
  }
  BlockLocation last = blkLocations[blkLocations.length -1];
  long fileLength = last.getOffset() + last.getLength() -1;
}
複製代碼

在標記的 1. 處開始先是遍歷 Job 中每一個 File, 獲取 File 中全部 block 的 location 和 blockSize, 並經過計算獲取 splitSize, 具體計算公式是 splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

在標記的 2. 處就是實際劃分 split 的代碼, while 循環條件是剩餘文件體積 > split 大小, 默認狀況 split 和 block 一一對應

循環體中 length-bytesRemaining 是當前 split 的offset, getBlockIndex(blkLocations, length-bytesRemaining) 方法是計算當前 split 所在的 block 具體位置

循環結束之後 splits 會包含全部的文件的 split 具體關鍵信息, 同時 splits.size 也就肯定了 MapTask 的數量

代碼看到這裏就清楚了 Client 是如何計算 MapTask 的並行度以及爲計算向數據移動作了哪些具體的工做

結語

雖然我已經參與開發工做有段時間了, 實際上對於看源碼我仍是有些抵觸了, 老是摸不着頭腦不清楚哪裏是重點, 屢次之後就對源碼至關反感.

這裏仍是多虧拉勾教育的墨蕭講師對總體學習思路的引導以及訓練營對於大數據總體課程安排的合理性.

相關文章
相關標籤/搜索