How MapReduce Works(轉)

原文地址:http://www.cnblogs.com/ggjucheng/archive/2012/04/23/2465820.htmlhtml

1、從Map到Reducenode

MapReduce實際上是分治算法的一種實現,其處理過程亦和用管道命令來處理十分類似,一些簡單的文本字符的處理甚至也可使用Unix的管道命令來替代,從處理流程的角度來看大概以下:算法

cat input | grep | sort | uniq -c | cat > output編程

# Input -> Map -> Shuffle & Sort -> Reduce -> Output服務器

簡單的流程圖以下:網絡

clip_image001

對於Shuffle,簡單地說就是將Map的輸出經過必定的算法劃分到合適的Reducer中進行處理。Sort固然就是對中間的結果進行按key排序,由於Reducer的輸入是嚴格要求按key排序的。app

Input->Map->Shuffle&Sort->Reduce->Output只是從宏觀的角度對MapReduce的簡單描述,實際在MapReduce的框架中,即從編程的角度來看,其處理流程是Input->Map->Sort->Combine->Partition->Reduce->Output。用以前的對溫度進行統計的例子來說述這些過程。框架

Input Phasesocket

輸入的數據須要以必定的格式傳遞給Mapper的,格式有多種,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可使用JobConf.setInputFormat來設置,這個過程還應該包括對輸入的數據進行任務粒度劃分(split)而後再傳遞給Mapper。在溫度的例子中,因爲處理的都是文本數據,輸入的格式使用默認的TextInputFormat便可。ide

Map Phase

對輸入的key、value對進行處理,輸出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass設置本身的Mapper。在例子中,將(行號、溫度的文本數據)做爲key/value輸入,通過處理後,從溫度的文件數據中提取出日期中的年份和該日的溫度數據,造成新的key/value對,最後以list(年,  溫度)的結果輸出,如[(1950, 10), (1960, 40), (1960, 5)]。

Sort Phase

對Mapper輸出的數據進行排序,能夠經過JobConf.setOutputKeyComparatorClass來設置本身的排序規則。在例子中,通過排序以後,輸出的list集合是按年份進行排序的list(年, 溫度),如[(1950, 10), (1950, 5), (1960, 40)]。

Combine Phase

這個階段是將中間結果中有相同的key的<key, value>對合併成一對,Combine的過程與Reduce很類似,使用的甚至是Reduce的接口。經過Combine可以減小<key, value>的集合數量,從而減小網絡流量。Combine只是一個可選的優化過程,而且不管Combine執行多少次(>=0),都會使Reducer產生相同的輸出,使用JobConf.setCombinerClass來設置自定義的Combine Class。在例子中,假如map1產生出的結果爲[(1950, 0), (1950, 20), (1950, 10)],在map2產生出的結果爲[(1950, 15), (1950, 25)],這兩組數據做爲Reducer的輸入並通過Reducer處理後的年最高溫度結果爲(1950, 25),然而當在Mapper以後加了Combine(Combine先過濾出最高溫度),則map1的輸出是[(1950, 20)]和map2的輸出是[(1950, 25)],雖然其餘的三組數據被拋棄了,可是對於Reducer的輸出而言,處理後的年最高溫度依然是(1950, 25)。

Partition Phase

把Mapper任務輸出的中間結果按key的範圍劃分紅R份(R是預先定義的Reduce任務的個數),默認的劃分算法是」(key.hashCode() & Integer.MAX_VALUE) % numPartitions」,這樣保證了某一範圍的key必定是由某個Reducer來處理,簡化了Reducer的處理流程,使用JobConf.setPartitionClass來設置自定義的Partition Class。在例子中,默認就天然是對年份進行取模了。

Reduce Phase

Reducer獲取Mapper輸出的中間結果,做爲輸入對某一key範圍區間進行處理,使用JobConf.setReducerClass來設置。在例子中,與Combine Phase中的處理是同樣的,把各個Mapper傳遞過來的數據計算年最高溫度。

Output Phase

Reducer的輸出格式和Mapper的輸入格式是相對應的,固然Reducer的輸出還能夠做爲另外一個Mapper的輸入繼續進行處理。

2、Details of Job Run

