1.準備待處理文件html
2.job提交前生成一個處理規劃apache
3.將切片信息job.split,配置信息job.xml和咱們本身寫的jar包交給yarn網絡
4.yarn根據切片規劃計算出MapTask的數量app
(以一個MapTask爲例)oop
5.Maptask調用inputFormat生成RecordReader,將本身處理的切片文件內容打散成K,V值性能
6.MapTask將打散好的K,V值交給Mapper,Mapper通過一系列的處理將KV值寫出大數據
7.寫出的KV值被outputCollector收集起來,寫入一個在內存的環形緩衝區ui
8,9.當環形緩衝區被寫入的空間等於80%時,會觸發溢寫.此時數據是在內存中,因此在溢寫以前,會對數據進行排序,是一個二次排序的快排(先根據分區排序再根據key排序).而後將數據有序的寫入到磁盤上.spa
緩衝區爲何是環形的?這樣作是爲了能夠在緩衝區的任何地方進行數據的寫入.3d
當第一次溢寫時,數據會從餘下的20%空間中的中間位置,再分左右繼續寫入,也就是從第一次是從上往下寫數據變成了從下往上寫數據
10,11.當屢次溢寫產生多個有序的文件後,會觸發歸併排序,將多個有序的文件合併成一個有序的大文件.當文件數>=10個時,會觸發歸併排序,取文件的一小部分放入內存的緩衝區,再生成一個小文件部分大小x文件數的緩衝區,逐個比較放入大文件緩衝區,依次比較下去,再將大文件緩衝寫入到磁盤,歸併結束後將大文件放在文件列表的末尾,繼續重複此動做,直到合併成一個大文件.這次歸併排序的時間複雜度要求較低.
12.當全部的MapTask執行完任務後,啓動相應數量的ReduceTask,並告知每個ReduceTask應該處理的數據分區
13.ReduceTask將指定分區的文件下載到本地,若有多個分區文件的話,ReduceTask上將會有多個大文件,再一次歸併排序,造成一個大文件.
14.15,若是有分組要求的話,ReduceTask會將數據一組一組的交給Reduce,處理完後準備將數據寫出
16.Reduce調用output生成RecordWrite將數據寫入到指定路徑中
上圖中,數據從Mapper寫出來以後到數據進入到Reduce以前,這一階段就叫作Shuffle
Shuffle時,會有三次排序,第一次是數據從環形緩衝區寫入到磁盤時,會有一次快排,第二次是在MapTask中,將多個分區且內部有序的小文件歸併成一個分區且內部有序的大文件,第三次是在ReduceTask中,從多個MapTask中獲取指定分區的大文件,再進行一個歸併排序,合併成一個大文件.
以WordCount爲例,試想一下,在第一次從環形緩衝區寫入到磁盤時,排好序的數據爲(w1,1),(w1,1),(w1,1),(w2,1),(w2,1),(w3,1),這樣的數據會增長網絡傳輸量,因此在這裏可使用Combiner進行數據合併.最後造成的數據是(w1,3),(w2,2),(w3,1),後續會詳細講解~
將Mapper想象成一個水池,數據是池裏的水.默認分一個區,只有一根水管.若是隻有一個ReduceTask,則水會所有順着惟一的水管流入到ReduceTask中.若是此時有3根水管,則水會被分紅三股水流流入到3個ReduceTask中,並且哪些水進哪一個水管,並不受咱們主觀控制,也就是數據處理速度加快了~~Partition分區就決定了分幾根水管.試想一下,若是有4根水管,末端只有3個ReduceTask,那麼有一股水流會丟失.也就是形成數據丟失,程序會報錯.若是隻有2根水管,那麼則有一個ReduceTask無事可作,最後生成的是一個空文件,浪費資源
因此,通常來講,有幾個ReduceTask就要分幾個區,至於partition和ReduceTask設置爲幾,要看集羣性能,數據集,業務,經驗等等~
對應流程圖上,也就是從環形緩衝區寫入到磁盤時,會分區
collector出現了,除了將key,value收集到緩衝區中以外,還收集了partition分區
key.hashCode() & Integer.MAX_VALUE,保證取餘前的數爲正數
好比,numReduceTasks = 3, 一個數n對3取餘,結果會有0,1,2三種可能,也就是分三個區,再一次印證了要 reduceTask number = partition number
默認分區是根據key的hashcode和reduceTasks的個數取模獲得的,用戶沒法控制哪一個key存儲到哪一個分區上
案例演練
以12小章的統計流量案例爲例,大數據-Hadoop生態(12)-Hadoop序列化和源碼追蹤
將手機號13六、13七、13八、139開頭都分別放到一個獨立的4個文件中,其餘開頭的放到一個文件中
自定義Partition類
package com.atguigu.partitioner; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartitioner extends Partitioner<Text, FlowBean> { public int getPartition(Text text, FlowBean flowBean, int numPartitions) { //1. 截取手機前三位 String start = text.toString().substring(0, 3); //2. 按照手機號前三位返回分區號 switch (start) { case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 4; } } }
Driver類的main()中增長如下代碼
job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(5);
輸出結果,5個文件
若是job.setNumReduceTasks(10),會生成10個文件,其中5個是空文件
若是job.setNumReduceTasks(2),程序會直接執行失敗報異常
若是job.setNumReduceTasks(1),程序會運行成功,由於若是numReduceTasks=1時,根本就不會執行分區的過程
若是是如下狀況,也會執行失敗.MapReduce會認爲你分了41個區,因此分區號必須從0開始,逐一累加.
job.setNumReduceTasks(5)
switch (start) { case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 40; }