MapReduce學習筆記 —— Map的中間結果

《Hadoop技術內幕——深刻解析MapReduce架構設計與實現原理》(董西城著)一書中,第8章《Task運行過程分析》中第3小結詳細介紹了Map Task的內部實現,過程如圖所示:
Map Task計算流程,圖片來自原書安全

在Spill階段,當環形緩衝區滿後,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。其步驟以下:數據結構

  1. 對緩衝區kvbuffer中區間[bufstart, bufend)內的數據進行排序。會先partition,而後基於key排序。
  2. 按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件output/spillN.out中。
  3. 將分區數據的元信息寫道內存索引數據結構SpillRecorder中,每一個元信息包含在臨時文件中的偏移量、壓縮先後的數據大小。

這些臨時文件會在Combine階段進行合併,最終生成一個文件,並保存到output/file.out中,同時生成響應的索引文件oiutput/file.out.index架構

那麼問題來了,這些中間結果是否能夠被第三方獲取?是否能夠進行加密?加密是否可逆?app

在該書第11章《Hadoop安全機制》中有相關介紹。在未添加安全機制以前,任何用戶能夠經過URL來獲取人意一個Map Task的中間輸出結果。爲了解決該問題,Hadoop在Reduce Task與TaskTracker之間的通訊機制上添加了雙向認證機制,以保證有且僅有同做業的Reduce Task纔可以讀取Map Task的中間結果。該雙向認證是以它們之間的共享的做業令牌爲基礎。ide

過程以下:
TaskTracker對Reduce Task認證: Reduce Task從TaskTracker上獲取數據以前,先要將HMAC-SHA1(URL, JobToken)發送給TaskTracker,TaskTracker利用本身保存的做業令牌計算HMAC-SHA1,而後比較該值與Reduce Task發過來的是否一致。
Reduce Task對TaskTracker認證:TaskTracker對Reduce Task認證成功後,須要使用Reduce Task發送過來的HMAC-SHA1值與做業令牌計算一個新的HMAC-SHA1,經Reduce Task驗證後,雙方纔能夠證明傳送數據。函數

========================================oop

MapOutputBuffer中有一個變量叫作mapOutputFile。在sortAndSpill函數中(被flush調用),會經過這個變量拿到文件地址,並寫出中間結果,在該方法中,調用了下文中提到的writer.append(key, value)來寫出數據。看起來沒有加密的過程。this

在執行shuffle.run()時,會對map的數據進行提取併合並。就會調用merger.close(),
實際會調用到MergeManagerlmpl的close方法,代碼以下:加密

@Override
  public RawKeyValueIterator close() throws Throwable {
    // Wait for on-going merges to complete
    if (memToMemMerger != null) { 
      memToMemMerger.close();
    }
    inMemoryMerger.close();
    onDiskMerger.close();

    List<InMemoryMapOutput<K, V>> memory = 
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    memory.addAll(inMemoryMapOutputs);
    inMemoryMapOutputs.clear();
    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
    onDiskMapOutputs.clear();
    return finalMerge(jobConf, rfs, memory, disk);
  }

那麼咱們看到了memToMemMerger\inMemoryMerger\onDiskMerger三種不一樣的Merger,定義以下:spa

private IntermediateMemoryToMemoryMerger memToMemMerger;
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
private final OnDiskMerger onDiskMerger;

其中IntermediateMemoryToMemoryMerger繼承自 MergeThread<InMemoryMapOutput<K, V>, K, V>,然而MergeThread的close方法和run方法以下:

public synchronized void close() throws InterruptedException {
  closed = true;
  waitForMerge();
  interrupt();
}


public void run() {
  while (true) {
    List<T> inputs = null;
    try {
      // Wait for notification to start the merge...
      synchronized (pendingToBeMerged) {
      while(pendingToBeMerged.size() <= 0) {
        pendingToBeMerged.wait();
      }
      // Pickup the inputs to merge.
      inputs = pendingToBeMerged.removeFirst();
    }

    // Merge
    merge(inputs);
    } catch (InterruptedException ie) {
      numPending.set(0);
      return;
    } catch(Throwable t) {
      numPending.set(0);
      reporter.reportException(t);
      return;
    } finally {
      synchronized (this) {
      numPending.decrementAndGet();
      notifyAll();
    }
  }
}

而imMemoryMerger則是由createInMemoryMerger函數建立,實際上是一個InMemoryMerger的實例。

這三者都會在merge方法中建立一個Writer變量,並調用Merger.writeFile(iter, writer, reporter, jobConf)。隨後調用writer.close()來完成調用。close函數實現以下:

public void close() throws IOException {

  // When IFile writer is created by BackupStore, we do not have
  // Key and Value classes set. So, check before closing the
  // serializers
  if (keyClass != null) {
    keySerializer.close();
    valueSerializer.close();
  }

  // Write EOF_MARKER for key/value length
  WritableUtils.writeVInt(out, EOF_MARKER);
  WritableUtils.writeVInt(out, EOF_MARKER);
  decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);

  //Flush the stream
  out.flush();

  if (compressOutput) {
    // Flush
    compressedOut.finish();
    compressedOut.resetState();
  }

  // Close the underlying stream iff we own it...
  if (ownOutputStream) {
    out.close();
  }
  else {
    // Write the checksum
    checksumOut.finish();
  }

  compressedBytesWritten = rawOut.getPos() - start;

  if (compressOutput) {
    // Return back the compressor
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }

  out = null;
  if(writtenRecordsCounter != null) {
    writtenRecordsCounter.increment(numRecordsWritten);
  }
}

咱們會發現其中關鍵的就是out。out的建立以下:

if (codec != null) {
    this.compressor = CodecPool.getCompressor(codec);
    if (this.compressor != null) {
      this.compressor.reset();
      this.compressedOut = codec.createOutputStream(checksumOut, compressor);
      this.out = new FSDataOutputStream(this.compressedOut,  null);
      this.compressOutput = true;
    } else {
      LOG.warn("Could not obtain compressor from CodecPool");
      this.out = new FSDataOutputStream(checksumOut,null);
    }
  } else {
    this.out = new FSDataOutputStream(checksumOut,null);
  }

這一部分解釋了黨咱們傳入了壓縮格式的時候,中間結果如何進行壓縮。

幾個結論:

  • 輸出應該是機遇Job Configuration裏面的設定,壓縮成具體的格式。能夠參看:StackOverflow
  • 直接使用Map的中間結果應該也是不可行的,除非本身修改源代碼。能夠參看:StackOverflow。可是能夠嘗試實現IFile作一些常識。
相關文章
相關標籤/搜索