MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算。MapReduce採用」分而治之」的思想,把對大規模數據集的操做,分發給一個主節點管理下的各個分節點共同完成,而後經過整合各個節點的中間結果,獲得最終結果。簡單地說,MapReduce就是」任務的分解與結果的彙總」。程序員
MapReduce架構算法
先來看一下MapReduce1.0的架構圖編程
上圖中的TaskTracker對應HDFS中的DataNode,數組
在MapReduce1.x中,用於執行MapReduce任務的機器角色有兩個:一個是JobTracker;另外一個是TaskTracker,JobTracker是用於調度工做的,TaskTracker是用於執行工做的。一個Hadoop集羣中只有一臺JobTracker。架構
流程分析app
在客戶端啓動任務,客戶端向JobTracker請求一個Job ID。分佈式
將運行任務所須要的程序文件複製到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分信息。這些文件都存放在JobTracker專門爲該任務建立的文件夾中。文件夾名Job ID。ide
JobTracker接收到任務後,將其放在一個隊列裏,等待調度器對其進行調度,看成業調度器根據本身的調度算法調度到該任務時,會根據輸入劃分信息建立N個map任務,並將map任務分配給N個TaskTracker(DataNode)執行。函數
map任務不是隨隨便便地分配給某個TaskTracker的,這裏有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包複製到該TaskTracker上來運行,這叫「運算移動,數據不移動」。而分配reduce任務時並不考慮數據本地化。oop
TaskTracker每隔一段時間會給JobTracker發送一個Heartbeat(心跳),告訴JobTracker它依然在運行,同時心跳中還攜帶着不少的信息,好比當前map任務完成的進度等信息。當JobTracker收到做業的最後一個任務完成信息時,便把該做業設置成「成功」。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。
以上是在客戶端、JobTracker、TaskTracker的層次來分析MapReduce的工做原理的,下面咱們再細緻一點,從map任務和reduce任務的層次來分析分析吧。
MapReduce運行流程
以wordcount爲例,運行的詳細流程圖以下
1.split階段
首先mapreduce會根據要運行的大文件來進行split,每一個輸入分片(input split)針對一個map任務,輸入分片(input split)存儲的並不是數據自己,而是一個分片長度和一個記錄數據位置的數組。輸入分片(input split)每每和HDFS的block(塊)關係很密切,假如咱們設定HDFS的塊的大小是64MB,咱們運行的大文件是64x10M,mapreduce會分爲10個map任務,每一個map任務都存在於它所要計算的block(塊)的DataNode上。
2.map階段
map階段就是程序員編寫的map函數了,所以map函數效率相對好控制,並且通常map操做都是本地化操做也就是在數據存儲節點上進行。本例的map函數以下:
publicclassWCMapperextendsMapperLongWritable,Text,Text,IntWritable{@Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ Stringstr=value.toString(); String[]strs=StringUtils.split(str,'');for(Strings:strs){ context.write(newText(s),newIntWritable(1)); } } }
根據空格切分單詞,計數爲1,生成key爲單詞,value爲出現1次的map供後續計算。
3.shuffle階段
shuffle階段主要負責將map端生成的數據傳遞給reduce端,所以shuffle分爲在map端的過程和在reduce端的執行過程。
先看map端:
map首先進行數據結果數據屬於哪一個partition的判斷,其中一個partition對應一個reduce,通常經過key.hash()%reduce個數來實現。
把map數據寫入到Memory Buffer(內存緩衝區),到達80%閥值,開啓溢寫進磁盤過程,同時進行key排序,若是有combiner步驟,則會對相同的key作歸併處理,最終多個溢寫文件合併爲一個文件。
reduce端:
reduce節點從各個map節點拉取存在磁盤上的數據放到Memory Buffer(內存緩衝區),同理將各個map的數據進行合併並存到磁盤,最終磁盤的數據和緩衝區剩下的20%合併傳給reduce階段。
4.reduce階段
reduce對shuffle階段傳來的數據進行最後的整理合並
publicclassWCReducerextendsReducerText,IntWritable,Text,IntWritable{@Override protectedvoidreduce(Textkey,IterableIntWritablevalues,Contextcontext)throwsIOException,InterruptedException{intsum=0;for(IntWritablei:values){ sum+=i.get(); } context.write(key,newIntWritable(sum)); } }
MapReduce的優缺點
優勢:
易於編程;
良好的擴展性;
高容錯性;
4.適合PB級別以上的大數據的分佈式離線批處理。
缺點:
難以實時計算(MapReduce處理的是存儲在本地磁盤上的離線數據)
不能流式計算(MapReduce設計處理的數據源是靜態的)
難以DAG計算MapReduce這些並行計算大都是基於非循環的數據流模型,也就是說,一次計算過程當中,不一樣計算節點之間保持高度並行,這樣的數據流模型使得那些須要反覆使用一個特定數據集的迭代算法沒法高效地運行。
原文轉載自:http://blog.itpub.net/31557424/viewspace-2220234/