MapReduce 源碼簡析 - Client端


文章主要研究 Client 端具體作的哪些事情, 以及計算向數據移動具體是如何實現的java



咱們在編寫 MapReduce 業務邏輯時, 最後基本都是經過 job.waitForCompletion(true) 來提交 Job ,能夠進入該方法研究一下具體的實現緩存

爲了方便閱讀, 刪除了部分代碼, 重點關注在代碼的邏輯流程app

// org\apache\hadoop\mapreduce\
private synchronized void connect(){
	return new Cluster(getConfiguration());

public boolean waitForCompletion() {
  if (state == JobState.DEFINE) 
    submit(); //*
  return isSuccessful();

public void submit(){
  final JobSubmitter submitter = 
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); //*
  status = ugi.doAs((PrivilegedExceptionAction) () -> {
    return submitter.submitJobInternal(Job.this, cluster); //*

public JobSubmitter getJobSubmitter(FileSystem fs, ClientProtocol submitClient) {
  return new JobSubmitter(fs, submitClient);

在 的 submit() 中能夠看到經過 connect() 方法 cluster 對象獲得了項目的配置信息, 又經過這些配置信息獲得了具體的 FileSystem 和 Client 並建立了用於提交 Job 的 submitter 對象分佈式

submitter 使用 submitJobInternal 方法開始提交做業, 在該方法處能夠看到如下詳盡的註釋ide

The job submission process involves:oop

  1. Checking the input and output specifications of the job.學習

    //檢查這次 Job 的輸入輸出規範性大數據

  2. Computing the InputSplits for the job.優化

    //計算這次 Job 的切片, 表明着確認多少個 MapTask

  3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.

    //大概意思是, 若是須要的話對這次 Job 進行分佈式緩存的優化

  4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.

    //將 Job 的 jar 和配置文件複製到 HDFS

  5. Submitting the job to the JobTracker and optionally monitoring it's status.

    //提交 Job 到 JobTracker 並監控, 這裏的 JobTracker 是 hadoop 1.x 的實現, 如今用 Yarn 的話應該是提交 ResourceManager

經過以上註釋已經明確接下來的代碼能夠看到 MapTask 並行度如何肯定以及切片的具體機制, 那進入 JobSubmitter 源碼好好分析一下

// org\apache\hadoop\mapreduce\
JobStatus submitJobInternal(Job job, Cluster cluster) {
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  copyAndConfigureFiles(job, submitJobDir);
  int maps = writeSplits(job, submitJobDir);//* 這裏計算 map 的數量

private int writeSplits(JobContext job, Path jobSubmitDir) {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    // hadoop 2.x
    maps = writeNewSplits(job, jobSubmitDir);//*
  } else {
    // hadoop 1.x
    maps = writeOldSplits(jConf, jobSubmitDir);
  return maps;

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input = //* 經過反射獲得 Input
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//*

  List<InputSplit> splits = input.getSplits(job);//*
  return array.length;

public JobContextImpl(Configuration conf, JobID jobId) {
  public Class<? extends InputFormat<?,?>> getInputFormatClass() {
    // INPUT_FORMAT_CLASS_ATTR對象表明着配置文件中的 mapreduce.job.inputformat.class
    return conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);

進入 submitJobInternal() 後, 看到 writeSplits(job, submitJobDir) 計算返回 MapTask 的數量, writeSplits() 方法中調用 writeNewSplits(job, jobSubmitDir)

writeNewSplits() 裏的 input 對象, 經過 ReflectionUtils 名字能夠看出來是反射獲得的 Input 具體格式, Hadoop 很多地方都是使用反射獲取類型, 經過 getInputFormatClass() 方法得知, InputFormatClass 是用戶能夠指定的, 若是沒有指定就設置成 TextInputFormat.class

代碼中的 input.getSplits(job) 獲取全部的 split 是 client 最核心的功能, 當點進去發現 InputFormat 是個抽象類, 大致的繼承關係以下圖


TextInputFormat 中沒有 getSplits() 的實現, 往上找具體實現, 看來是在 FileInputFormat 中了

計算 split

// org\apache\hadoop\mapreduce\lib\input\
public List<InputSplit> getSplits(JobContext job) {
  StopWatch sw = new StopWatch().start();
  // 默認狀況 minSize = 1, 或者修改 mapreduce.input.fileinputformat.split.minsize 屬性
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // 默認狀況 maxSize 很是大, 是Long.max
  long maxSize = getMaxSplitSize(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  // 1.
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen(); // length 是當前文件的實際大小
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize); //*
        // 2.
        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //*
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                blkLocations[blkIndex].getCachedHosts()));// 緩存優化
          bytesRemaining -= splitSize;
  return splits;

protected long getFormatMinSplitSize() {
  return 1;

public static long getMinSplitSize(JobContext job) {
  //SPLIT_MINSIZE 是配置 mapreduce.input.fileinputformat.split.minsize 屬性
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);

public static long getMaxSplitSize(JobContext context) {
  //SPLIT_MAXSIZE 是配置 mapreduce.input.fileinputformat.split.maxsize 屬性
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);

protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));

protected int getBlockIndex(BlockLocation[] blkLocations,long offset) {
  for (int i = 0 ; i < blkLocations.length; i++) {
    // is the offset inside this block?
    if ((blkLocations[i].getOffset() <= offset) &&
      (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
      return i;
  BlockLocation last = blkLocations[blkLocations.length -1];
  long fileLength = last.getOffset() + last.getLength() -1;

在標記的 1. 處開始先是遍歷 Job 中每一個 File, 獲取 File 中全部 block 的 location 和 blockSize, 並經過計算獲取 splitSize, 具體計算公式是 splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

在標記的 2. 處就是實際劃分 split 的代碼, while 循環條件是剩餘文件體積 > split 大小, 默認狀況 split 和 block 一一對應

循環體中 length-bytesRemaining 是當前 split 的offset, getBlockIndex(blkLocations, length-bytesRemaining) 方法是計算當前 split 所在的 block 具體位置

循環結束之後 splits 會包含全部的文件的 split 具體關鍵信息, 同時 splits.size 也就肯定了 MapTask 的數量

代碼看到這裏就清楚了 Client 是如何計算 MapTask 的並行度以及爲計算向數據移動作了哪些具體的工做


雖然我已經參與開發工做有段時間了, 實際上對於看源碼我仍是有些抵觸了, 老是摸不着頭腦不清楚哪裏是重點, 屢次之後就對源碼至關反感.