上面只是從task運行中描述了Map和Reduce的過程,實際上當從運行」hadoop jar」開始還涉及到不少其餘的細節。從整個Job運行的流程來看,以下圖所示:

clip_image002

從上圖能夠看到,MapReduce運行過程當中涉及有4個獨立的實體:

1. Client,用於提交MapReduce job。

2. JobTracker,負責協調job的運行。

3. TaskTrackers,運行 job分解後的多個tasks,task主要是負責運行Mapper和Reducer。

4. Distributed filesystem,用於存儲上述實體運行時共享的job文件(如中間結果文件)。

Job Submission

當調用了JobClient.runJob()以後,Job便開始被提交了,在Job提交這個步驟中,經歷瞭如下的過程:

1. Client向JobTacker申請一個新的job ID(step 2),job ID形如job_200904110811_0002的格式,是由JobTracker運行當前的job的時間和一個由JobTracker維護的自增計數(從1開始)組成的。

2. 檢查job的output specification,好比輸出目錄是否已經存在(存在則拋異常)、是否有權限寫等等。

3. Computes the input splits for the job,這些input splits就是做爲Mapper的輸入。

4. Copies the resources needed to run the job, including the job JAR file, the configuration file and the computed input splits, to the jobtracker’s filesystem in a direcotry named after the job ID(step 3)。

5. Tells the jobtracker that the job is ready for execution(step 4)。

Job Initialization

當JobTracker收到Job提交的請求後,將job保存在一個內部隊列,並讓Job Scheduler處理並初始化。初始化涉及到建立一個封裝了其tasks的job對象,並保持對task的狀態和進度的根據(step 5)。當建立要運行的一系列task對象後,Job Scheduler首先開始從文件系統中獲取由JobClient計算的input splits(step 6),而後再爲每一個split建立map task。

Task Assignment

TaskTrackers會使用一個簡單的loop爲按期向JobTracker發送heartbeat調用,發送的間隔時間大約5秒,通常取決於集羣服務器的規模和繁忙程度以及網絡擁擠程度。這個heartbeat一方面是告知JobTracker當前TaskTracker處於live狀態,同時是用於JobTracker和TaskTracker進行通訊,TaskTracker會根據heartbeat的返回值來執行必定的操做(step 7)。

To choose a reduce task the JobTracker simply takes the next in its list of yet-to-be-run reduce tasks, since there are no data locality considerations. For a map task, however, it takes account of the TaskTracker’s network location and picks a task whose input splits is as close as possible to the tasktracker. In the optimal case, the task is data-local, that is , running on the same node that the split resides on. Alternatively, the task may be rack-local: on the same rack, but not the same node, as the split.

Task Execution

當TaskTrack被分配到一個task以後,接下來就是運行這個task。首先,它會須要的job JAR文件從shared filesystem拷貝到local filesystem,而後建立一個working direcotry並un-jars拷貝的JAR文件到該directory,最後就建立一個TaskRunner對象運行task。

TaskRunner在運行的時候是啓動了一個新的JVM來run each task(step 10),這樣是爲了防止在用戶自定義的Mapper出現異常令JVM掛了,從而連累到TaskTracker。TaskRunner子進程會使用umbilical接口和TaskTracker通訊並每隔幾秒向TaskTracker彙報進度。

對於使用Streaming和Pipes方式來建立的Mapper,也是做爲TaskTracker的子進程來運行的。Streaming是使用標準輸入輸出來通訊,而Pipes是使用socket來進行通訊,以下圖:

clip_image003

Progress and Status Updates

進度和狀態是經過heartbeat來更新和維護的。來對於Map Task,進度就是已處理數據和全部輸入數據的比例。對於Reduce Task,狀況就有點複雜,包括3部分,拷貝中間結果文件、排序、Reduce調用,每部分佔1/3。

Job Completion

當Job完成後,JobTracker會收一個Job Complete的通知,並將當前的Job狀態更新爲Successful,同時JobClient也會輪循獲知提交的Job已經完成,將信息顯示給用戶。最後,JobTracker會清理和回收該Job的相關資源,並通知TaskTracker進行相同的操做(好比刪除中間結果文件)。

相關文章
相關標籤/搜索