Analysis of the Internal Implementation of Map Task

        In the previous post I finished studying the Split process, which was fairly simple. Next up is the Map operation. Like Reduce, Map is a core part of the whole MapReduce framework, so in this post I will walk through its internal implementation in detail. First of all, there are 4 kinds of map-side tasks, which some readers may already know: Job-setup Task, Job-cleanup Task, Task-cleanup Task and Map Task. The first 3 are auxiliary tasks and are not the focus of this article; what I cover here is the most important one, the Map Task itself.

        The whole MapTask process is divided into 5 phases:

Read -----> Map ------> Collect ------> Spill ------> Combine

Here is a sequence diagram that shows the flow at a glance:


In the code analysis that follows, you will see how each of these methods gets called.

        Before analyzing the whole flow, we need to understand some of the internal structures. The MapTask class is the carrier of a Map Task; its class relationships are as follows:


What gets invoked is its run method, which starts the map task. The corresponding code:

/**
   * Main execution flow of a MapTask
   */
  @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    // start thread that will handle communication with parent
    //send task progress reports to and communicate with the parent process
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext);
    reporter.startCommunicationThread();
    //determine whether the new MapReduce API or the old API is in use
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    //there are 4 kinds of map-side tasks: Job-setup Task, Job-cleanup Task, Task-cleanup Task and Map Task
    if (jobCleanup) {
      //run the Job-cleanup Task here
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      //run the Job-setup Task here
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      //run the Task-cleanup Task here
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    //if it is none of the 3 auxiliary tasks above, run the real Map Task, dispatching to different methods depending on whether the new or old API is used
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      //we focus on the old implementation; splitMetaInfo holds the split information passed in from the earlier InputFormat step
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }
Here I am studying the old API, so we jump into runOldMapper.

Let me add a note here: everything that follows revolves around something called the Mapper, which is essentially a proxy for the user's map function. The user can completely rewrite the process behind map, or simply use the mapper flow the framework provides.
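For a concrete picture of what such a user-written Mapper looks like with the old mapred API, here is a minimal word-count style sketch (the class name WordCountMapper is my own example, not something taken from the MapTask source):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

//a word-count style Mapper for the old mapred API; the framework calls map()
//once for every record the RecordReader produces
public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      //output is the OldOutputCollector proxy that is analyzed later in this post
      output.collect(word, ONE);
    }
  }
}

MapRunner simply feeds this map method one record at a time, which is exactly what its run method below does.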


The framework already provides a concrete implementation in MapRunner:

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances that are re-used for all entries
      K1 key = input.createKey();
      V1 value = input.createValue();
      
      //fetch each key-value pair from the RecordReader and invoke the user-written map function
      while (input.next(key, value)) {
        // map pair to output
        //invoke the user-written map function
        mapper.map(key, value, output, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      }
    } finally {
      //close the mapper when done
      mapper.close();
    }
  }
From this we can see that the Map phase is just iteratively and repeatedly running the user-defined map function. With that background we can dig deeper. Back to the runOldMapper method mentioned above: the first thing it does is the first phase of the Map Task, Read.

      The job of the Read phase is to read key-value pairs out of the RecordReader one by one, ready to be fed to the map function in the Map phase that follows.

//obtain the input InputSplit information
    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
           splitIndex.getStartOffset());

    updateJobWithSplit(job, inputSplit);
    reporter.setInputSplit(inputSplit);
    
    //obtain the RecordReader, depending on whether skip-bad-records mode is enabled
    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
        new SkippingRecordReader<INKEY,INVALUE>(inputSplit, umbilical, reporter) :
        new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, reporter);

What follows is the Map phase. Once the key-value pairs have been read out, they are handed to the Mapper, whose run method is executed; inside run, the user's own map implementation is invoked, as analyzed above.

At the end of the user-written map function, the OutputCollector's collect() method is normally called to emit the processed key-value pair; at that point we arrive at the Collect phase.

runner.run(in, new OldOutputCollector(collector, conf), reporter);
        So what is the basic operation of the Collect phase that follows? It partitions the batches of key-value pairs and writes them into a circular (ring) memory buffer. At this point the data lives only in memory and has not yet been written to disk. Quite a few pieces are involved in the collect process; take a look at the structural relationship diagram:



There is a partitioner member variable dedicated to computing the partition number of each key-value pair. By default the partition number is obtained by hashing the key and taking it modulo the number of reduces, but you can of course supply your own implementation. If no partitioning is done at all, the partition is simply -1.
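As a rough illustration (this class is my own sketch, not part of the Hadoop source), an old-API partitioner that reproduces the default hash-mod behaviour could look like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

//mimics the default behaviour: hash the key and take it modulo the number of
//reduce tasks to get the partition number
public class MyHashPartitioner implements Partitioner<Text, IntWritable> {

  @Override
  public void configure(JobConf job) {
    //nothing to configure in this sketch
  }

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    //mask the sign bit so the result is never negative
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

Such a class would be registered with conf.setPartitionerClass(MyHashPartitioner.class), and it is exactly what getPartitionerClass() hands back in the reflection call inside OldOutputCollector below.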

  /**
   * Since the mapred and mapreduce Partitioners don't share a common interface
   * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
   * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
   * the configured partitioner should not be called. It's common for
   * partitioners to compute a result mod numReduces, which causes a div0 error
   */
  private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
    private final Partitioner<K,V> partitioner;
    private final MapOutputCollector<K,V> collector;
    private final int numPartitions;

    @SuppressWarnings("unchecked")
    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
      numPartitions = conf.getNumReduceTasks();
      if (numPartitions > 0) {
        //if the number of partitions is greater than 0, obtain the configured partitioner class; the default is hash-mod, and users can supply their own partitioning method
        //the class is only known from the job configuration, so it is instantiated via reflection
        partitioner = (Partitioner<K,V>)
          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
      } else {
        //if the number of partitions is 0, no partitioning is performed
        partitioner = new Partitioner<K,V>() {
          @Override
          public void configure(JobConf job) { }
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            //return -1 directly to indicate that no partitioning is done
            return -1;
          }
        };
      }
      this.collector = collector;
    }
    .....
The proxy implementation of collect is shown below; note that this is still not the real collect yet:

.....
    @Override
    public void collect(K key, V value) throws IOException {
      try {
        //the actual collect call writes into memory by partition, calling partitioner.getPartition to get the partition number
        //the buffer is a circular (ring) buffer
        collector.collect(key, value,
                          partitioner.getPartition(key, value, numPartitions));
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IOException("interrupt exception", ie);
      }
    }
The collector here is the MapOutputCollector object from the code above. What is exposed to callers is OldOutputCollector, but take a look at the code:

interface MapOutputCollector<K, V> {

    public void collect(K key, V value, int partition
                        ) throws IOException, InterruptedException;
    public void close() throws IOException, InterruptedException;
    
    public void flush() throws IOException, InterruptedException, 
                               ClassNotFoundException;
        
  }

It is only an interface, so what is the real implementation? Time to look back at the code:

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runOldMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException,
                             ClassNotFoundException {
	...
	int numReduceTasks = conf.getNumReduceTasks();
    LOG.info("numReduceTasks: " + numReduceTasks);
    MapOutputCollector collector = null;
    if (numReduceTasks > 0) {
      //if there are Reduce Tasks, store the data in the MapOutputBuffer ring buffer
      collector = new MapOutputBuffer(umbilical, job, reporter);
    } else { 
      //if there are no Reduce Tasks, write the map output directly to HDFS as the final result
      collector = new DirectMapOutputCollector(umbilical, job, reporter);
    }
    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

    try {
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      .....
There are 2 cases: when there are Reduce tasks the collector is a MapOutputBuffer, and when there are none it is a DirectMapOutputCollector. You can see the authors thought this through: with no Reduce step, writing directly to HDFS is far more efficient.
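To make the second case concrete, here is a rough sketch of a map-only job driver (the class name, Mapper and paths are my own examples): setting the number of reduce tasks to 0 is what steers runOldMapper towards DirectMapOutputCollector.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MapOnlyJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MapOnlyJob.class);
    conf.setJobName("map-only-example");
    conf.setMapperClass(WordCountMapper.class);   //the example Mapper sketched earlier
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    //zero reduce tasks: runOldMapper picks DirectMapOutputCollector and the
    //map output is written straight to HDFS as the job's final result
    conf.setNumReduceTasks(0);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}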

In other words, the collect method that ultimately runs is MapOutputBuffer.collect.

Since collect writes data into a ring buffer, reads and writes happen on the same buffer, so extra work is needed to avoid dirty data. The author uses a scheme similar to BlockingQueue: a ReentrantLock from which two conditions are obtained, one called spillDone and one called spillReady, whose await and signal methods control reading from and writing to the buffer.

.....
    private final ReentrantLock spillLock = new ReentrantLock();
    private final Condition spillDone = spillLock.newCondition();
    private final Condition spillReady = spillLock.newCondition();
    .....
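To see how these two conditions cooperate, here is a stripped-down sketch of the same producer-consumer pattern (my own illustration, not the real MapOutputBuffer code; the spillPending flag and the method names are invented for the example):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

//a simplified sketch of the coordination pattern: the collecting side signals
//spillReady when the buffer is full and waits on spillDone; the spill side
//waits on spillReady and signals spillDone when it has finished
public class SpillCoordination {
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillDone = spillLock.newCondition();
  private final Condition spillReady = spillLock.newCondition();
  private boolean spillPending = false;

  //called from the collect() side when the in-memory buffer crosses its limit
  void requestSpillAndWait() throws InterruptedException {
    spillLock.lock();
    try {
      spillPending = true;
      spillReady.signal();            //wake the spill thread
      while (spillPending) {
        spillDone.await();            //block until the spill has finished
      }
    } finally {
      spillLock.unlock();
    }
  }

  //body of the spill thread's loop
  void spillLoop() throws InterruptedException {
    spillLock.lock();
    try {
      while (true) {
        while (!spillPending) {
          spillReady.await();         //wait for collect() to request a spill
        }
        spillLock.unlock();
        try {
          //sortAndSpill() would run here, outside the lock
        } finally {
          spillLock.lock();
        }
        spillPending = false;
        spillDone.signal();           //let collect() continue
      }
    } finally {
      spillLock.unlock();
    }
  }
}

In this picture collect plays the producer role and the SpillThread the consumer; the real code additionally guards the buffer indices such as kvstart and kvend under the same lock.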
Then look at the collect method:

public synchronized void collect(K key, V value, int partition
              ) throws IOException {
      .....
      try {
        // serialize key bytes into buffer
        int keystart = bufindex;
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          // wrapped the key; reset required
          bb.reset();
          keystart = 0;
        }
        // serialize value bytes into buffer
        final int valstart = bufindex;
        valSerializer.serialize(value);
        int valend = bb.markRecord();

        if (partition < 0 || partition >= partitions) {
          throw new IOException("Illegal partition for " + key + " (" +
              partition + ")");
        }
        ....

The structure of the ring buffer itself is not the focus of this article; its design is fairly involved, so you can study it on your own. As the ring buffer gradually fills up, a "spill" operation kicks in: the data in the buffer is written out to disk. That is the Spill phase described next.
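For reference, the size of this ring buffer and how full it may get before a spill is triggered are driven by job configuration. A small sketch using the Hadoop 1.x parameter names (the values shown are only examples):

import org.apache.hadoop.mapred.JobConf;

public class SpillTuning {
  public static void configure(JobConf conf) {
    conf.setInt("io.sort.mb", 200);                 //size of the in-memory sort buffer, in MB (default 100)
    conf.setFloat("io.sort.spill.percent", 0.80f);  //soft limit: start spilling once the buffer is this full
    conf.setFloat("io.sort.record.percent", 0.05f); //fraction of the buffer reserved for record metadata
  }
}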

      The Spill phase is interleaved, from time to time, with the execution of collect.

...
          if (kvstart == kvend && kvsoftlimit) {
            LOG.info("Spilling map output: record full = " + kvsoftlimit);
            startSpill();
          }
When kvstart equals kvend, no spill is currently in progress, and kvsoftlimit means the record buffer has reached its soft limit; when both hold, the data is considered full enough and the spill (overflow write) operation is started.

private synchronized void startSpill() {
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
               "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
               "; length = " + kvoffsets.length);
      kvend = kvindex;
      bufend = bufmark;
      spillReady.signal();
    }
The spillReady.signal() call at the end of startSpill triggers the condition's signal operation.

The SpillThread waiting on that condition is then woken up and runs the sortAndSpill method:

spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              //when the buffer overflows, write the data out to disk
              sortAndSpill();
Inside sortAndSpill the data is written out to a file; before the write there is a sort pass, and when there is enough data a combine pass is run as well.

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      ......
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);

        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        //sort before performing the write
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] - 
                           kvindices[kvoff + KEYSTART]));
                //append the key-value pair through the writer
                writer.append(key, value);
                ++spindex;
              }
            } else {
              int spstart = spindex;
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              //if this partition actually has records, run the combiner
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                //run one combine pass over these records
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

          ......
          //write the spill index record to file
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }
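Note that the combiner branch above only runs when a combiner class has been configured for the job. As a rough sketch (the WordCountReducer class is my own example, pairing with the WordCountMapper sketched earlier), wiring one up with the old API could look like this:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

//a sum Reducer that can double as the combiner for a word-count style job
public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }

  //how it would be wired up: once a combiner class is set, combinerRunner in
  //sortAndSpill applies it to each spill's records before they reach disk
  public static void wireUp(JobConf conf) {
    conf.setCombinerClass(WordCountReducer.class);
    conf.setReducerClass(WordCountReducer.class);
  }
}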
       Each spill produces another batch of files, so at the very end we come to the Combine phase, the last phase of the Map task. Its job is to merge all the files produced by the previous phase into a single file that the later Reduce tasks can read easily. In the code this corresponds to collector.flush().

.....
    try {
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      //flush the remaining data out of the collector: spill what is left and merge the spill files
      collector.flush();
    } finally {
      //close
      in.close();                               // close input
      collector.close();
    }
  }
The flush called on the collector here is MapOutputBuffer.flush:
public synchronized void flush() throws IOException, ClassNotFoundException,
                                            InterruptedException {
      ...
      // shut down spill thread and wait for it to exit. Since the preceding
      // ensures that it is finished with its work (and sortAndSpill did not
      // throw), we elect to use an interrupt instead of setting a flag.
      // Spilling simultaneously from this thread while the spill thread
      // finishes its work might be both a useful way to extend this and also
      // sufficient motivation for the latter approach.
      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(e);
      }
      // release sort buffer before the merge
      kvbuffer = null;
      //finally merge the spill files into a single output file
      mergeParts();
      Path outputPath = mapOutputFile.getOutputFile();
      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
    }
At this point the Map task is officially over. The overall flow really does twist and turn quite a bit.

While analyzing such a fairly tangled process, I kept thinking about how best to express my ideas. Fellow MapReduce learners are welcome to offer suggestions so we can learn together.
