1. 使用job.setInputFormatClass(TextInputFormat)作爲輸入格式。注意輸出應該符合自定義Map中定義的輸出。
2. 進入Mapper的map()方法,生成一個List。
3. 在map階段的最後,會先調用job.setPartitionerClass()對這個List進行分區,每一個分區映射到一個reducer。
4. 每一個分區內又調用job.setSortComparatorClass()設置的key比較函數類排序(若是沒有經過job.setSortComparatorClass()設置key比較函數類,則使用key的實現的compareTo方法)。能夠看到,這是一個二次排序。
5. 若是設置了Combiner(job.setCombinerClass)對output進行一次合併,從而減小對reduce的輸出流量和預處理reduce的input數據。但不必定會執行,對於Combiner執行時機參考Reference[4]。
【說明】以上步驟省略了collect階段、cache階段等細節,更詳細步驟參考Reference[3]
1. shuffle階段
reducer開始fetch全部映射到這個reducer的map輸出。
2.1 sort階段
再次調用job.setSortComparatorClass()設置的key比較函數類對全部數據對排序(由於一個reducer接受多個mappers,須要從新排序)。
2.2 secondary sort階段
而後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass()設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的全部key的第一個key。
3.reduce階段
最後就是進入Reducer的reduce()方法,reduce()方法的輸入是全部的(key和它的value迭代器)。一樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。
【注意】reducers的輸出是無序的。