MapTask
:map----->sort
map
:Mapper.map()中將輸出的key-value
寫出以前
sort
:Mapper.map()中將輸出的key-value
寫出以後算法
Read階段
MapTask經過用戶編寫的RecordReader
,從輸入InputSplit
中解析出一個個key/value
。緩存
Map階段
該節點主要是將解析出的key/value交給用戶編寫map()
函數處理,併產生一系列新的key/value。數據結構
Collect收集階段
在用戶編寫map()函數中,當數據處理完成後,通常會調用OutputCollector.collect()
輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner
),並寫入一個內存緩衝區中,而且會被Partitioner
計算一個分區號,按照前後順序分配index
下標app
Spill階段函數
即「溢寫」,在此階段有兩個重要線程。收集線程負責向緩衝區收集數據,緩衝區初始值爲100M,當使用到80%閾值,喚醒溢寫線程,溢寫線程會將緩衝區已經收集的數據溢寫到磁盤。線程
在溢寫前,會對緩衝區中的數據進行排序(快速排序),在排序時,只經過比較key進行排序,只改變index的位置,不交換數據的位置code
排序後,按照分區,依次將數據寫入到磁盤的臨時文件的若干分區中排序
每次溢寫都會生成一個臨時文件,當全部的數據都溢寫完成以後,會將全部的臨時文件片斷合併爲一個總的文件索引
在合併時,將全部的臨時文件的相同分區的數據,進行合併,合併後再對全部的數據進行排序(歸併排序)內存
最終生成一個結果文件(output/file.out
),同時生成相應的索引文件output/file.out.index
,這個文件分爲若干分區,每一個分區的數據已經按照key進行了排序,等待reduceTask
的shuffle線程來拷貝數據!
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號Partition進行排序,而後按照key進行排序。這樣,通過排序後,數據以分區爲單位彙集在一塊兒,且同一分區內全部數據按照key有序。
步驟2:按照分區編號由小到大依次將每一個分區中的數據寫入任務工做目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。若是用戶設置了Combiner,則寫入文件以前,對每一個分區中的數據進行一次彙集操做。
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每一個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮後數據大小。若是當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。