MapReduce 的核心流程

下文中新舊的意思分別表明 Hadoop 0.20 先後。緣由是 MapReduce 在這個版本進行了一次大改,主要的特色就是劃分了新舊兩個包名。新版的特色是使用了抽象類代替一些可擴展的接口,以及增長了 Context 的概念。好比說,MapContext中就封裝了獲取切片、讀取 Record 等功能,而MapContextImpl就是對 reader 和 writer 邏輯的封裝。java

有時候,咱們會不當心配置錯誤,好比把mapreduce.job.combine.class配置到了mapred.combiner.class,就可能致使啓用了舊的模塊。node

org.apache.hadoop.mapred.TaskStatus.Phase這個枚舉中能夠看到 task 定義的全部階段:STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP。固然咱們在理解的時候可能加入一些本身定義的階段(根據動做),好比 Split。算法

Split

整個分片在客戶端進行。Split 實際上是是邏輯上的分片,但其大小依賴於底層的文件塊。這個公式搞 Hadoop 的人都很熟悉:apache

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}
複製代碼

它其實就是保證了minSize \le goalSize \le blockSize。它是FileInputFormat裏專門設置的規則,且這個並非固定的,而是針對每一個文件(由於 MapReduce 支持傳入路徑,並且是多個路徑)。這裏你就能夠看到一個能夠調優的點了:顯然這裏須要進行和 namenode 的通訊,這樣就須要容許設置mapreduce.input.fileinputformat.list-status.num-threads增長遍歷文件信息時的線程數,這很合理(固然,略有一些奇怪的地方是這個配置實際上影響的是客戶端)。數據結構

Split的核心類是InputFormat。這個接口必須實現getSplitsgetRecordReader方法,這對應着將輸入的文件劃分爲分片並映射到 Map 的輸入。通常來講,咱們默認使用的是TextInputFormat,它使用了LineRecordReader讀取模式和FileInputFormat的按塊切分。其餘的模式也很重要,好比CombineFileInputFormat(能夠讀取若干小文件,這個和 SequenceFile 合在一塊兒不一樣,只是打包了元數據),好比KeyValueTextInputFormat能夠讀取鍵值對,好比NLineInputFormat容許指定切分的行數(其實我以爲應該實如今TextInputFormat裏)。LineRecordReader還有一個細節是它實現了對壓縮文件的自解釋。app

這裏仍是有一些值得細想的地方。好比,當獲取了文件的分塊信息以後,如何對任務的劃分進行優化(本地化)?若是是本地文件,如何寫入 HDFS 並補全元信息?svn

顯然,本地化已經涉及到做業調度,因此這裏的邏輯應該在 AM 處理。所以這裏JobSubmitter會將數據進行序列化(具體能夠參考writeJobSplitMetaInfo),以後在JobImpl.InitTransitioncreateSplits方法能夠看到它讀取了一組TaskSplitMetaInfo信息。JobImpl中只會建立 Map 和 Reduce 任務,具體的調度就要參考調度器的實現了。函數

listStatus調用getFileSystem後,會根據 schema 獲得對應的FileSystem實現類,本地就是LocalFileSystem。這裏並無作特殊的處理,而是將這個位置也做爲元信息傳遞了。oop

Map

Mapper裏的MapContext會從InputFormat裏取 RecordReader 做爲記錄讀取的方法。這裏的實現比較簡單並且不過重要,重點是理解 Mapper 的run方法:優化

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    } finally {
    cleanup(context);
    }
}
複製代碼

其中setupcleanup只調用一次,都是平時 MR 常見的回調點。

咱們知道在 Map 中會將結果分區寫入不一樣文件。這裏提供了一個環形緩衝區(相似於其餘的緩衝設計,好比消息隊列)MapOutputBuffer來平滑輸出,它繼承了MapOutputCollector。在每次緩衝區溢出(spill,其實閾值爲80%)後,都會寫入一個臨時文件。以後,Map 會合並臨時文件到最終的分區文件。這個過程有大量的配置優化點,好比mapreduce.task.io.sort.mb(控制緩衝區大小),mapreduce.task.io.sort.factor(控制進行 merge 的文件數,另外這個也針對 Reduce),mapreduce.map.output.compress(進行 map 結果壓縮)等等。

注意,分區不表明寫入不一樣文件,而是文件的不一樣位置

Spill

spill 過程是由獨立的線程執行的,不過爲何不能和 Reducer 同樣多個線程寫呢?主要是這種多寫者問題幾乎必然是有鎖的(無鎖算法必須使用其餘特定數據結構),效率並不比單寫者高。

在寫入磁盤前,Map 會進行排序(sort),這點比較容易忽略。你能夠這樣記住它:MapOutputBuffer實現了IndexedSortable(固然,其實原本是 MapReduce 提到了老是排序的好處)。這個接口頗有意思,這裏默認實現是 Hadoop 本身實現的快速排序,它會對分區先進行排序,然後保持分區內有序(這個也能夠經過map.sort.class設置)。至於寫到什麼文件裏,你能夠在getSpillFileForWrite裏找到(通常都是 out 後綴),好比%s_spill_%d.out(前面是做業號,後面是溢出的次數)。

