@[TOC]數組
首先,接到hdf文件輸入,在mapreduce中的map task開始以前,將文件按照指定的大小切割成若干個部分,每一部分稱爲一個split,默認是split的大小與block的大小相等,均爲128MB。split大小由minSize、maxSize、blocksize決定,以wordcount代碼爲例,如下是main()方法
進入waitForCompletion(true)
方法,進入submit()
方法
找到 return submitter .submitJobInternal(Job.this, cluster);
進入,找到 int maps = writeSplits(job, submitJobDir);
進入writeNewSplits()
方法
進入writeNewSplits()方法,能夠看出該方法首先獲取splits數組信息後,排序,將會優先處理大文件。最終返回mapper數量。這其中又分爲兩部分:肯定切片數量 和 寫入切片信息。肯定切片數量的任務交由FileInputFormat的getSplits(job)完成,寫入切片信息的任務交由JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法,該方法會將切片信息和SplitMetaInfo都寫入HDFS中,return array.length;
返回的是map任務數,默認map的數量是: default_num = total_size / block_size;
實際的mapper數量就是輸入切片的數量,而切片的數量又由使用的輸入格式決定,默認爲TextInputFormat,該類爲FileInputFormat的子類。肯定切片數量的任務交由FileInputFormat的getSplits(job)完成。FileInputFormat繼承自抽象類InputFormat,該類定義了MapReduce做業的輸入規範,其中的抽象方法List<InputSplit> getSplits(JobContext context)定義瞭如何將輸入分割爲InputSplit,不一樣的輸入有不一樣的分隔邏輯,而分隔獲得的每一個InputSplit交由不一樣的mapper處理,所以該方法的返回值肯定了mapper的數量。緩存
每一個map task都有一個內存緩衝區, map的輸出結果先寫到內存中的環形緩衝區,緩衝區爲100M,不斷的向緩衝區力寫數據,當達到80M時,須要將緩衝區中的數據以一個臨時文件的方式存到磁盤,當整個map task結束後再對磁盤中這個map task所產生的全部臨時文件作合併,生成最終的輸出文件。最後,等待reduce task來拉取數據。固然,若是map task的結果不大,可以徹底存儲到內存緩衝區,且未達到內存緩衝區的閥值,那麼就不會有寫臨時文件到磁盤的操做,也不會有後面的合併。在寫入的過程當中會進行分區、排序、combine操做。
環形緩衝區:是使用指針機制把內存中的地址首尾相接造成一個存儲中間數據的緩存區域,默認100MB;80M閾值,20M緩衝區,是爲了解決寫入環形緩衝區數據的速度大於寫出到spill文件的速度是數據的不丟失;Spill文件:spill文件是環形緩衝區到達閾值後寫入到磁盤的單個文件.這些文件在map階段計算結束時,會合成分好區的一個merge文件供給給reduce任務抓取;spill文件太小的時候,就不會浪費io資源合併merge;默認狀況下3個如下spill文件不合並;對於在環形緩衝區中的數據,最終達不到80m可是數據已經計算完畢的狀況,map任務將會調用flush將緩衝區中的數據強行寫出spill文件。網絡
通過map類處理後,輸出到內存緩衝區(默認大小100M),超過必定大小後,文件溢寫到磁盤上,按照key分類
按照key合併成大文件,減小網絡開銷 app
看一下MapReduce自帶的分區器HashPartitioner
假設有聽個reduce任務,則分區的計算以下:ide
在對map結果進行分區以後,對於落在相同的分區中的鍵值對,要進行排序。函數
Shuffle過程是MapReduce的核心,描述着數據從map task輸出到reduce task輸入的這段過程。reducetask根據本身的分區號,去各個maptask分區機器上取相應的結果分區數據,reducetask會將這些文件再進行合併(歸併排序)。
全部相同key的數據聚集到一個partition
將相同的key value匯聚到一塊兒, 但不計算 fetch
reduce階段分三個步驟:
抓取,合併,排序
1 reduce 任務會建立並行的抓取線程(fetcher)負責從完成的map任務中獲取結果文件,是否完成是經過rpc心跳監聽,經過http協議抓取;默認是5個抓取線程,可調,爲了是總體並行,在map任務量大,分區多的時候,抓取線程調大;
2 抓取過來的數據會先保存在內存中,若是內存過大也溢出,不可見,不可調,可是單位是每一個merge文件,不會切分數據;每一個merge文件都會被封裝成一個segment的對象,這個對象控制着這個merge文件的讀取記錄操做,有兩種狀況出現:在內存中有merge數據 •在溢寫以後存到磁盤上的數據 •經過構造函數的區分,來分別建立對應的segment對象
3 這種segment對象會放到一個內存隊列中MergerQueue,對內存和磁盤上的數據分別進行合併,內存中的merge對應的segment直接合並,磁盤中的合併與一個叫作合併因子的factor有關(默認是10)
4 排序問題,MergerQueue繼承輪換排序的接口,每個segment 是排好序的,並且按照key的值大小邏輯(和真的大小不要緊);每個segment的第一個key都是邏輯最小,而全部的segment的排序是按照第一個key大小排序的,最小的在前面,這種邏輯總能保證第一個segment的第一個key值是全部key的邏輯最小文件合併以後,最終交給reduce函數計算的,是MergeQueue隊列,每次計算的提取數據邏輯都是提取第一個segment的第一個key和value數據,一旦segment被調用了提取key的方法,MergeQueue隊列將會總體從新按照最小key對segment排序,最終造成總體有序的計算結果;
partition 、Reduce、輸出文件數量相等
Reduce任務數量
在大數據量的狀況下,若是隻設置1個Reduce任務,其餘節點將被閒置,效率底下 因此將Reduce設置成一個較大的值(max:72).調節Reduce任務數量的方法 一個節點的Reduce任務數並不像Map任務數那樣受多個因素制約大數據
經過參數調節mapred.reduce.tasks(在配置文件中) 在代碼中調用job.setNumReduceTasks(int n)方法(在code中)