上一篇文章分析了自定義inputFormat(小文件合併)的實現,在此博主將繼續Mapreduce的其餘補充(計數器、多job串聯、參數優化等)內容的分享。java
1、計數器應用node
在實際生產代碼中,經常須要將數據處理過程當中遇到的不合規數據行進行全局計數,相似這種需求能夠藉助mapreduce框架中提供的全局計數器來實現。示例代碼以下:數據庫
public class MultiOutputs { //經過枚舉形式定義自定義計數器 enum MyCounter{MALFORORMED,NORMAL} static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(","); for (String word : words) { context.write(new Text(word), new LongWritable(1)); } //對枚舉定義的自定義計數器加1 context.getCounter(MyCounter.MALFORORMED).increment(1); //經過動態設置自定義計數器加1 context.getCounter("counterGroupa", "countera").increment(1); } }
2、多job串聯性能優化
以前咱們分享的不少mr程序都是單個mapreduce程序來實現;然而,在實際生產中,一個稍複雜點的處理邏輯每每須要多個mapreduce程序串聯處理,多job的串聯能夠藉助mapreduce框架的JobControl實現。服務器
示例代碼:網絡
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration()); ControlledJob cJob2 = new ControlledJob(job2.getConfiguration()); ControlledJob cJob3 = new ControlledJob(job3.getConfiguration()); cJob1.setJob(job1); cJob2.setJob(job2); cJob3.setJob(job3); // 設置做業依賴關係 cJob2.addDependingJob(cJob1); cJob3.addDependingJob(cJob2); JobControl jobControl = new JobControl("RecommendationJob"); jobControl.addJob(cJob1); jobControl.addJob(cJob2); jobControl.addJob(cJob3); // 新建一個線程來運行已加入JobControl中的做業,開始進程並等待結束 Thread jobControlThread = new Thread(jobControl); jobControlThread.start(); while (!jobControl.allFinished()) { Thread.sleep(500); } jobControl.stop(); return 0;
3、mapreduce參數優化(MapReduce重要配置參數)app
資源相關參數:框架
//如下參數是在用戶本身的mr應用程序中配置就能夠生效 (1) mapreduce.map.memory.mb: 一個Map Task可以使用的資源上限(單位:MB),默認爲1024。若是Map Task實際使用的資源量超過該值,則會被強制殺死。 (2) mapreduce.reduce.memory.mb: 一個Reduce Task可以使用的資源上限(單位:MB),默認爲1024。若是Reduce Task實際使用的資源量超過該值,則會被強制殺死。 (3) mapreduce.map.java.opts: Map Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g. 「-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc」 (@taskid@會被Hadoop框架自動換爲相應的taskid), 默認值: 「」 (4) mapreduce.reduce.java.opts: Reduce Task的JVM參數,你能夠在此配置默認的java heap size等參數, e.g. 「-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc」, 默認值: 「」 (5) mapreduce.map.cpu.vcores: 每一個Map task可以使用的最多cpu core數目, 默認值: 1 (6) mapreduce.reduce.cpu.vcores: 每一個Reduce task可以使用的最多cpu core數目, 默認值: 1 //應該在yarn啓動以前就配置在服務器的配置文件中才能生效 (7) yarn.scheduler.minimum-allocation-mb 1024 給應用程序container分配的最小內存 (8) yarn.scheduler.maximum-allocation-mb 8192 給應用程序container分配的最大內存 (9) yarn.scheduler.minimum-allocation-vcores 1 (10)yarn.scheduler.maximum-allocation-vcores 32 (11)yarn.nodemanager.resource.memory-mb 8192 //shuffle性能優化的關鍵參數,應在yarn啓動以前就配置好 (12)mapreduce.task.io.sort.mb 100 //shuffle的環形緩衝區大小,默認100m mapreduce.map.sort.spill.percent 0.8 //環形緩衝區溢出的閾值,默認80%
容錯相關參數ide
(1) mapreduce.map.maxattempts: 每一個Map Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。 (2) mapreduce.reduce.maxattempts: 每一個Reduce Task最大重試次數,一旦重試參數超過該值,則認爲Map Task運行失敗,默認值:4。 (3) mapreduce.map.failures.maxpercent: 當失敗的Map Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. 若是你的應用程序容許丟棄部分輸入數據,則該該值設爲一個大於0的值,好比5,表示若是有低於5%的Map Task失敗(若是一個Map Task重試次數超過mapreduce.map.maxattempts,則認爲這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個做業扔認爲成功。 (4) mapreduce.reduce.failures.maxpercent: 當失敗的Reduce Task失敗比例超過該值爲,整個做業則失敗,默認值爲0. (5) mapreduce.task.timeout: Task超時時間,常常須要設置的一個參數,該參數表達的意思爲:若是一個task在必定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認爲該task處於block狀態,多是卡住了,也許永遠會卡主,爲了防止由於用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。若是你的程序對每條輸入數據的處理時間過長(好比會訪問數據庫,經過網絡拉取數據等),建議將該參數調大,該參數太小常出現的錯誤提示是「AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.」。
本地運行mapreduce 做業oop
設置如下幾個參數: mapreduce.framework.name=local mapreduce.jobtracker.address=local fs.defaultFS=local
效率和穩定性相關參數
(1) mapreduce.map.speculative: 是否爲Map Task打開推測執行機制,默認爲false (2) mapreduce.reduce.speculative: 是否爲Reduce Task打開推測執行機制,默認爲false (3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:當同一個class同時出如今用戶jar包和hadoop jar中時,優先使用哪一個jar包中的class,默認爲false,表示優先使用hadoop jar中的class。 (4) mapreduce.input.fileinputformat.split.minsize: FileInputFormat作切片時的最小切片大小,(5)mapreduce.input.fileinputformat.split.maxsize: FileInputFormat作切片時的最大切片大小 (切片的默認大小就等於blocksize,即 134217728)
最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。