每一個任務最重要的一個過程就Shuffle過程,這個過程會把全部的數據進行洗牌整理,排序,若是數據量大,將會很是的耗時。如圖1.1所示,是一個從map端輸出數據到合併成一個文件的過程。java
圖1.1 Map文件輸出算法
從圖中能夠看到Map端輸出的數據會被提交到一個內存緩衝區當中,當內存滿了後,會被Spill到HDFS中,當Map任務結束後,會把全部的臨時文件合併到一個最終的文件中,做爲一個最終的Map輸出文件。這個過程涉及到兩個過程都有排序操做,第一個是從KVBuffer到文件Spill中,默認經過快速排序算法進行排序。第二個是全部臨時文件合併時,此時會有一次多路歸併排序的過程,使用歸併排序算法。數組
Mapper任務執行完後的數據會經過MapOutputBuffer提交到一個kvbuffer緩衝區中,這個緩衝區的數據是專門存儲map端輸出數據的,它是一個環形緩衝區,大小可經過配置mapreduce.task.io.sort.mb來配置這個緩衝區的大小,默認是100MB。kvbuffer本質上是一個byte數組,模擬的環形數據結構,環形緩衝區適用於寫入和讀取的內容保持在順序的狀況下,要否則就不能均勻的向前推動。緩存
雖然在Hadoop中,數據是要排序的,可是在Hadoop中有個很是良好的策略,就是不移動數據自己,而是爲每一個數據創建一個元數據kvmeta,在排序的時候,直接對元數據進行排序,而後順序讀寫元數據便可。數據結構
kvbuffer邏輯結構如圖1.2所示。app
圖1.2 kvbuffer結構oop
圖中長條矩形表示做爲字節數組的緩衝區kvbuffer,其七點處的下標爲0,終點處的下標爲kvbuffer.length。注意,這是按環形緩衝區使用的,因此往裏寫入內容時一旦超過終點就又「翻折」到緩衝區的起點,反之亦然。this
分隔點的位置能夠在緩衝區的任何位置上,分隔點的位置肯定後,數據(KV對)都放在分隔點的右側,而且向右伸展,而元數據則放在它的左側,而且向左擴展。spa
寫入到緩衝區的每一個KV對都有一組配套的元數據指明其位置和長度。KV對長度是可變的,但元數據的長度是固定的,都是16字節,即4個整數。這樣,全部的元數據合併在一塊兒就是一個元數據塊,至關於一個(倒立的)數組,能夠經過KV對的元數據,再按照其元數據的指引就可找到這個KV對的K和V,還能夠知道這個KV對屬於哪一個Partition。線程
其中元數據的數據主要是構成以下:
//val offset in acct ,第一個整數是V值起點字節的下標。
int VALSTART=0
//key offset in acct,第二個正式是K值起點字節的下標。
int KEYSTART=0
//partition offset in acct,第三個整數是KV對所屬的Partition
int PARTITION=2
//length of value,第四個整數是V值的長度
int VALLEN=3
如圖1.3所示,在有些狀況,數據緩衝區在底部,自底向上伸展,元數據則在頂部,自頂向下伸展;兩者相互靠攏。
圖1.3 kvbuffer變量
其中上圖的參數,kvstart指向元數據塊中的第一份元數據,kvend指向元數據塊的最後一份數據。kvindex指向下一份元數據指向的位置。buffer index指向下一份(KV數據對)的寫入位置,buffer start是KV(數據對)開始的位置。kvindex,kvstart,kvend是整數類型的數組下標。
當環形緩衝區kvbuffer滿了或者達到必定的閾值後,須要把緩衝區的數據寫入臨時文件中,這個過程叫sortAndSpill。在源碼中能夠看到有個專門的Spill線程來負責這個工做,當有須要Spill操做的時候,線程會被喚醒,而後執行Spill,在Spill以前,會有一個sort階段,先把kvbuffer中的數據按照partition值和key兩個關鍵字升序排序,移動的只是索引數據,排序結果是kvmeta中數據按照partition爲單位彙集在一塊兒,同一partition內的按照key有序。
詳細的sortAndSpill代碼以下:
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); //每一個partiiton定義一個索引 final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; // 使用快速排序算法 sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; // Spill文件的索引 IndexRecord rec = new IndexRecord(); InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) { // 循環訪問各個分區 IFile.Writer<K, V> writer = null; try { long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) { //沒有定義combiner // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; } } else { //定義了combiner,使用combiner合併數據 int spstart = spindex; while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // close the writer writer.close(); // record offsets rec.startOffset = segmentStart; //分區鍵值起始位置 rec.rawLength = writer.getRawLength();//數據原始長度 rec.partLength = writer.getCompressedLength();//數據壓縮後的長度 spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } // 處理spill文件的索引,若是內存索引大小超過限制,則寫入到文件中。 if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); } }
從代碼中能夠看到,看出再排序完成以後,循環訪問內存中的每一個分區,若是沒有定義combine的話就直接把這個分區的鍵值對spill寫出到磁盤。spill是mapreduce的中間結果,存儲在數據節點的本地磁盤上,存儲路徑由如下參數指定:
1.core-site.xml:
hadoop.tmp.dir// hadoop臨時文件夾目錄
2.mapred-site.xml:
mapreduce.cluster.local.dir =${hadoop.tmp.dir}/mapred/local;
//默認的中間文件存放路徑
在執行mapreduce任務的過程,咱們能夠經過這個路徑去查看spill文件的狀況。
當Map任務執行完後,可能產生了不少的Spill文件,這些文件須要合併到一個文件腫麼而後備份發給各個Reducer。若是kvbuffer緩衝區不爲空,就執行一次沖刷操做,確保全部的數據已寫入文件中,而後執行mergeParts()合併Spill文件。merge合併操做也會帶有排序操做,將單個有序的spill文件合併成最終的有序的文件。merge多路歸併排序也是經過spill文件的索引來操做的
圖1.4 就是map輸出到磁盤的過程,這些中間文件(fiel.out,file.out.inde)未來是要等着Reducer取走的,不過並非Reducer取走以後就刪除的,由於Reducer可能會運行失敗,在整個Job完成以後,ApplicationMaster通知Mapper能夠刪除了纔會將這些中間文件刪掉.向硬盤中寫數據的時機。
圖1.4 spill文件merge
在MapTask還未完成最終合併時,ReduceTask是沒有數據輸入的,即便此時ReduceTask進程已經建立,也只能睡眠等地啊有MapTask完成運行,從而能夠從其所在節點獲取其輸出數據。如前所述,一個MapTask最終數據輸出是一個合併好的Spill文件,能夠經過該節點的Web地址,即所謂的MapOutputServerAddress加以訪問。
ReduceTask運行在YarnChild啓動的Java虛擬上。在Reduce Shuffle階段,分爲兩個步驟,第一個copy,第二個Merge Sort。
(1).Copy階段
Reduce任務經過HTTP向各個Map任務拖取它所須要的數據。Map任務成功完成後,會通知ApplicationMaster狀態已經更新。因此,對於指定做業來講,ApplicationMaster能記錄Map輸出和NodeManager機器的映射關係。Reduce會按期向ApplicationMaster獲取Map的輸出位置,一旦拿到輸出位置,Reduce任務就會今後輸出對應的機器上上覆制輸出到本地,而不會等到全部的Map任務結束。
(2).Merge Sort
Copy過來的數據會先放入內存緩衝區中,若是內存緩衝區中能放得下此次數據的話就直接把數據寫到內存中,即內存到內存merge。Reduce要向每一個Map去拖取數據,在內存中每一個Map對應一塊數據,當內存緩存區中存儲的Map數據佔用空間達到必定程度的時候,開始啓動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中,即內存到磁盤merge。
當屬於該reducer的map輸出所有拷貝完成,則會在reducer上生成多個文件(若是拖取的全部map數據總量都沒有內存緩衝區,則數據就只存在於內存中),這時開始執行合併操做,即磁盤到磁盤merge,Map的輸出數據已是有序的,Merge進行一次合併排序,所謂Reduce端的sort過程就是這個合併的過程。通常Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是徹底分開的。最終Reduce shuffle過程會輸出一個總體有序的數據塊。
詳細的流程過程如圖1.5所示。
圖1.5 reduce shuffle過程圖