大數據教程(10.7)Mapreduce的其餘補充(計數器、多job串聯、參數優化等)

    上一篇文章分析了自定義inputFormat(小文件合併)的實現,在此博主將繼續Mapreduce的其餘補充(計數器、多job串聯、參數優化等)內容的分享。java

    1、計數器應用node

           在實際生產代碼中,經常須要將數據處理過程當中遇到的不合規數據行進行全局計數,相似這種需求能夠藉助mapreduce框架中提供的全局計數器來實現。示例代碼以下:數據庫

public class MultiOutputs {
	//經過枚舉形式定義自定義計數器
	enum MyCounter{MALFORORMED,NORMAL}

	static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String[] words = value.toString().split(",");

			for (String word : words) {
				context.write(new Text(word), new LongWritable(1));
			}
			//對枚舉定義的自定義計數器加1
			context.getCounter(MyCounter.MALFORORMED).increment(1);
			//經過動態設置自定義計數器加1
			context.getCounter("counterGroupa", "countera").increment(1);
		}

	}

    2、多job串聯性能優化

           以前咱們分享的不少mr程序都是單個mapreduce程序來實現;然而,在實際生產中,一個稍複雜點的處理邏輯每每須要多個mapreduce程序串聯處理,多job的串聯能夠藉助mapreduce框架的JobControl實現。服務器

           示例代碼:網絡

ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
 ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
 ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
       
 cJob1.setJob(job1);
 cJob2.setJob(job2);
 cJob3.setJob(job3);

 // 設置做業依賴關係
 cJob2.addDependingJob(cJob1);
 cJob3.addDependingJob(cJob2);
 
 JobControl jobControl = new JobControl("RecommendationJob");
 jobControl.addJob(cJob1);
 jobControl.addJob(cJob2);
 jobControl.addJob(cJob3);
 
 
 // 新建一個線程來運行已加入JobControl中的做業,開始進程並等待結束
 Thread jobControlThread = new Thread(jobControl);
 jobControlThread.start();
 while (!jobControl.allFinished()) {
       Thread.sleep(500);
 }
 jobControl.stop();
 
 return 0;

    3、mapreduce參數優化(MapReduce重要配置參數app

           資源相關參數:框架

//如下參數是在用戶本身的mr應用程序中配置就能夠生效
(1) mapreduce.map.memory.mb: 一個Map Task可以使用的資源上限(單位:MB),默認爲1024。若是Map Task實際使用的資源量超過該值,則會被強制殺死。
(2) mapreduce.reduce.memory.mb: 一個Reduce Task可以使用的資源上限(單位:MB),默認爲1024。若是Reduce Task實際使用的資源量超過該值,則會被強制殺死。
(3) mapreduce.map.java.opts: Map Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g.
「-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc」 (@taskid@會被Hadoop框架自動換爲相應的taskid), 默認值: 「」
(4) mapreduce.reduce.java.opts: Reduce Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g.
「-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc」, 默認值: 「」
(5) mapreduce.map.cpu.vcores: 每一個Map task可以使用的最多cpu core數目, 默認值: 1
(6) mapreduce.reduce.cpu.vcores: 每一個Reduce task可以使用的最多cpu core數目, 默認值: 1

//應該在yarn啓動以前就配置在服務器的配置文件中才能生效
(7) yarn.scheduler.minimum-allocation-mb	  1024   給應用程序container分配的最小內存
(8) yarn.scheduler.maximum-allocation-mb	  8192	給應用程序container分配的最大內存
(9) yarn.scheduler.minimum-allocation-vcores	1	
(10)yarn.scheduler.maximum-allocation-vcores	32
(11)yarn.nodemanager.resource.memory-mb   8192  

//shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好
(12)mapreduce.task.io.sort.mb   100         //shuffle的環形緩衝區大小,默認100m
mapreduce.map.sort.spill.percent   0.8    //環形緩衝區溢出的閾值,默認80%

          容錯相關參數ide

(1) mapreduce.map.maxattempts: 每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。
(2) mapreduce.reduce.maxattempts: 每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。
(3) mapreduce.map.failures.maxpercent: 當失敗的Map Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. 若是你的應用程序容許丟棄部分輸入數據,則該該值設爲一個大於0的值,好比5,表示若是有低於5%的Map Task失敗(若是一個Map Task重試次數超過mapreduce.map.maxattempts,則認爲這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個做業扔認爲成功。
(4) mapreduce.reduce.failures.maxpercent: 當失敗的Reduce Task失敗比例超過該值爲,整個做業則失敗,默認值爲0.
(5) mapreduce.task.timeout: Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。

          本地運行mapreduce 做業oop

設置如下幾個參數:
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local

          效率和穩定性相關參數

(1) mapreduce.map.speculative: 是否爲Map Task打開推測執行機制,默認爲false
(2) mapreduce.reduce.speculative: 是否爲Reduce Task打開推測執行機制,默認爲false
(3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:當同一個class同時出如今用戶jar包和hadoop jar中時,優先使用哪一個jar包中的class,默認爲false,表示優先使用hadoop jar中的class。
(4) mapreduce.input.fileinputformat.split.minsize: FileInputFormat作切片時的最小切片大小,(5)mapreduce.input.fileinputformat.split.maxsize:  FileInputFormat作切片時的最大切片大小
(切片的默認大小就等於blocksize,即 134217728)

         最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。

相關文章
相關標籤/搜索