MapReduce(一) mapreduce基礎入門

1、mapreduce入門java

  一、什麼是mapreduce     面試

首先讓咱們來重溫一下 hadoop 的四大組件:
HDFS:分佈式存儲系統
MapReduce:分佈式計算系統
YARN: hadoop 的資源調度系統
Common: 以上三大組件的底層支撐組件,主要提供基礎工具包和 RPC 框架等apache

Mapreduce 是一個分佈式運算程序的編程框架,是用戶開發「基於 hadoop 的數據分析 應用」的核心框架
Mapreduce 核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的 分佈式運算程序,併發運行在一個 hadoop 集羣上
   二、爲何須要mapreduce     編程

    爲何須要 MapReduce?
     (1) 海量數據在單機上處理由於硬件資源限制,沒法勝任
     (2) 而一旦將單機版程序擴展到集羣來分佈式運行,將極大增長程序的複雜度和開發難度
     (3) 引入 MapReduce 框架後,開發人員能夠將絕大部分工做集中在業務邏輯的開發上,而將 分佈式計算中的複雜性交由框架來處理緩存

可見在程序由單機版擴成分佈式版時,會引入大量的複雜工做。爲了提升開發效率,能夠將 分佈式程序中的公共功能封裝成框架,讓開發人員能夠將精力集中於業務邏輯。
Hadoop 當中的 MapReduce 就是這樣的一個分佈式程序運算框架,它把大量分佈式程序都會
涉及的到的內容都封裝進了,讓用戶只用專一本身的業務邏輯代碼的開發。 它對應以上問題
的總體結構以下:網絡

      三、mapreduce程序運行實例併發

在 MapReduce 組件裏, 官方給咱們提供了一些樣例程序,其中很是有名的就是 wordcount 和 pi 程序。這些 MapReduce 程序的代碼都在 hadoop-mapreduce-examples-2.6.4.jar 包裏, 這 個 jar 包在 hadoop 安裝目錄下的/share/hadoop/mapreduce/目錄裏

app

下面咱們使用 hadoop 命令來試跑例子程序,看看運行效果
先看 MapReduce 程序求 pi 的程序:框架

那除了這兩個程序之外,還有沒有官方提供的其餘程序呢,還有就是它們的源碼在哪裏呢?
咱們打開 mapreduce 的源碼工程,裏面有一個 hadoop-mapreduce-project 項目:jvm

    四、mapreduce示例編寫及編碼規範

上一步,咱們查看了 WordCount 這個 MapReduce 程序的源碼編寫,能夠得出幾點結論:
(1) 該程序有一個 main 方法,來啓動任務的運行,其中 job 對象就存儲了該程序運行的必要 信息,好比指定 Mapper 類和 Reducer 類
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
(2) 該程序中的 TokenizerMapper 類繼承了 Mapper 類
(3) 該程序中的 IntSumReducer 類繼承了 Reducer 類

總結: MapReduce 程序的業務編碼分爲兩個大部分,一部分配置程序的運行信息,一部分 編寫該 MapReduce 程序的業務邏輯,而且業務邏輯的 map 階段和 reduce 階段的代碼分別繼 承 Mapper 類和 Reducer 類

  

(1) 用戶編寫的程序分紅三個部分: Mapper, Reducer, Driver(提交運行 MR 程序的客戶端)
(2) Mapper 的輸入數據是 KV 對的形式( KV 的類型可自定義)
(3) Mapper 的輸出數據是 KV 對的形式( KV 的類型可自定義)
(4) Mapper 中的業務邏輯寫在 map()方法中
(5) map()方法( maptask 進程)對每個<K,V>調用一次
(6) Reducer 的輸入數據類型對應 Mapper 的輸出數據類型,也是 KV
(7) Reducer 的業務邏輯寫在 reduce()方法中
(8) Reducetask 進程對每一組相同 k 的<k,v>組調用一次 reduce()方法
(9) 用戶自定義的 Mapper 和 Reducer 都要繼承各自的父類
(10) 整個程序須要一個 Drvier 來進行提交,提交的是一個描述了各類必要信息的 job 對象

