[Hadoop in Action] 第6章 編程實踐

  • Hadoop程序開發的獨門絕技
  • 在本地,僞分佈和全分佈模式下調試程序
  • 程序輸出的完整性檢查和迴歸測試
  • 日誌和監控
  • 性能調優
 
一、開發MapReduce程序
 
[本地模式]
 
     本地模式下的hadoop將全部的運行都放在一個單獨的Java虛擬機中完成,而且使用的是本地文件系統(非HDFS)。在本地模式中運行的程序將全部的日誌和錯誤信息都輸出到控制檯,最後它會給出所處理數據的總量。
 
對程序進行正確性檢查:
  • 完整性檢查
  • 迴歸測試
  • 考慮使用long而非int
 
 
[僞分佈模式]
 
本地模式不具有生產型hadoop集羣的分佈式特徵。一些bug在運行本地模式時是不會出現的。如今是經過日誌文件和web界面遠程監視它,這些工具和之後在監控生產集羣時用的工具是相同的。
 
二、生產集羣上的監視和調試
 
[計數器]
 

代碼清單 使用計數器統計缺失值個數的MapClass
 
  1 import java.io.IOException;
  2 import java.util.regex.PatternSyntaxException;
  3 import java.util.Iterator;
  4  
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.DoubleWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapred.*;
 13 import org.apache.hadoop.util.Tool;
 14 import org.apache.hadoop.util.ToolRunner;
 15  
 16  
 17 public class AveragingWithCombiner extends Configured implements Tool {
 18  
 19     public static class MapClass extends MapReduceBase
 20         implements Mapper<LongWritable, Text, Text, Text> {
 21  
 22         static enum ClaimsCounters { MISSING, QUOTED };
 23  
 24         public void map(LongWritable key, Text value,
 25                         OutputCollector<Text, Text> output,
 26                         Reporter reporter) throws IOException {
 27  
 28             String fields[] = value.toString().split(",", -20);
 29             String country = fields[4];
 30             String numClaims = fields[8];
 31             if (numClaims.length() == 0) {
 32                 reporter.incrCounter(ClaimsCounters.MISSING, 1);
 33             } else if (numClaims.startsWith("\"")) {
 34                 reporter.incrCounter(ClaimsCounters.QUOTED, 1);
 35             } else {
 36                 output.collect(new Text(country), new Text(numClaims + ",1"));
 37             }
 38  
 39         }
 40     }
 41  
 42     public static class Combine extends MapReduceBase
 43         implements Reducer<Text, Text, Text, Text> {
 44  
 45         public void reduce(Text key, Iterator<Text> values,
 46                            OutputCollector<Text, Text> output,
 47                            Reporter reporter) throws IOException {
 48  
 49             double sum = 0;
 50             int count = 0;
 51             while (values.hasNext()) {
 52                 String fields[] = values.next().toString().split(",");
 53                 sum += Double.parseDouble(fields[0]);
 54                 count += Integer.parseInt(fields[1]);
 55             }
 56             output.collect(key, new Text(sum + "," + count));
 57         }
 58     }
 59  
 60     public static class Reduce extends MapReduceBase
 61         implements Reducer<Text, Text, Text, DoubleWritable> {
 62  
 63         public void reduce(Text key, Iterator<Text> values,
 64                            OutputCollector<Text, DoubleWritable> output,
 65                            Reporter reporter) throws IOException {
 66  
 67             double sum = 0;
 68             int count = 0;
 69             while (values.hasNext()) {
 70                 String fields[] = values.next().toString().split(",");
 71                 sum += Double.parseDouble(fields[0]);
 72                 count += Integer.parseInt(fields[1]);
 73             }
 74             output.collect(key, new DoubleWritable(sum/count));
 75         }
 76     }
 77  
 78     public int run(String[] args) throws Exception {
 79         // Configuration processed by ToolRunner
 80         Configuration conf = getConf();
 81  
 82         // Create a JobConf using the processed conf
 83         JobConf job = new JobConf(conf, AveragingWithCombiner.class);
 84  
 85         // Process custom command-line options
 86         Path in = new Path(args[0]);
 87         Path out = new Path(args[1]);
 88         FileInputFormat.setInputPaths(job, in);
 89         FileOutputFormat.setOutputPath(job, out);
 90  
 91         // Specify various job-specific parameters     
 92         job.setJobName("AveragingWithCombiner");
 93         job.setMapperClass(MapClass.class);
 94         job.setCombinerClass(Combine.class);
 95         job.setReducerClass(Reduce.class);
 96  
 97         job.setInputFormat(TextInputFormat.class);
 98         job.setOutputFormat(TextOutputFormat.class);
 99         job.setOutputKeyClass(Text.class);
100         job.setOutputValueClass(Text.class);
101  
102         // Submit the job, then poll for progress until the job is complete
103         JobClient.runJob(job);
104  
105         return 0;
106     }
107  
108     public static void main(String[] args) throws Exception {
109         // Let ToolRunner handle generic command-line options 
110         int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
111  
112         System.exit(res);
113     }
114 }
 

 
[跳過壞記錄]
 
