一、MapReduce中數據流動
(1)最簡單的過程: map - reduce
(2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce
(3)增長了在本地先進性一次reduce(優化)過程: map - combin(本地reduce) - partition -reduce
二、Mapreduce中Partition的概念以及使用。
(1)Partition的原理和做用
獲得map給的記錄後,他們該分配給哪些reducer來處理呢?hadoop採用的默認的派發方式是根據散列值來派發的,可是實際中,這並不能很高效或者按照咱們要求的去執行任務。例如,通過partition處理後,一個節點的reducer分配到了20條記錄,另外一個卻分配道了10W萬條,試想,這種狀況效率如何。又或者,咱們想要處理後獲得的文件按照必定的規律進行輸出,假設有兩個reducer,咱們想要最終結果中part-00000中存儲的是"h"開頭的記錄的結果,part-00001中存儲其餘開頭的結果,這些默認的partitioner是作不到的。因此須要咱們本身定製partition來根據本身的要求,選擇記錄的reducer。自定義partitioner很簡單,只要自定義一個類,而且繼承Partitioner類,重寫其getPartition方法就行了,在使用的時候經過調用Job的setPartitionerClass指定一下便可緩存
Map的結果,會經過partition分發到Reducer上。Mapper的結果,可能送到Combiner作合併,Combiner在系統中並無本身的基類,而是用Reducer做爲Combiner的基類,他們對外的功能是同樣的,只是使用的位置和使用時的上下文不太同樣而已。Mapper最終處理的鍵值對<key, value>,是須要送到Reducer去合併的,合併的時候,有相同key的鍵/值對會送到同一個Reducer那。哪一個key到哪一個Reducer的分配過程,是由Partitioner規定的。它只有一個方法,架構
getPartition(Text key, Text value, int numPartitions)app
輸入是Map的結果對<key, value>和Reducer的數目,輸出則是分配的Reducer(整數編號)。就是指定Mappr輸出的鍵值對到哪個reducer上去。系統缺省的Partitioner是HashPartitioner,它以key的Hash值對Reducer的數目取模,獲得對應的Reducer。這樣保證若是有相同的key值,確定被分配到同一個reducre上。若是有N個reducer,編號就爲0,1,2,3……(N-1)。框架
(2)Partition的使用
分區出現的必要性,如何使用Hadoop產生一個全局排序的文件?最簡單的方法就是使用一個分區,可是該方法在處理大型文件時效率極低,由於一臺機器必須處理全部輸出文件,從而徹底喪失了MapReduce所提供的並行架構的優點。事實上咱們能夠這樣作,首先建立一系列排好序的文件;其次,串聯這些文件(相似於歸併排序);最後獲得一個全局有序的文件。主要的思路是使用一個partitioner來描述全局排序的輸出。比方說咱們有1000個1-10000的數據,跑10個ruduce任務, 若是咱們運行進行partition的時候,可以將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據所有大於第n-1個reduce中的數據。這樣,每一個reduce出來以後都是有序的了,咱們只要cat全部的輸出文件,變成一個大的文件,就都是有序的了dom
基本思路就是這樣,可是如今有一個問題,就是數據的區間如何劃分,在數據量大,還有咱們並不清楚數據分佈的狀況下。一個比較簡單的方法就是採樣,假若有一億的數據,咱們能夠對數據進行採樣,如取10000個數據採樣,而後對採樣數據分區間。在Hadoop中,patition咱們能夠用TotalOrderPartitioner替換默認的分區。而後將採樣的結果傳給他,就能夠實現咱們想要的分區。在採樣時,咱們可使用hadoop的幾種採樣工具,RandomSampler,InputSampler,IntervalSampler。分佈式
這樣,咱們就能夠對利用分佈式文件系統進行大數據量的排序了,咱們也能夠重寫Partitioner類中的compare函數,來定義比較的規則,從而能夠實現字符串或其餘非數字類型的排序,也能夠實現二次排序乃至屢次排序。函數
二、MapReduce中分組的概念和使用
分區的目的是根據Key值決定Mapper的輸出記錄被送到哪個Reducer上去處理。而分組的就比較好理解了。筆者認爲,分組就是與記錄的Key相關。在同一個分區裏面,具備相同Key值的記錄是屬於同一個分組的。工具
三、MapReduce中Combiner的使用
不少MapReduce程序受限於集羣上可用的帶寬,因此它會盡力最小化須要在map和reduce任務之間傳輸的中間數據。Hadoop容許用戶聲明一個combiner function來處理map的輸出,同時把本身對map的處理結果做爲reduce的輸入。由於combiner function自己只是一種優化,hadoop並不保證對於某個map輸出,這個方法會被調用多少次。換句話說,無論combiner function被調用多少次,對應的reduce輸出結果都應該是同樣的。oop
下面咱們以《權威指南》的例子來加以說明,假設1950年的天氣數據讀取是由兩個map完成的,其中第一個map的輸出以下:
(1950, 0)
(1950, 20)
(1950, 10)大數據
第二個map的輸出爲:
(1950, 25)
(1950, 15)
而reduce獲得的輸入爲:(1950, [0, 20, 10, 25, 15]), 輸出爲:(1950, 25)
因爲25是集合中的最大值,咱們可使用一個相似於reduce function的combiner function來找出每一個map輸出中的最大值,這樣的話,reduce的輸入就變成了:
(1950, [20, 25])
各個funciton 對溫度值的處理過程能夠表示以下:max(0, 20, 10, 25, 15) =max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
注意:並非全部的函數都擁有這個屬性的(有這個屬性的函數咱們稱之爲commutative和associative),例如,若是咱們要計算平均溫度,就不能這樣使用combiner function,由於mean(0, 20, 10, 25, 15) =14,而mean(mean(0, 20, 10),mean(25, 15)) = mean(10, 20) = 15
combiner function並不能取代reduce function(由於仍然須要reduce function處理來自不一樣map的帶有相同key的記錄)。可是他能夠幫助減小須要在map和reduce之間傳輸的數據,就爲這一點combiner function就值得考慮使用。
四、Shuffle階段排序流程詳解
咱們首先看一下MapReduce中的排序的整體流程。
MapReduce框架會確保每個Reducer的輸入都是按Key進行排序的。通常,將排序以及Map的輸出傳輸到Reduce的過程稱爲混洗(shuffle)。每個Map都包含一個環形的緩存,默認100M,Map首先將輸出寫到緩存當中。當緩存的內容達到「閾值」時(閾值默認的大小是緩存的80%),一個後臺線程負責將結果寫到硬盤,這個過程稱爲「spill」。Spill過程當中,Map仍能夠向緩存寫入結果,若是緩存已經寫滿,那麼Map進行等待。
Spill的具體過程以下:首先,後臺線程根據Reducer的個數將輸出結果進行分組,每個分組對應一個Reducer。其次,對於每個分組後臺線程對輸出結果的Key進行排序。在排序過程當中,若是有Combiner函數,則對排序結果進行Combiner函數進行調用。每一次spill都會在硬盤產生一個spill文件。所以,一個Map task有可能會產生多個spill文件,當Map寫出最後一個輸出時,會將全部的spill文件進行合併與排序,輸出最終的結果文件。在這個過程當中Combiner函數仍然會被調用。從整個過程來看,Combiner函數的調用次數是不肯定的。下面咱們重點分析下Shuffle階段的排序過程:
Shuffle階段的排序能夠理解成兩部分,一個是對spill進行分區時,因爲一個分區包含多個key值,因此要對分區內的<key,value>按照key進行排序,即key值相同的一串<key,value>存放在一塊兒,這樣一個partition內按照key值總體有序了。
第二部分並非排序,而是進行merge,merge有兩次,一次是map端將多個spill 按照分區和分區內的key進行merge,造成一個大的文件。第二次merge是在reduce端,進入同一個reduce的多個map的輸出 merge在一塊兒,該merge理解起來有點複雜,最終不是造成一個大文件,並且期間數據在內存和磁盤上都有。因此shuffle階段的merge並非嚴格的排序意義,只是將多個總體有序的文件merge成一個大的文件,因爲不一樣的task執行map的輸出會有所不一樣,因此merge後的結果不是每次都相同,不過仍是嚴格要求按照分區劃分,同時每一個分區內的具備相同key的<key,value>對挨在一塊兒。
Shuffle排序綜述:若是隻定義了map函數,沒有定義reduce函數,那麼輸入數據通過shuffle的排序後,結果爲key值相同的輸出挨在一塊兒,且key值小的必定在前面,這樣總體來看key值有序(宏觀意義的,不必定是按從大到小,由於若是採用默認的HashPartitioner,則key 的hash值相等的在一個分區,若是key爲IntWritable的話,每一個分區內的key會排序好的),而每一個key對應的value不是有序的。
五、MapReduce中輔助排序的原理與實現
(1)任務
咱們須要把內容以下的sample.txt文件處理爲下面文件:
源文件:Sample.txt
bbb 654
ccc 534
ddd 423
aaa 754
bbb 842
ccc 120
ddd 219
aaa 344
bbb 214
ccc 547
ddd 654
aaa 122
bbb 102
ccc 479
ddd 742
aaa 146
目標:part-r-00000
aaa 122
bbb 102
ccc 120
ddd 219
(2)工做原理
過程導引:
一、定義包含記錄值和天然值的組合鍵,本例中爲MyPariWritable.
二、自定義鍵的比較器(comparator)來根據組合鍵對記錄進行排序,即同時利用天然鍵和天然值進行排序。(aaa 122組合爲一個鍵)。
三、針對組合鍵的Partitioner(本示例使用默認的hashPartitioner)和分組comparator在進行分區和分組時均只考慮天然鍵。
詳細過程:首先在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文本的一行的行號做爲key,這一行的文本做爲value。這就是自定義Map的輸入是<LongWritable, Text>的緣由。而後調用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出< MyPariWritable, NullWritable>。最終是生成一個List< MyPariWritable, NullWritable>。在map階段的最後,會先調用job.setPartitionerClass對這個List進行分區,每一個分區映射到一個reducer。每一個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。能夠看到,這自己就是一個二次排序。在reduce階段,reducer接收到全部映射到這個reducer的map輸出後,也是會調用job.setSortComparatorClass設置的key比較函數類對全部數據對排序。而後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組(本例中因爲要求得每個分區內的最小值,所以比較MyPariWritable類型的Key時,只須要比較天然鍵,這樣就能保證只要兩個MyPariWritable的天然鍵相同,則它們被送到Reduce端時候的Key就認爲在相同的分組,因爲該分組的Key只取分組中的第一個,而這些數據已經按照自定義MyPariWritable比較器排好序,則第一個Key正好包含了每個天然鍵對應的最小值),它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的全部key的第一個key。最後就是進入Reducer的reduce方法,reduce方法的輸入是全部的key和它的value迭代器。一樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。