WordCount 的業務邏輯: 
一、 maptask 階段處理每一個數據分塊的單詞統計分析,思路是每遇到一個單詞則把其轉換成 一個 key-value 對,好比單詞 hello,就轉換成<’hello’,1>發送給 reducetask 去彙總
二、 reducetask 階段將接受 maptask 的結果, 來作彙總計數

其次reduce

       五、mapreduce運行方式與debug

        運行方式:

 

 2、mapreduce的核心程序運行機制

     一、概述

一個完整的 mapreduce 程序在分佈式運行時有兩類實例進程:
 (1) MRAppMaster:負責整個程序的過程調度及狀態協調  (該進程在yarn節點上)
 (2) Yarnchild:負責 map 階段的整個數據處理流程         
 (3) Yarnchild:負責 reduce 階段的整個數據處理流程
以上兩個階段 maptask 和 reducetask 的進程都是 yarnchild,並非說這 maptask 和 reducetask 就跑在同一個 yarnchild 進行裏
(Yarnchild進程在運行該命令的節點上)

      二、mapreduce程序的運行流程(經典面試題)

(1) 一個 mr 程序啓動的時候,最早啓動的是 MRAppMaster, MRAppMaster 啓動後根據本次 job 的描述信息,計算出須要的 maptask 實例數量,而後向集羣申請機器啓動相應數量的 maptask 進程
(2) maptask 進程啓動以後,根據給定的數據切片(哪一個文件的哪一個偏移量範圍)範圍進行數 據處理,主體流程爲:
    A、 利用客戶指定的 inputformat 來獲取 RecordReader 讀取數據,造成輸入 KV 對
    B、 將輸入 KV 對傳遞給客戶定義的 map()方法,作邏輯運算,並將 map()方法輸出的 KV 對收 集到緩存
    C、 將緩存中的 KV 對按照 K 分區排序後不斷溢寫到磁盤文件 (超過緩存內存寫到磁盤臨時文件,最後都寫到該文件,ruduce 獲取該文件後,刪除 )
(3) MRAppMaster 監控到全部 maptask 進程任務完成以後(真實狀況是,某些 maptask 進 程處理完成後,就會開始啓動 reducetask 去已完成的 maptask 處 fetch 數據),會根據客戶指 定的參數啓動相應數量的 reducetask 進程,並告知 reducetask 進程要處理的數據範圍(數據
分區)
(4) Reducetask 進程啓動以後,根據 MRAppMaster 告知的待處理數據所在位置,從若干臺 maptask 運行所在機器上獲取到若干個 maptask 輸出結果文件,並在本地進行從新歸併排序, 而後按照相同 key 的 KV 爲一個組,調用客戶定義的 reduce()方法進行邏輯運算,並收集運
算輸出的結果 KV,而後調用客戶指定的 outputformat 將結果數據輸出到外部存儲

      三、maptask並行度決定機制

maptask 的並行度決定 map 階段的任務處理併發度,進而影響到整個 job 的處理速度 那麼, mapTask 並行實例是否越多越好呢?其並行度又是如何決定呢?

一個 job 的 map 階段並行度由客戶端在提交 job 時決定, 客戶端對 map 階段並行度的規劃
的基本邏輯爲:
將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分紅邏輯上的多 個 split),而後每個 split 分配一個 mapTask 並行實例處理
這段邏輯及造成的切片規劃描述文件,是由 FileInputFormat實現類的 getSplits()方法完成的。
該方法返回的是 List<InputSplit>, InputSplit 封裝了每個邏輯切片的信息,包括長度和位置  信息,而 getSplits()方法返回一組 InputSplit

       四、切片機制

     五、maptask並行度經驗之談

若是硬件配置爲 2*12core + 64G,恰當的 map 並行度是大約每一個節點 20-100 個 map,最好 每一個 map 的執行時間至少一分鐘。
     (1)若是 job 的每一個 map 或者 reduce task 的運行時間都只有 30-40 秒鐘,那麼就減小該 job 的 map 或者 reduce 數,每個 task(map|reduce)的 setup 和加入到調度器中進行調度,這個 中間的過程可能都要花費幾秒鐘,因此若是每一個 task 都很是快就跑完了,就會在 task 的開