(1)在Java中配置記錄跳讀
 
     hadoop從0.19版本起就已經支持skipping特徵了,但默認狀態是關閉的。在Java中,這個特徵由類SkipBadRecords來控制,所有由靜態方法組成。做業的driver須要調用以下的一個或所有方法:
     public static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecs)
     public static void setReducerMaxSkipGroups(Configuration conf, long maxSkipRecs)
來分別爲map任務和reduce任務打開記錄跳讀的設置。若是最大的跳讀區域大小被設置爲0(默認),那麼記錄跳讀就處於關閉狀態。可使用JobConf.setMaxMapAttempts()和JobConf.setMaxReduceAttempts()方法,或者設置等效的屬性mapred.map.max.attempts和mapred.reduce.max.attempts來作到這點。
 
     若是skipping被啓用,hadoop在任務失效兩次後就進入skipping模式。你能夠在SkipBadRecords的setAttemptsToStartSkipping()方法中設置觸發skipping模式的任務失效次數:
     public static void setAttemptsToStartSkipping(Configuration conf, int attemptsToStartSkipping)
hadoop會把被跳過的記錄寫入HDFS以供之後分析,它們以序列文件的形式寫入_log/skip目錄,能夠用hadoop fs -text <filepath>解壓並讀取。你可使用方法SkipBadRecords.setSkipOutputPath(JobConf conf, Path path)修改當前用於存放被跳過記錄的目錄_log/skip,若是path被設爲空,或者一個值爲「none」的字符串path,hadoop就會放棄記錄被跳過的記錄。
 
(2)在Java以外配置記錄跳讀
 
SkipBadRecords方法
JobConf屬性
setAttemptsToStartSkipping() mapred.skip.attempts.to.start.skipping
setMapperMaxSkipRecords() mapred.skip.map.max.skip.records
setReducerMaxSkipGroups() mapred.skip.reduce.max.skip.groups
setSkipOutputPath() mapred.skip.out.dir
setAutoIncrMapperProcCount() mapred.skip.map.auto.incr.proc.count
setAutoIncrReducerProcCount() mapred.skip.reduce.auto.incr.proc.count
 
 
三、性能調優
 
(1)經過combiner來減小網絡流量
     Combiner能夠減小在map和reduce階段之間洗牌的數據量,較低的網絡流量縮短了執行時間。
 
(2)減小輸入數據量
 
(3)使用壓縮
     hadoop內置支持壓縮與解壓。啓用對map輸出的壓縮涉及對兩個屬性的配置:
 
屬性
描述
mapred.compress.map.output Boolean屬性,表示mapper的輸出是否被壓縮
mapred.map.output.compression.codec Class屬性,表示哪一種CompressionCodec被用於壓縮mapper的輸出
 
conf.setBoolean(「mapred.compress.map.output」, true);
conf.setClass(「mapred.map.output.compression.codec」, GzipCodec.calss, CompressionCodec.class);
也能夠直接使用JobConf中的便捷方法setCompressionMapOutput()和setMapOutputCompressorClass()。
 
(4)重用JVM
     hadoop從版本0.19.0開始,容許相同做業的多個任務之間重用JVM。所以,啓動開銷被平攤到多個任務中。一個新屬性(mapred.job.reuse.jvm.num.tasks)指定了一個JVM能夠運行的最大任務數。它默認值爲1,此時JVM不能被重用。你能夠增大該屬性值來啓用JVM重用。若是將其設置爲-1,則意味着在可重複使用JVM的任務數量上沒有限制。在JobConf對象中有一個便捷方法,setNumTasksToExecutePerJvm(int),能夠用它很方便地設置做業的屬性。
 
(5)根據猜想執行來運行
     啓動和禁止猜想執行的配置屬性:
 
屬性
描述
mapred.map.tasks.speculative.execution 布爾屬性,表示是否運行map任務猜想執行
mapred.reduce.tasks.speculative.execution 布爾屬性,表示是否運行reduce任務猜想執行
 
(6)代碼重構與算法重寫
     Streaming程序重寫爲hadoop的Java程序
 
 
 [轉載請註明] http://www.cnblogs.com/zhengrunjian/ 
相關文章
相關標籤/搜索