咱們主要來學習MapTask的內部實現。
總體執行流程
如上圖示,MapTask的整個處理流程分五個階段:
●read階段:經過RecordReader從InputSplit分片中將數據解析成一個個key/value。
●map階段:將由RecordReader解析出的key/value交給map()方法處理,並生成一個個新的key/value。
●collect階段:將map()中新生成key/value由OutpCollector.collect()寫入內存中的環形數據緩衝區。
●spill階段:當環形緩衝區達到必定閥值後,會將數據寫到本地磁盤上,生成一個spill文件。在寫文件以前,會先將數據進行一次本地排序,必要的時候(按配置要求)還會對數據進行壓縮。
●combine階段:當全部數據處理完後,將全部的臨時的spill文件進行一次合併,最終之生成一個數據文件。
接下來咱們會對該流程中最重要的collect、spill和combine三個階段進行更深刻的學習。
Collect過程
前階段的map中新生成key/value對後,會調用OutpCollector.collect(key,value),在該方法內部,先調用Partitioner.getPartition()獲取該記錄的分區號,而後將<key,value,partition>傳給MapOutputBuffer.collect()做進一步的處理。
MapOutputBuffer內部使用了一個內部的環形的緩衝區來暫時保存用戶的輸出數據,當緩衝區使用率達到必定閥值後,由SpillThread線程將緩衝區中的數據spill到本地磁盤上,當全部的數據處理完畢後,對全部的文件進行合併,最終只生成一個文件。該數據緩衝區直接用想到MapTask的寫效率。
環形緩衝區使得collect階段和spill階段能夠並行處理。
MapOutputBuffer內部採用了兩級索引結構,涉及三個環形的內存緩衝區,分別是kvoffsets、kvindices和kvbuffer,這個環形緩衝區的大小能夠經過io.sot.mb來設置,默認大小是100MB,圖示以下:
kvoffsets即偏移量索引數組,用於保存key/value在kvindices中的偏移量。一個key/value對在kvoffsets數組中佔一個int的大小,而在kvindices數組中站3個int的大小(如上圖示,包括分區號partition,key的起始位置和value的起始位置)。
當kvoffsets的使用率超過io.sort.spill.percent(默認爲80%)後,便會觸發SpillTread線程將數據spill到磁盤上。
kvindices即文職索引數組,用於保存實際的key/value在數據緩衝區kvbuffer中的起始位置。
kvbuffer即數據局緩衝區,用於實際保存key/value,默認狀況下可以使用io.sort.mb的95%,當該緩衝區使用率使用率超過io.sort.spill.percent後,便會觸發SpillTread線程將數據spill到磁盤上。
Spill過程
在collect階段的執行過程當中,當內存中的環形數據緩衝區中的數據達到必定發以後,便會觸發一次Spill操做,將部分數據spill到本地磁盤上。SpillThread線程其實是kvbuffer緩衝區的消費者,主要代碼以下:算法
spillLock.lock(); 數據結構
while(true){ 學習
spillDone.sinnal(); spa
while(kvstart == kvend){ 線程
spillReady.await(); 指針
} 排序
spillDone.unlock(); 索引
//排序並將緩衝區kvbuffer中的數據spill到本地磁盤上 圖片
sortAndSpill();
spillLock.lock;
//重置各個指針,爲下一下spill作準備
if(bufend < bufindex && bufindex < bufstart){
bufvoid = kvbuffer.length;
}
vstart = vend;
bufstart = bufend;
}
spillLock.unlock();
sortAndSpill()方法中的內部流程是這樣的:
第一步,使用用快速排序算法對kvbuffer[bufstart,bufend)中的數據排序,先對partition分區號排序,而後再按照key排序,通過這兩輪排序後,數據就會以分區爲單位彙集在一塊兒,且同一分區內的數據按key有序;
第二步,按分區大小由小到大依次將每一個分區中的數據寫入任務的工做目錄下的臨時文件中,若是用戶設置了Combiner,則寫入文件以前,會對每一個分區中的數據作一次彙集操做,好比<key1,val1>和<key1,val2>合併成<key1,<val1,val2>>;
第三步,將分區數據的元信息寫到內存索引數據結構SpillRecord中。分區的元數據信息包括臨時文件中的偏移量、壓縮前數據的大小和壓縮後數據的大小。
Combine過程
當任務的全部數據都處理完後,MapTask會將該任務全部的臨時文件年合併成一個大文件,同時生成相應的索引文件。在合併過程當中,是以分區文單位進行合併的。
讓每一個Task最終生成一個文件,能夠避免同時打開大量文件和對小文件產生隨機讀帶來的開銷。