接下來咱們來分析Task的兩個子類,MapTask和ReduceTask。MapTask的相關類圖以下:
MapTask其實不是很複雜,複雜的是支持MapTask工做的一些輔助類。MapTask的成員變量少,只有split和splitClass。咱們知道,Map的輸入是split,是原始數據的一個切分,這個切分由org.apache.hadoop.mapred.InputSplit的子類具體描述(前面咱們是經過org.apache.hadoop.mapreduce.InputSplit介紹了InputSplit,它們對外的API是同樣的)。splitClass是InputSplit子類的類名,經過它,咱們能夠利用Java的反射機制,建立出InputSplit子類。而split是一個BytesWritable,它是InputSplit子類串行化之後的結果,再經過InputSplit子類的readFields方法,咱們能夠回覆出對應的InputSplit對象。
MapTask最重要的方法是run。run方法至關簡單,配置完系統的TaskReporter後,就根據狀況執行runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或執行Mapper。因爲MapReduce如今有兩套API,MapTask須要支持這兩套API,使得MapTask執行Mapper分爲runNewMapper和runOldMapper,run*Mapper後,MapTask會調用父類的done方法。
接下來咱們來分析runOldMapper,最開始部分是構造Mapper處理的InputSplit,更新Task的配置,而後就開始建立Mapper的RecordReader,rawIn是原始輸入,而後分正常(使用TrackedRecordReader,後面討論)和跳過部分記錄(使用SkippingRecordReader,後面討論)兩種狀況,構造對應的真正輸入in。
跳過部分記錄是Map的一種出錯恢復策略,咱們知道,MapReduce處理的數據集合很是大,而有些任務對一部分出錯的數據不進行處理,對結果的影響很小(如大數據集合的一些統計量),那麼,一小部分的數據出錯致使已處理的大量結果無效,是得不償失的,跳過這部分記錄,成了Mapper的一種選擇。
Mapper的輸出,是經過MapOutputCollector進行的,也分兩種狀況,若是沒有Reducer,那麼,用DirectMapOutputCollector(後面討論),不然,用MapOutputBuffer(後面討論)。
構造完Mapper的輸入輸出,經過構造配置文件中配置的MapRunnable,就能夠執行Mapper了。目前系統有兩個MapRunnable:MapRunner和MultithreadedMapRunner,以下圖。
原有API在這塊的處理上和新API有很大的不同。接口MapRunnable是原有API中Mapper的執行器,run方法就是用於執行用戶的Mapper。MapRunner是單線程執行器,至關簡單,首先,當MapTask調用:
web
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE>runner =apache
ReflectionUtils.newInstance(job.getMapRunnerClass(),job);微信
MapRunner的configure會在newInstance的最後被調用,configure執行的過程當中,對應的Mapper會經過反射機制構造出來。
MapRunner的run方法,會先建立對應的key,value對象,而後,對InputSplit的每一對<key,value>,調用Mapper的map方法,循環結束後,Mapper對應的清理方法會被調用。咱們須要注意,key,value對象在run方法中是被重複使用的,就是說,每次傳入Mapper的map方法的key,value都是同一個對象,只不過是裏面的內容變了,對象並無變。若是你須要保留key,value的內容,須要實現clone機制,克隆出對象的一個新備份。
相對於新API的多線程執行器,老API的MultithreadedMapRunner就比較複雜了,整體來講,就是經過阻塞隊列配合Java的多線程執行器,將<key,value>分發到多個線程中去處理。須要注意的是,在這個過程當中,這些線程共享一個Mapper實例,若是Mapper有共享的資源,須要有必定的保護機制。
runNewMapper用於執行新版本的Mapper,比runOldMapper稍微複雜,咱們就再也不討論了。多線程
更多精彩內容請關注:http://bbs.superwu.cn app
關注超人學院微信二維碼:oop