在網上流傳着一種說法:Combiner 應該是一個純函數。不過,其實咱們知道,Google 在論文裏就提到了 Map 和 Reduce 也應該是純函數(不過也能夠在必定範圍內違反)。固然這個傳言是有緣由的,由於 Combine 被屢次調用了:在sortAndSpill的最後,以及你們都知道的,在整個 Map 階段的最後,它會調用mergeParts方法。

注意Combine 和 Merge 的含義是不一樣的。Combine 是按照原文教義的,是一種 accelerate;Merge 是處理臨時文件的。不管是 Map 仍是 Reduce 階段都是緩衝區再加文件合併。那麼能不能直接寫到結果文件呢?其實理論上能夠,可是沒有意義,由於咱們每次有新的溢出時,都要和舊的結果文件進行合併。這樣作也拖慢了 Map 的輸出。

Spill 以後可選的步驟是合併(combine)和壓縮。Map 的壓縮不只僅是最後的輸出,它在輸出臨時文件時就會進行了,因此能夠極大提升傳輸的效率(可是以 CPU 佔用爲代價)。

Spill 生成的全部分區都在一個文件裏。所以,它須要元信息來標記分區的範圍,這就是SpillRecord,在 Map Task 裏有一個對應的 ArrayList 集合indexCacheList,它存儲着全部 Map 臨時文件 的元信息。不過,若是它的大小超過了mapreduce.task.index.cache.limit.bytes,那麼就會溢寫到磁盤,因此這裏也是一個能夠調優的地方。

Merge 過程能夠經過修改mapreduce.task.io.sort.factor來增長一次合併的數量(多路歸併),不然的話就會增長循環次數。不過,這裏略微比咱們想的要複雜:整個文件列表首先會被排序,這以後會取出要歸併的文件,組成小根堆,而後迭代堆的值合併——固然這也會生成臨時文件,且臨時文件會二分搜索後插入到當前排序列表。最終,全部文件會合併爲一個。但這個代碼看起來很是容易讓人迷惑:這個堆MergeQueue繼承了 Hadoop 本身實現的PriorityQueue類。它自己的泛型參數是K 和 V,而繼承的PriorityQueueSegment<K,V>,也就是待合併的文件。而這個類又包含了排序的文件列表segments

Hadoop 實際上是利用這個堆找全局最小值,方法是它將 Segment 的最小值(也就是第一個值)做爲排序的 key。這樣,就能夠實現一個相似於單出隊的多路歸併效果。要注意的是,取出來的 key 以後就要被更新。這一點咱們能夠從adjustPriorityQueue裏看到:

private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
    long startPos = reader.getReader().bytesRead;
    boolean hasNext = reader.nextRawKey();
    long endPos = reader.getReader().bytesRead;
    totalBytesProcessed += endPos - startPos;
    mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));
    if (hasNext) {
    adjustTop();
    } else {
    pop();
    reader.close();
    }
}
複製代碼

其中nextRawKey會把 key 設置爲下一個 segment 裏鍵的位置(細節能夠看一下IFilepositionToNextRecord方法)。調整以後就能夠順利的繼續進行堆的調整了。

最後,就是在MergerwriteFile方法裏,會對RawKeyValueIterator進行調用,這裏是直接傳的this。不過我看了一下,Merger自己在Merge時就會寫文件,而以後咱們在輸出時仍然會寫 Map 的結果文件,感受這樣不是重複寫了兩次?雖說Merger是 Map 和 Reduce 複用的,但這樣看起來也並不妥。

Partition

Google 的論文也提到,有些場景很適合自定義分區器,他舉的例子是 IP 地址。自定義分區器很大的意義之一在於防止數據傾斜——雖說哈希是最不容易傾斜的了。好比,Hadoop 提供了TotalOrderPartitioner,它的額外做用是能夠有效分區。這個分區器頗有意思,它原本是實現全序排序的,也就是說它會保存每一個分區的最大最小值。這樣一來就構成了一個典型的二分搜索樹結構——不過,這裏考慮得更細一些,即對於可進行逐字節比較的類型(實現BinaryComparable)好比Text,還能夠經過 trie 樹搜索。

還有一種KeyFieldBasedPartitioner能夠基於某些域來進行哈希。

Shuffle

也能夠說是 Copy,從 Map 端拉取(pull)數據爲 Reducer 提供輸入。

getProgress().setStatus("reduce"); 
setPhase(TaskStatus.Phase.SHUFFLE);  
複製代碼

在 Reducer 初始化的時候,會設置當前 Task 狀態爲 SHUFFLE。Shuffle 這個名字也不是沒有體現,好比在 Reducer 獲取到 Map 的輸出之後,會對數據的順序進行 Shuffle,防止熱點。

這裏 Shuffle 階段的代碼都放在了Shuffle這個類裏,它實現了ShuffleConsumerPlugin接口。這個名字看起來有點奇怪,由於它是爲了實現Shuffle的服務化(可由第三方服務提供)。這個能夠查看對應的 commit:

