下文中新舊的意思分別表明 Hadoop 0.20 先後。緣由是 MapReduce 在這個版本進行了一次大改,主要的特色就是劃分了新舊兩個包名。新版的特色是使用了抽象類代替一些可擴展的接口,以及增長了 Context 的概念。好比說,MapContext
中就封裝了獲取切片、讀取 Record 等功能,而MapContextImpl
就是對 reader 和 writer 邏輯的封裝。java
有時候,咱們會不當心配置錯誤,好比把mapreduce.job.combine.class
配置到了mapred.combiner.class
,就可能致使啓用了舊的模塊。node
在org.apache.hadoop.mapred.TaskStatus.Phase
這個枚舉中能夠看到 task 定義的全部階段:STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP
。固然咱們在理解的時候可能加入一些本身定義的階段(根據動做),好比 Split。算法
整個分片在客戶端進行。Split 實際上是是邏輯上的分片,但其大小依賴於底層的文件塊。這個公式搞 Hadoop 的人都很熟悉:apache
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
複製代碼
它其實就是保證了。它是
FileInputFormat
裏專門設置的規則,且這個並非固定的,而是針對每一個文件(由於 MapReduce 支持傳入路徑,並且是多個路徑)。這裏你就能夠看到一個能夠調優的點了:顯然這裏須要進行和 namenode 的通訊,這樣就須要容許設置mapreduce.input.fileinputformat.list-status.num-threads
增長遍歷文件信息時的線程數,這很合理(固然,略有一些奇怪的地方是這個配置實際上影響的是客戶端)。數據結構
Split的核心類是InputFormat
。這個接口必須實現getSplits
和getRecordReader
方法,這對應着將輸入的文件劃分爲分片並映射到 Map 的輸入。通常來講,咱們默認使用的是TextInputFormat
,它使用了LineRecordReader
讀取模式和FileInputFormat
的按塊切分。其餘的模式也很重要,好比CombineFileInputFormat
(能夠讀取若干小文件,這個和 SequenceFile 合在一塊兒不一樣,只是打包了元數據),好比KeyValueTextInputFormat
能夠讀取鍵值對,好比NLineInputFormat
容許指定切分的行數(其實我以爲應該實如今TextInputFormat
裏)。LineRecordReader
還有一個細節是它實現了對壓縮文件的自解釋。app
這裏仍是有一些值得細想的地方。好比,當獲取了文件的分塊信息以後,如何對任務的劃分進行優化(本地化)?若是是本地文件,如何寫入 HDFS 並補全元信息?svn
顯然,本地化已經涉及到做業調度,因此這裏的邏輯應該在 AM 處理。所以這裏JobSubmitter
會將數據進行序列化(具體能夠參考writeJobSplitMetaInfo
),以後在JobImpl.InitTransition
的createSplits
方法能夠看到它讀取了一組TaskSplitMetaInfo
信息。JobImpl
中只會建立 Map 和 Reduce 任務,具體的調度就要參考調度器的實現了。函數
在listStatus
調用getFileSystem
後,會根據 schema 獲得對應的FileSystem
實現類,本地就是LocalFileSystem
。這裏並無作特殊的處理,而是將這個位置也做爲元信息傳遞了。oop
Mapper
裏的MapContext
會從InputFormat
裏取 RecordReader 做爲記錄讀取的方法。這裏的實現比較簡單並且不過重要,重點是理解 Mapper 的run
方法:優化
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
複製代碼
其中setup
和cleanup
只調用一次,都是平時 MR 常見的回調點。
咱們知道在 Map 中會將結果分區寫入不一樣文件。這裏提供了一個環形緩衝區(相似於其餘的緩衝設計,好比消息隊列)MapOutputBuffer
來平滑輸出,它繼承了MapOutputCollector
。在每次緩衝區溢出(spill,其實閾值爲80%)後,都會寫入一個臨時文件。以後,Map 會合並臨時文件到最終的分區文件。這個過程有大量的配置優化點,好比mapreduce.task.io.sort.mb
(控制緩衝區大小),mapreduce.task.io.sort.factor
(控制進行 merge 的文件數,另外這個也針對 Reduce),mapreduce.map.output.compress
(進行 map 結果壓縮)等等。
注意,分區不表明寫入不一樣文件,而是文件的不一樣位置。
spill 過程是由獨立的線程執行的,不過爲何不能和 Reducer 同樣多個線程寫呢?主要是這種多寫者問題幾乎必然是有鎖的(無鎖算法必須使用其餘特定數據結構),效率並不比單寫者高。
在寫入磁盤前,Map 會進行排序(sort),這點比較容易忽略。你能夠這樣記住它:MapOutputBuffer
實現了IndexedSortable
(固然,其實原本是 MapReduce 提到了老是排序的好處)。這個接口頗有意思,這裏默認實現是 Hadoop 本身實現的快速排序,它會對分區先進行排序,然後保持分區內有序(這個也能夠經過map.sort.class
設置)。至於寫到什麼文件裏,你能夠在getSpillFileForWrite
裏找到(通常都是 out 後綴),好比%s_spill_%d.out
(前面是做業號,後面是溢出的次數)。
在網上流傳着一種說法:Combiner 應該是一個純函數。不過,其實咱們知道,Google 在論文裏就提到了 Map 和 Reduce 也應該是純函數(不過也能夠在必定範圍內違反)。固然這個傳言是有緣由的,由於 Combine 被屢次調用了:在sortAndSpill
的最後,以及你們都知道的,在整個 Map 階段的最後,它會調用mergeParts
方法。
注意Combine 和 Merge 的含義是不一樣的。Combine 是按照原文教義的,是一種 accelerate;Merge 是處理臨時文件的。不管是 Map 仍是 Reduce 階段都是緩衝區再加文件合併。那麼能不能直接寫到結果文件呢?其實理論上能夠,可是沒有意義,由於咱們每次有新的溢出時,都要和舊的結果文件進行合併。這樣作也拖慢了 Map 的輸出。
Spill 以後可選的步驟是合併(combine)和壓縮。Map 的壓縮不只僅是最後的輸出,它在輸出臨時文件時就會進行了,因此能夠極大提升傳輸的效率(可是以 CPU 佔用爲代價)。
Spill 生成的全部分區都在一個文件裏。所以,它須要元信息來標記分區的範圍,這就是SpillRecord
,在 Map Task 裏有一個對應的 ArrayList 集合indexCacheList
,它存儲着全部 Map 臨時文件 的元信息。不過,若是它的大小超過了mapreduce.task.index.cache.limit.bytes
,那麼就會溢寫到磁盤,因此這裏也是一個能夠調優的地方。
Merge 過程能夠經過修改mapreduce.task.io.sort.factor
來增長一次合併的數量(多路歸併),不然的話就會增長循環次數。不過,這裏略微比咱們想的要複雜:整個文件列表首先會被排序,這以後會取出要歸併的文件,組成小根堆,而後迭代堆的值合併——固然這也會生成臨時文件,且臨時文件會二分搜索後插入到當前排序列表。最終,全部文件會合併爲一個。但這個代碼看起來很是容易讓人迷惑:這個堆MergeQueue
繼承了 Hadoop 本身實現的PriorityQueue
類。它自己的泛型參數是K 和 V,而繼承的PriorityQueue
是Segment<K,V>
,也就是待合併的文件。而這個類又包含了排序的文件列表segments
。
Hadoop 實際上是利用這個堆找全局最小值,方法是它將 Segment 的最小值(也就是第一個值)做爲排序的 key。這樣,就能夠實現一個相似於單出隊的多路歸併效果。要注意的是,取出來的 key 以後就要被更新。這一點咱們能夠從adjustPriorityQueue
裏看到:
private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
long startPos = reader.getReader().bytesRead;
boolean hasNext = reader.nextRawKey();
long endPos = reader.getReader().bytesRead;
totalBytesProcessed += endPos - startPos;
mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));
if (hasNext) {
adjustTop();
} else {
pop();
reader.close();
}
}
複製代碼
其中nextRawKey
會把 key 設置爲下一個 segment 裏鍵的位置(細節能夠看一下IFile
的positionToNextRecord
方法)。調整以後就能夠順利的繼續進行堆的調整了。
最後,就是在Merger
的writeFile
方法裏,會對RawKeyValueIterator
進行調用,這裏是直接傳的this
。不過我看了一下,Merger
自己在Merge
時就會寫文件,而以後咱們在輸出時仍然會寫 Map 的結果文件,感受這樣不是重複寫了兩次?雖說Merger
是 Map 和 Reduce 複用的,但這樣看起來也並不妥。
Google 的論文也提到,有些場景很適合自定義分區器,他舉的例子是 IP 地址。自定義分區器很大的意義之一在於防止數據傾斜——雖說哈希是最不容易傾斜的了。好比,Hadoop 提供了TotalOrderPartitioner
,它的額外做用是能夠有效分區。這個分區器頗有意思,它原本是實現全序排序的,也就是說它會保存每一個分區的最大最小值。這樣一來就構成了一個典型的二分搜索樹結構——不過,這裏考慮得更細一些,即對於可進行逐字節比較的類型(實現BinaryComparable
)好比Text
,還能夠經過 trie 樹搜索。
還有一種KeyFieldBasedPartitioner
能夠基於某些域來進行哈希。
也能夠說是 Copy,從 Map 端拉取(pull)數據爲 Reducer 提供輸入。
getProgress().setStatus("reduce");
setPhase(TaskStatus.Phase.SHUFFLE);
複製代碼
在 Reducer 初始化的時候,會設置當前 Task 狀態爲 SHUFFLE。Shuffle 這個名字也不是沒有體現,好比在 Reducer 獲取到 Map 的輸出之後,會對數據的順序進行 Shuffle,防止熱點。
這裏 Shuffle 階段的代碼都放在了Shuffle
這個類裏,它實現了ShuffleConsumerPlugin
接口。這個名字看起來有點奇怪,由於它是爲了實現Shuffle
的服務化(可由第三方服務提供)。這個能夠查看對應的 commit:
這裏 Shuffle 會啓動一個事件接收器EventFetcher
的獨立線程來處理 map 完成的事件。這個接收器內部使用了TaskUmbilicalProtocol
協議。咱們沒必要去深究下層 RPC 通訊的邏輯,只須要知道它包含了getMapCompletionEvents
方法。另外,它的實現類TaskAttemptListenerImpl
實際上是用來監聽心跳的(實現了TaskAttemptListener
)。
當收到了完成的事件以後,這些消息會被ShuffleScheduler
解析。具體的實如今addKnownMapOutput
方法中,它會把解析到的mapHost
,也就是持有 map 輸出的節點放到pendingHosts
裏。mapHost
有這樣幾種狀態:
不過,這個pendingHosts
並無暴露出來,Shuffle
是不可見的。調度器ShuffleScheduler
只暴露一個getHost
的接口給Fetcher
線程。這個和EventFetcher
同樣,都是獨立的線程,並且這裏還能夠用mapreduce.reduce.shuffle.parallelcopies
配置來增長並行數。
ShuffleScheduler
的實現ShuffleSchedulerImpl
是使用synchronized
上鎖,諸如addKnownMapOutput
最後會進行notifyAll
,而getHost
會在pendingHosts
不足時等待。Fetcher
的run
代碼很是簡潔:
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
}
複製代碼
這裏若是有內存 merge 正在運行,會阻塞當前拷貝數據,由於後面的copyFromHost
有可能會觸發歸併操做。
和 Map 不同的是,這個Merger
的類型是MergeManager
。它是對 Shuffle 階段歸併的抽象,把歸併劃分到了OnDiskMerger
和inMemoryMerger
兩個單獨的線程。它提供了三個方法:waitForResource
,reserve
和close
。
其中reserve
會判斷當前是否溢出,並建立一個臨時的拷貝文件(也可能在內存)。這個閾值是根據mapreduce.reduce.memory.totalbytes
和mapreduce.reduce.shuffle.memory.limit.percent
(默認是0.25)的乘積。若是小於閾值,且總內存使用沒有溢出,就生成InMemoryMapOutput
,反之生成OnDiskMapOutput
。它們都實現了MapOutput
接口,當整個拷貝完成的時候會調用這個接口的commit
方法,將這個輸入加入到finishedMaps
裏。
InMemoryMapOutput
的commit
很特殊,它會判斷是否超過了緩衝區閾值(默認是總內存的0.66)或者文件數是否超過mapreduce.reduce.merge.memtomem.threshold
(默認是ioSortFactor
),若是超過了就會調inMemoryMerger
;同理,這個OnDiskMapOutput
也會根據文件的數量來進行歸併(默認是2 * ioSortFactor - 1
)。這裏調用是基於一個鏈表pendingToBeMerged
的對象鎖,也就是 notify。
最後在close
的時候,會把內存和磁盤的臨時文件都合併一次(有可能沒到閾值)。當完成了全部的前期工做後,會調用finalMerge
方法,這個方法的核心就是Merger.merge
,也就是 Map 裏相同的歸併流程。這樣,就能夠造成一個全局排序的輸出文件。由於核心過程都在前面提到了,這裏再也不贅述。
copyFromHost
是從HttpURLConnection
裏獲取數據流。
Reduce 和 Map 同樣,能夠定製OutputFormat
的格式,不過它沒有如何分片。整個代碼都很簡單,reducerContext
會把 Map 的鍵值對合並,其實就是遍歷鍵值對,一直到下一個不一樣的鍵(更準確的來講,是在根據groupComparator
來肯定下一個鍵,因此它有可能會把幾個 key 值放在一塊兒,但默認是RawComparator
,也就是不分組)爲止,而後將全部的值合在一塊兒輸出。
Reduce 必須等待 Shuffle 完成纔開始執行,所以有可能會致使 Slot Hoarding 問題。
其實整個 MapReduce 的代碼有很是多值得看一看的地方,好比,Uber 是如何實現的? 和 AM 通訊是如何實現的?但每每時間所限,咱們只能觀其大略,而後實際用到了再翻一翻細節。董西成大佬很早就出了兩本關於 Hadoop 源碼的書,不過惋惜只對 MapReduce 1.x 有比較詳細的描述,而 YARN 的篇幅較少,許多細節已經與如今比較主流的 Hadoop 版本不太同樣了,固然仍是值得一看的。
To be continued...