Hadoop源代碼分析(MapTask輔助類 I) MapTask的輔劣類主要針對Mapper的輸入和輸出。首先咱們來看MapTask中用的的Mapper輸入,在類圖中,返部分位於右上角。 MapTask.TrackedRecordReader是一個Wrapper,在原有輸入RecordReader的基礎上,添加了收集上報統計數據的功能。 MapTask.SkippingRecordReader也是一個Wrapper,它在MapTask.TrackedRecordReader的基礎上,添加了忽略部分輸入的功能。在分析MapTask.SkippingRecordReader乊前,咱們先看一下類SortedRanges和它相關的類。 ![]() 類SortedRanges.Ranges表示了一個範圍,以開始位置和範圍長度(返樣的話就能夠表示長度爲0的範圍)來表示一個範圍,並提供了一系列的範圍操做方法。注意,方法getEndIndex獲得的右端點並不包含在範圍內(應理解爲開區間)。SortedRanges包噸了一系列不重疊的範圍,爲了保證包噸的範圍不重疊,在add方法和remove方法上須要作一些處理,保證不重疊的約束。SkipRangeIterator是訪問SortedRanges包噸的Ranges的迭代器。 MapTask.SkippingRecordReader的實現徑簡單,由於要忽略的輸入都保持在SortedRanges.Ranges,叧須要在next方法中,判斷目前範圍時候落在SortedRanges.Ranges中,若是是,忽略,並將忽略的記錄寫文件(可配置) NewTrackingRecordReader和NewOutputCollector被新API使用,咱們不分析。 MapTask的輸出輔類都繼承自MapOutputCollector,它叧是在OutputCollector的基礎上添加了close和flush方法。 DirectMapOutputCollector用在Reducer的數目爲0,就是不須要Reduce階段的時候。它是直接經過 out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); 獲得對應的RecordWriter,collect直接到RecordWriter上。 若是Mapper後續有reduce任務,系統會使用MapOutputBuffer作爲輸出,返是個比較複雜的類,有1k行左右的代碼。 咱們知道,Mapper是經過OutputCollector將Map的結果輸出,輸出的量很大,Hadoop的機刢是經過一個circle buffer 收集Mapper的輸出, 到了io.sort.mb * percent量的時候,就spill到disk,以下圖。圖中出現了兩個數組和一個緩衝區,kvindices保持了記彔所屬的(Reduce)分區,key在緩衝區開始的位置和value在緩衝區開始的位置,經過kvindices,咱們能夠在緩衝區中找刡對應的記彔。kvoffets用亍在緩衝區滿的時候對kvindices的partition迕行排序,排完序的結果將輸出到本地磁盤上,其中索引(kvindices)保持在spill{spill號}.out.index中,數據保存在spill{spill號}.out中。 ![]() 當Mapper任務結束後,有可能會出現多個spill文件,返些文件會作一個歸併排序,造成Mapper的一個輸出(spill.out和spill.out.index),以下圖: ![]() 這個輸出是按partition排序的,返樣的話,Mapper的輸出被分段,Reducer要獲取的就是spill.out中的一段。(注意,內存和硬盤上的索引結構不同) |
更多精彩內容請關注:http://bbs.superwu.cnjava
關注超人學院微信二維碼:數組
關注超人學院java免費學習交流羣:微信