svn.apache.org/viewvc?view…

這裏 Shuffle 會啓動一個事件接收器EventFetcher的獨立線程來處理 map 完成的事件。這個接收器內部使用了TaskUmbilicalProtocol 協議。咱們沒必要去深究下層 RPC 通訊的邏輯,只須要知道它包含了getMapCompletionEvents方法。另外,它的實現類TaskAttemptListenerImpl實際上是用來監聽心跳的(實現了TaskAttemptListener)。

當收到了完成的事件以後,這些消息會被ShuffleScheduler解析。具體的實如今addKnownMapOutput方法中,它會把解析到的mapHost,也就是持有 map 輸出的節點放到pendingHosts裏。mapHost有這樣幾種狀態:

  • IDLE:表示 Map 尚未完成
  • PENDING:已經完成等待處理
  • BUSY:表示正被拷貝
  • PENALIZED:錯誤,拷貝失敗

不過,這個pendingHosts並無暴露出來,Shuffle是不可見的。調度器ShuffleScheduler只暴露一個getHost的接口給Fetcher線程。這個和EventFetcher同樣,都是獨立的線程,並且這裏還能夠用mapreduce.reduce.shuffle.parallelcopies配置來增長並行數。

ShuffleScheduler的實現ShuffleSchedulerImpl是使用synchronized上鎖,諸如addKnownMapOutput最後會進行notifyAll,而getHost會在pendingHosts不足時等待。Fetcherrun代碼很是簡潔:

while (!stopped && !Thread.currentThread().isInterrupted()) {
  MapHost host = null;
  try {
    // If merge is on, block
    merger.waitForResource();

    // Get a host to shuffle from
    host = scheduler.getHost();
    metrics.threadBusy();

    // Shuffle
    copyFromHost(host);
  } finally {
    if (host != null) {
    scheduler.freeHost(host);
    metrics.threadFree();            
    }
  }
}
複製代碼

這裏若是有內存 merge 正在運行,會阻塞當前拷貝數據,由於後面的copyFromHost有可能會觸發歸併操做。

和 Map 不同的是,這個Merger的類型是MergeManager。它是對 Shuffle 階段歸併的抽象,把歸併劃分到了OnDiskMergerinMemoryMerger兩個單獨的線程。它提供了三個方法:waitForResourcereserveclose

其中reserve會判斷當前是否溢出,並建立一個臨時的拷貝文件(也可能在內存)。這個閾值是根據mapreduce.reduce.memory.totalbytesmapreduce.reduce.shuffle.memory.limit.percent(默認是0.25)的乘積。若是小於閾值,且總內存使用沒有溢出,就生成InMemoryMapOutput,反之生成OnDiskMapOutput。它們都實現了MapOutput接口,當整個拷貝完成的時候會調用這個接口的commit方法,將這個輸入加入到finishedMaps裏。

InMemoryMapOutputcommit很特殊,它會判斷是否超過了緩衝區閾值(默認是總內存的0.66)或者文件數是否超過mapreduce.reduce.merge.memtomem.threshold(默認是ioSortFactor),若是超過了就會調inMemoryMerger;同理,這個OnDiskMapOutput也會根據文件的數量來進行歸併(默認是2 * ioSortFactor - 1)。這裏調用是基於一個鏈表pendingToBeMerged的對象鎖,也就是 notify。

Sort

最後在close的時候,會把內存和磁盤的臨時文件都合併一次(有可能沒到閾值)。當完成了全部的前期工做後,會調用finalMerge方法,這個方法的核心就是Merger.merge,也就是 Map 裏相同的歸併流程。這樣,就能夠造成一個全局排序的輸出文件。由於核心過程都在前面提到了,這裏再也不贅述。

copyFromHost是從HttpURLConnection裏獲取數據流。

Reduce

Reduce 和 Map 同樣,能夠定製OutputFormat的格式,不過它沒有如何分片。整個代碼都很簡單,reducerContext會把 Map 的鍵值對合並,其實就是遍歷鍵值對,一直到下一個不一樣的鍵(更準確的來講,是在根據groupComparator來肯定下一個鍵,因此它有可能會把幾個 key 值放在一塊兒,但默認是RawComparator,也就是不分組)爲止,而後將全部的值合在一塊兒輸出。

Reduce 必須等待 Shuffle 完成纔開始執行,所以有可能會致使 Slot Hoarding 問題。


其實整個 MapReduce 的代碼有很是多值得看一看的地方,好比,Uber 是如何實現的? 和 AM 通訊是如何實現的?但每每時間所限,咱們只能觀其大略,而後實際用到了再翻一翻細節。董西成大佬很早就出了兩本關於 Hadoop 源碼的書,不過惋惜只對 MapReduce 1.x 有比較詳細的描述,而 YARN 的篇幅較少,許多細節已經與如今比較主流的 Hadoop 版本不太同樣了,固然仍是值得一看的。

To be continued...

相關文章
相關標籤/搜索