1. MapReduce 介紹 |
假設有一個計算文件中單詞個數的需求,文件比較多也比較大,在單擊運行的時候機器的內存受限,磁盤受限,運算能力受限,而一旦將單機版程序擴展到集羣來分佈式運行,將極大增長程序的複雜度和開發難度,所以這個工做可能完成不了。針對以上這個案例,MapReduce在這裏能起到什麼做用呢,引入MapReduce框架後,開發人員能夠將絕大部分工做集中在業務邏輯的開發上,而將分佈式計算中的複雜性交由框架來處理。編程
可見在程序由單機版擴成分佈式時,會引入大量的複雜工做。爲了提升開發效率,能夠將分佈式程序中的公共功能封裝成框架,讓開發人員能夠將精力集中於業務邏輯。而MapReduce就是這樣一個分佈式程序的通用框架。數組
MapReduce 也採用了 Master/Slave(M/S)架構。它主要由如下幾個組件組成 :Client、JobTracker、 TaskTracker 和 Task。下面分別對這幾個組件進行介紹。 服務器
(1)Client架構
用戶編寫的MapReduce程序經過Client提交到JobTracker端;同時用戶可經過Client提供的一些接口查看做業運行狀態。在Hadoop內部用「做業」 (Job)表示MapReduce程序。一個 MapReduce程序可對應若干個做業,而每一個做業會被分解成若干個Map/Reduce任務(Task)。併發
(2)JobTrackerapp
JobTracker 主要負責資源監控和做業調度。JobTracker 監控全部 TaskTracker 與做業Job的健康情況,一旦發現失敗狀況後,其會將相應的任務轉移到其餘節點;同時,JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器,而調度器會在資源出現空閒時,選擇合適的任務使用這些資源。在Hadoop 中,任務調度器是一個可插拔的模塊,用戶能夠根據本身的須要設計相應的調度器。框架
(3)TaskTracker分佈式
TaskTracker會週期性地經過Heartbeat將本節點上資源的使用狀況和任務的運行進度彙報給JobTracker,同時接收JobTracker發送過來的命令並執行相應的操做(如啓動新任務、殺死 任務等)。TaskTracker 使用「slot」等量劃分本節點上的資源量。 「slot」表明計算資源(CPU、 內存等)。一個 Task 獲取到一個slot 後纔有機會運行,而Hadoop調度器的做用就是將各個TaskTracker上的空閒slot分配給Task使用。slot分爲Map slot和Reduce slot 兩種,分別供Map Task和Reduce Task使用。TaskTracker經過slot數目(可配置參數)限定Task的併發度。ide
(4)Task函數
Task 分爲 Map Task 和 Reduce Task 兩種,均由TaskTracker啓動。從上一小節中咱們知道,HDFS以固定大小的block 爲基本單位存儲數據,而對於MapReduce 而言,其處理單位是split。 split 與 block 的對應關係以下圖所示。split 是一個邏輯概念,它只包含一些元數據信息,好比 數據起始位置、數據長度、數據所在節點等。它的劃分方法徹底由用戶本身決定。但須要注意的是,split的多少決定了Map Task的數目,由於每一個split會交由一個Map Task處理。
Map Task 執行過程以下圖所示。由該圖可知,Map Task 先將對應的split 迭代解析成一 個個 key/value 對,依次調用用戶自定義的map() 函數進行處理,最終將臨時結果存放到本地磁盤上,其中臨時數據被分紅若干個partition(分片),每一個partition 將被一個Reduce Task處理。
Reduce Task 執行過程以下圖所示。該過程分爲三個階段:
①從遠程節點上讀取Map Task 中間結果(稱爲「Shuffle階段」);
②按照key對key/value 對進行排序(稱爲「Sort階段」);
③依次讀取 <key, value list>,調用用戶自定義的 reduce() 函數處理,並將最終結果存到HDFS上(稱爲「Reduce 階段」)。
MapReduce是一種並行編程模式,利用這種模式軟件開發者能夠輕鬆地編寫出分佈式並行程序。在Hadoop的體系結構中,MapReduce是一個簡單易用的軟件框架,基於它能夠將任務分發到由上千臺商用機器組成的集羣上,並以一種可靠容錯的方式並行處理大量的數據集,實現Hadoop的並行任務處理功能。MapReduce框架是由一個單獨運行在主節點的JobTrack和運行在每一個集羣從節點的TaskTrack共同組成的。
主節點負責調度構成一個做業的全部任務,這些任務分佈在不一樣的節點上。主節點監控它們的執行狀況,而且從新執行以前失敗的任務;
從節點僅負責由主節點指派的任務。
當一個Job任務被提交時,JobTrack接收到提交做業和其配置信息以後,就會配置信息等發給從節點,同時調度任務並監控TaskTrack的執行。
Hadoop的發佈包中內置了一個hadoop-mapreduce-example-2.6.5.jar,這個jar包中有各類MR示例程序,能夠經過如下步驟運行:
啓動hdfs,yarn,而後在集羣中的任意一臺服務器上啓動執行程序(好比運行wordcount):
hadoop jar hadoop-mapreduce-example-2.6.5.jar wordcount /wordcount/data /wordcount/out
2.MapReduce 編程 |
1) 用戶編寫的程序分紅三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
2) Mapper的輸入數據是KV對的形式(KV的類型可自定義)
3) Mapper的輸出數據是KV對的形式(KV的類型可自定義)
4) Mapper中的業務邏輯寫在map()方法中
5) map()方法(maptask進程)對每個<K,V>調用一次
6) Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
7) Reducer的業務邏輯寫在reduce()方法中
8) Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
9) 用戶自定義的Mapper和Reducer都要繼承各自的父類
10) 整個程序須要一個Drvier來進行提交,提交的是一個描述了各類必要信息的job對象
需求:在一堆給定的文本文件中統計輸出每個單詞出現的總次數
(1)定義一個mapper類
//首先要定義四個泛型的類型 //keyin: LongWritable valuein: Text //keyout: Text valueout:IntWritable public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //map方法的生命週期: 框架每傳一行數據就被調用一次 //key : 這一行的起始點在文件中的偏移量 //value: 這一行的內容 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行數據轉換爲string String line = value.toString(); //將這一行切分出各個單詞 String[] words = line.split(" "); //遍歷數組,輸出<單詞,1> for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
(2)定義一個reducer類
//生命週期:框架每傳遞進來一個kv 組,reduce方法被調用一次 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定義一個計數器 int count = 0; //遍歷這一組kv的全部v,累加到count中 for(IntWritable value:values){ count += value.get(); } context.write(key, new IntWritable(count)); } }
(3)定義一個主類,用來描述job並提交job
public class WordCountRunner { //把業務邏輯相關的信息(哪一個是mapper,哪一個是reducer,要處理的數據在哪裏,輸出的結果放哪裏……)描述成一個job對象 //把這個描述好的job提交給集羣去運行 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //指定我這個job所在的jar包 // wcjob.setJar("/home/hadoop/wordcount.jar"); wcjob.setJarByClass(WordCountRunner.class); wcjob.setMapperClass(WordCountMapper.class); wcjob.setReducerClass(WordCountReducer.class); //設置咱們的業務邏輯Mapper類的輸出key和value的數據類型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(IntWritable.class); //設置咱們的業務邏輯Reducer類的輸出key和value的數據類型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); //指定要處理的數據所在的位置 FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt"); //指定處理完成以後的結果所保存的位置 FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/")); //向yarn集羣提交這個job boolean res = wcjob.waitForCompletion(true); System.exit(res?0:1); }
1) 將mapreduce程序提交給yarn集羣resourcemanager,分發到不少的節點上併發執行
2) 處理的數據和輸出結果應該位於hdfs文件系統
3) 提交集羣的實現步驟:
將程序打成JAR包,而後在集羣的任意一個節點上用hadoop命令啓動hadoop jar wordcount.jar cn.bigdata.mrsimple.WordCountDriver inputpath outputpath