本文系原創,如有轉載須要,請註明出處。https://www.cnblogs.com/bigdata-stone/java
MapReduce是面向大數據並行處理的計算模型、框架和平臺。算法
在 MapReduce 程序的開發過程當中,每每須要用到 FileInputFormat與 TextInputFormat,咱們會發現 TextInputFormat 這個類繼承自FileInputFormat , FileInputFormat 這 個 類 繼 承 自 InputFormat ,InputFormat 這個類會將文件 file 按照邏輯進行劃分,劃分紅的每個split 切片將會被分配給一個 Mapper 任務,文件先被切分紅 split 塊,然後每個 split 切片對應一個 Mapper 任務 。緩存
input File 經過 split 被邏輯切分爲多個 split 文件,經過 Record按行讀取內容給 map(用戶本身實現的)進行處理,數據被 map 處理結束以後交給 OutputCollector 收集器,對其結果 key 進行分區(默認使用 hash 分區),而後寫入 buffer,每一個 map task 都有一個內存緩衝區,存儲着 map 的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以一個臨時文件的方式存放到磁盤,當整個 map task 結束後再對磁盤中這個 maptask 產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待 reduce task 來拉數據。 Map 端的輸入的(k,v)分別是該行的起始偏移量,以及每一行的數據內容,map 端的輸出(k,v)能夠根據需求進行自定義,可是若是輸出的是 javabean 對象,須要對javabean 繼承 writable 。網絡
分區函數partitioner 的做用是將 mapper輸出的 key/value經過給定的分區函數來拆分爲分片(shard),每一個 reducer 對應一個分片 默認狀況下, partitioner 先計算 key 的散列值(一般爲 md5值)。而後通reducer 個數執行取模運算: key.hashCode%(reducer 個數)。這種方式不只可以隨機地將整個key空間平均分發給每一個reducer,同時也能確保不一樣mapper產生的相同key能被分發到同一個reducer。也能夠自定義分區去繼承 partition<key,value>把不一樣的結果寫入不一樣的文件中分區 Partitioner 主要做用在於如下兩點 (1)根據業務須要,產生多個輸出文件;(2)多個 reduce 任務併發運行,提升總體 job 的運行效率 map 端的 combine 組件。併發
每個 map 均可能會產生大量的本地輸出, Combiner 的做用就是對 map 端的輸出先作一次合併,以減小在 map 和 reduce 節點之間的數據傳輸量,以提升網絡 IO 性能,是 MapReduce 的一種優化手段之一combiner 是 MR 程序中 Mapper 和 Reducer 以外的一種組件combiner 組件的父類就是 Reducercombiner 和 reducer 的區別在於運行的位置:combiner 是在每個 maptask 所在的節點運行reducer 是接收全局全部 Mapper 的輸出結果;combiner 的意義就是對每個 maptask 的輸出進行局部彙總,以減少網絡傳輸量具體實現步驟:app
1)自定義一個 combiner 繼承 Reducer,重寫 reduce 方法框架
2)中設置: job.setCombinerClass(CustomCombiner.class)combiner 可以應用的前提是不能影響最終的業務邏輯,並且,combine輸出 kv 應該跟 reducer 的輸入 kv 類型要對應起來 分佈式
Combiner 使用須要注意的是:ide
1.有不少人認爲這個 combiner 和 map 輸出的數據合併是一個過程,其實否則, map 輸出的數據合併只會產生在有數據 spill 出的時候,即進行 merge 操做。函數
2.與 mapper 與 reducer 不一樣的是, combiner 沒有默認的實現,須要顯式的設置在 conf 中才有做用。
3.並非全部的 job 都適用 combiner,只有操做知足結合律的纔可設置 combiner。 combine 操做相似於: opt(opt(1, 2, 3), opt(4, 5,6))。若是 opt 爲求和、求最大值的話,可使用,可是若是是求中值的話,不適用。
4.通常來講, combiner 和 reducer 它們倆進行一樣的操做。
shuffle 的過程是:Map 產生輸出開始到 Reduc 取得數據做爲輸入以前的過程稱做 shuffle.1).Collect 階段:將 MapTask 的結果輸出到默認大小爲100M 的環形緩衝區,保存的是 key/value, Partition 分區信息等。2).Spill 階段:當內存中的數據量達到必定的閥值的時候,就會將數據寫入本地磁盤,在將數據寫入磁盤以前須要對數據進行一次排序的操做,若是配置了 combiner,還會將有相同分區號和 key 的數據進行排序。3).Merge 段把全部溢出的臨時文件進行一次合併操做,以確保一個MapTask 最終只產生一箇中間數據文件。4).Copy 階段: ReduceTask 啓動 Fetcher 線程到已經完成MapTask 的節點上覆制一份屬於本身的數據,這些數據默認會存在內存的緩衝區中,當內存的緩衝區達到必定的閥值的時候,就會將數據寫到磁盤之上。5).Merge 階段:在 ReduceTask 遠程複製數據的同時,會在後臺開啓兩個線程對內存到本地的數據文件進行合併操做。6).Sort 階段:在對數據進行合併的同時,會進行排序操做,因爲 MapTask 階段已經對數據進行了局部的排序,ReduceTask 只需保證 Copy 的數據的最終總體有效性便可。Shuffle 中的緩衝區大小會影響到 mapreduce 程序的執行效率,原則上說,緩衝區越大,磁盤 io 的次數越少,執行速度就越快緩衝區的大小能夠經過參數調整, 參數:io.sort.mb 默認 100M
reducer 將已經分好組的數據做爲輸入,並依次爲每一個鍵對應分組執行 reduce 函數。 reduce 函數的輸入是鍵以及包含與該鍵對應的全部值的迭代器。reduce 端的輸入是 map 端的輸出,它的輸出的(k,v)根據需求進行自定義reducetask 並行度一樣影響整個 job 的執行併發度和執行效率,與maptask的併發數由切片數決定不一樣, Reducetask 數量的決定是能夠直接手動設置:job.setNumReduceTasks(4);若是數據分佈不均勻,就有可能在 reduce 階段產生數據傾斜。默認的 reduceTask 的是 1 。
OutputFormat 主要用於描述輸出數據的格式,它可以將用戶提供的 key/value對寫入特定格式的文件中。 Hadoop 自帶了不少 OutputFormat 的實現,它們與InputFormat 實現相對應,足夠知足咱們業務的須要。
Map端代碼
public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyout = new Text();
IntWritable valueout = new IntWritable();
String[] arr = value.toString().split(" ");
for(String s:arr){
keyout.set(s);
valueout.set(1);
context.write(keyout,valueout);
}
}
}
reduce端代碼:
public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0 ;
for(IntWritable iw:values){
count=iw.get()+count;
}
context.write(key,new IntWritable(count));
}
}
app端代碼:
public class WCApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf= new Configuration();
// conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
job.setJobName("WCApp");
job.setJarByClass(WCApp.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
}
}
程序運行時過程設計到的一個角色實體
1.1. Client:編寫mapreduce程序,配置做業,提交做業的客戶端 ;
1.2. ResourceManager:集羣中的資源分配管理 ;
1.3. NodeManager:啓動和監管各自節點上的計算資源 ;
1.4. ApplicationMaster:每一個程序對應一個AM,負責程序的任務調度,自己也是運行在NM的Container中 ;
1.5. HDFS:分佈式文件系統,保存做業的數據、配置信息等等。
客戶端提交Job
2.1. 客戶端編寫好Job後,調用Job實例的Submit()或者waitForCompletion()方法提交做業;
2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程序的輸出、輸入路徑進行檢查,若是沒有問題,進行做業輸入分片的計算。
Job提交到ResourceManager
3.1. 將做業運行所須要的資源拷貝到HDFS中(jar包、配置文件和計算出來的輸入分片信息等);
3.2. 調用ResourceManager的submitApplication方法將做業提交到ResourceManager。
給做業分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的調用以後會命令一個NodeManager啓動一個Container ;
4.2. 在該NodeManager的Container上啓動管理該做業的ApplicationMaster進程。
ApplicationMaster初始化做業
5.1. ApplicationMaster對做業進行初始化操做;
5.2. ApplicationMaster從HDFS中得到輸入分片信息(map、reduce任務數)
任務分配
6.1. ApplicationMaster爲其每一個map和reduce任務向RM請求計算資源;
6.2. map任務優先於reduce任,map數據優先考慮本地化的數據。任務執行,在 Container 上啓動任務(經過YarnChild進程來運行),執行map/reduce任務。
輸入分片(input split)
每一個輸入分片會讓一個map任務來處理,默認狀況下,以HDFS的一個塊的大小(默認爲128M,能夠設置)爲一個分片。map輸出的結果會暫且放在一個環形內存緩衝區中(mapreduce.task.io.sort.mb=100M
),當該緩衝區快要溢出時(默認mapreduce.map.sort.spill.percent=0.8
),會在本地文件系統中建立一個溢出文件,將該緩衝區中的數據寫入這個文件;
map階段:由咱們本身編寫,最後調用 context.write(…);
partition分區階段
3.1. 在map中調用 context.write(k2,v2)方法輸出,該方法會馬上調用 Partitioner類對數據進行分區,一個分區對應一個 reduce task。
3.2. 默認的分區實現類是 HashPartitioner ,根據k2的哈希值 % numReduceTasks
,可能出現「數據傾斜」現象。
3.3. 能夠自定義 partition ,調用 job.setPartitioner(…)本身定義分區函數。
combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操做
4.1. 是可選的;
4.2. 目的有三個:1.減小Key-Value對;2.減小網絡傳輸;3.減小Reduce的處理。
shuffle階段:即Map和Reduce中間的這個過程
5.1. 首先 map 在作輸出時候會在內存裏開啓一個環形內存緩衝區,專門用來作輸出,同時map還會啓動一個守護線程;
5.2. 如緩衝區的內存達到了閾值的80%,守護線程就會把內容寫到磁盤上,這個過程叫spill,另外的20%內存能夠繼續寫入要寫進磁盤的數據;
5.3. 寫入磁盤和寫入內存操做是互不干擾的,若是緩存區被撐滿了,那麼map就會阻塞寫入內存的操做,讓寫入磁盤操做完成後再繼續執行寫入內存操做;
5.4. 寫入磁盤時會有個排序操做,若是定義了combiner函數,那麼排序前還會執行combiner操做;
5.5. 每次spill操做也就是寫入磁盤操做時候就會寫一個溢出文件,也就是說在作map輸出有幾回spill就會產生多少個溢出文件,等map輸出所有作完後,map會合並這些輸出文件,這個過程裏還會有一個Partitioner操做(如上)
5.6. 最後 reduce 就是合併map輸出文件,Partitioner會找到對應的map輸出文件,而後進行復制操做,複製操做時reduce會開啓幾個複製線程,這些線程默認個數是5個(可修改),這個複製過程和map寫入磁盤過程相似,也有閾值和內存大小,閾值同樣能夠在配置文件裏配置,而內存大小是直接使用reduce的tasktracker的內存大小,複製時候reduce還會進行排序操做和合並文件操做,這些操做完了就會進行reduce計算了。
reduce階段:由咱們本身編寫,最終結果存儲在hdfs上的。