始和結束的時候浪費太多的時間。
配置 task 的 JVM 重用能夠改善該問題:
mapred.job.reuse.jvm.num.tasks,默認是 1,表示一個 JVM 上最多能夠順序執行的 task 數目(屬於同一個 Job)是 1。也就是說一個 task 啓一個 JVM。這個值能夠在 mapred-site.xml 中進行更改, 當設置成多個,就意味着這多個 task 運行在同一個 JVM 上,但不是同時執行,
是排隊順序執行
   (2)若是 input 的文件很是的大,好比 1TB,能夠考慮將 hdfs 上的每一個 blocksize 設大,好比 設成 256MB 或者 512MB
     六、reducetask並行度決定機制

 補充:

package com.ghgj.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCountMR {

	/**
	 * 該main方法是該mapreduce程序運行的入口,其中用一個Job類對象來管理程序運行時所須要的不少參數:
	 * 好比,指定用哪一個組件做爲數據讀取器、數據結果輸出器
	 *     指定用哪一個類做爲map階段的業務邏輯類,哪一個類做爲reduce階段的業務邏輯類
	 *     指定wordcount job程序的jar包所在路徑
	 *     ....
	 *     以及其餘各類須要的參數
	 */
	public static void main(String[] args) throws Exception {
		// 指定hdfs相關的參數
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
//		conf.set("mapreduce.framework.name", "yarn");
//		conf.set("yarn.resourcemanager.hostname", "hadoop04");
		
		Job job = Job.getInstance(conf);
		
		// 設置jar包所在路徑
		job.setJarByClass(WordCountMR.class);
		
		// 指定mapper類和reducer類
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		// 指定maptask的輸出類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// 指定reducetask的輸出類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
//		Path inputPath = new Path("d:/wordcount/input");
//		Path outputPath = new Path("d:/wordcount/output");
		
		// 指定該mapreduce程序數據的輸入和輸出路徑
		Path inputPath = new Path("/wordcount/input");
		Path outputPath = new Path("/wordcount/output");
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath, true);
		}
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
//		job.submit();
		// 最後提交任務
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion?0:1);
	}
	
	/**
	 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
	 * 
	 * KEYIN 是指框架讀取到的數據的key的類型,在默認的InputFormat下,讀到的key是一行文本的起始偏移量,因此key的類型是Long 
	 * VALUEIN 是指框架讀取到的數據的value的類型,在默認的InputFormat下,讀到的value是一行文本的內容,因此value的類型是String
	 * KEYOUT 是指用戶自定義邏輯方法返回的數據中key的類型,由用戶業務邏輯決定,在此wordcount程序中,咱們輸出的key是單詞,因此是String
	 * VALUEOUT 是指用戶自定義邏輯方法返回的數據中value的類型,由用戶業務邏輯決定,在此wordcount程序中,咱們輸出的value是單詞的數量,因此是Integer
	 * 
	 * 可是,String ,Long等jdk中自帶的數據類型,在序列化時,效率比較低,hadoop爲了提升序列化效率,自定義了一套序列化框架
	 * 因此,在hadoop的程序中,若是該數據須要進行序列化(寫磁盤,或者網絡傳輸),就必定要用實現了hadoop序列化框架的數據類型
	 * 
	 * Long ----> LongWritable
	 * String ----> Text
	 * Integer ----> IntWritable
	 * Null ----> NullWritable
	 */
	static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
			String[] words = value.toString().split(" ");
			for(String word: words){
				context.write(new Text(word), new IntWritable(1));
			}
		}
	}
	
	/**
	 * 首先,和前面同樣,Reducer類也有輸入和輸出,輸入就是Map階段的處理結果,輸出就是Reduce最後的輸出
	 * reducetask在調咱們寫的reduce方法,reducetask應該收到了前一階段(map階段)中全部maptask輸出的數據中的一部分
	 * (數據的key.hashcode%reducetask數==本reductask號),因此reducetaks的輸入類型必須和maptask的輸出類型同樣
	 * 
	 * reducetask將這些收到kv數據拿來處理時,是這樣調用咱們的reduce方法的:
	 * 先將本身收到的全部的kv對按照k分組(根據k是否相同)
	 * 將某一組kv中的第一個kv中的k傳給reduce方法的key變量,把這一組kv中全部的v用一個迭代器傳給reduce方法的變量values
	 */
	static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			
			int sum = 0;
			for(IntWritable v: values){
				sum += v.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}
}
相關文章
相關標籤/搜索