[Hadoop in Action] 第4章 編寫MapReduce基礎程序

  • 基於hadoop的專利數據處理示例
  • MapReduce程序框架
  • 用於計數統計的MapReduce基礎程序
  • 支持用腳本語言編寫MapReduce程序的hadoop流式API
  • 用於提高性能的Combiner
 
一、獲取專利數據集
 
獲取網址:http://www.nber.org/patents/
使用數據集:cite75_99.txt和apat63_99.txt
 
二、構建MapReduce程序的基礎模版
 
 

代碼清單 典型hadoop程序模版
 
 1 import java.io.IOException;
 2 import java.util.Iterator;
 3  
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.conf.Configured;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapred.FileInputFormat;
 9 import org.apache.hadoop.mapred.FileOutputFormat;
10 import org.apache.hadoop.mapred.JobClient;
11 import org.apache.hadoop.mapred.JobConf;
12 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
13 import org.apache.hadoop.mapred.MapReduceBase;
14 import org.apache.hadoop.mapred.Mapper;
15 import org.apache.hadoop.mapred.OutputCollector;
16 import org.apache.hadoop.mapred.Reducer;
17 import org.apache.hadoop.mapred.Reporter;
18 import org.apache.hadoop.mapred.TextOutputFormat;
19 import org.apache.hadoop.util.Tool;
20 import org.apache.hadoop.util.ToolRunner;
21  
22 public class MyJob extends Configured implements Tool {
23  
24     public static class MapClass extends MapReduceBase
25         implements Mapper<Text, Text, Text, Text> {
26  
27         public void map(Text key, Text value,
28                         OutputCollector<Text, Text> output,
29                         Reporter reporter) throws IOException {
30  
31             output.collect(value, key);
32         }
33     }
34  
35     public static class Reduce extends MapReduceBase
36         implements Reducer<Text, Text, Text, Text> {
37  
38         public void reduce(Text key, Iterator<Text> values,
39                            OutputCollector<Text, Text> output,
40                            Reporter reporter) throws IOException {
41  
42             String csv = "";
43             while (values.hasNext()) {
44                 if (csv.length() > 0) csv += ",";
45                 csv += values.next().toString();
46             }
47             output.collect(key, new Text(csv));
48         }
49     }
50  
51     public int run(String[] args) throws Exception {
52         Configuration conf = getConf();
53  
54         JobConf job = new JobConf(conf, MyJob.class);
55  
56         Path in = new Path(args[0]);
57         Path out = new Path(args[1]);
58         FileInputFormat.setInputPaths(job, in);
59         FileOutputFormat.setOutputPath(job, out);
60  
61         job.setJobName("MyJob");
62         job.setMapperClass(MapClass.class);
63         job.setReducerClass(Reduce.class);
64  
65         job.setInputFormat(KeyValueTextInputFormat.class);
66         job.setOutputFormat(TextOutputFormat.class);
67         job.setOutputKeyClass(Text.class);
68         job.setOutputValueClass(Text.class);
69         job.set("key.value.separator.in.input.line", ",");
70  
71         JobClient.runJob(job);
72  
73         return 0;
74     }
75  
76     public static void main(String[] args) throws Exception { 
77         int res = ToolRunner.run(new Configuration(), new MyJob(), args);
78  
79         System.exit(res);
80     }
81 }
 

 
     框架的核心在run()方法中,也稱爲driver。它實例化、配置並傳遞一個JobConf對象命名的做業給JobClient.runJob()以啓動MapReduce做業。JobConf對象將保持做業運行所需的所有配置參數。Driver須要在做業中爲每一個做業定製基本參數,包括輸入路徑、輸出路徑、Mapper類和Reducer類。另外,每一個做業能夠重置默認的做業屬性,例如InputFormat、OutputFormat等,也能夠調用JobConf對象中的set()方法填充人意配置參數。一旦傳遞JobConf對象到JobClient.runJob(),它就被視爲做業的整體規劃,成爲決定這個做業如何運做的藍本。
 
     JobConf對象有許多參數,但咱們並不但願所有的參數都經過編寫driver來設置,能夠把Hadoop安裝時的配置文件做爲一個很好的起點。同時,用戶可能但願在命令行啓動一個做業時傳遞額外的參數來改變做業配置。Driver能夠經過自定義一組命令並自行處理用戶參數,來支持用戶修改其中的一些配置。由於常常須要作這樣的任務,hadoop框架便提供了ToolRunner、Tool和Configured來簡化其實現。
 
     經過使用ToolRunner,MyJob能夠自動支持下表中的選項:
 
選項
描述
-conf <configuration file>  指定一個配置文件
-D <property=value> 給JobConf屬性賦值
-fs <local | namenode:port> 指定一個NameNode,能夠是「local」
-jt <local | jobtracker:port> 指定一個JobTracker
-files <list of files> 指定一個以逗號分隔的文件列表,用於MapReduce做業。這些文件自動地分佈到全部節點,使之可從本地獲取
-libjars <list of jars> 指定一個以逗號分隔的jar文件列表,使之包含在全部任務JVM的classpath中
-archives <list of archives> 指定一個以逗號分隔的存檔文件列表,使之能夠在全部任務節點上打開
 
Mapper類和Reducer類以下所示:
 
public static class MapClass extends MapReduceBase
     implements Mapper<k1, v1, k2, v2> {
     public void map(k1 key, v1 value, OutputCollector<k2, v2> output, Reporter reporter) throws IOException { }
}
 
public static class Reducer extends MapReduceBase
     implements Mapper<k2, v2, k3, v3> {
     public void map(k2 key, Iterator<v2> values, OutputCollector<k3, v3> output, Reporter reporter) throws IOException { }
}
 
     Mapper類的核心操做爲map()方法,Reducer類爲reduce()方法。每一個map()方法的調用分別被賦予一個類型爲k1和v1的鍵/值對。這個鍵/值對由mapper生成,並經過OutputCollector對象的collect()方法來輸出。你須要在map()方法中的合適位置調用:
     output.collect((k2) k, (v2) v);
 
     在Reudcer中reduce()方法的每次調用均被賦予k2類型的鍵,以及v2類型的一組值。注意它必須與mapper中使用的k2和v2類型相同。Reduce()方法可能會循環遍歷v2類型的全部值:
     while (values.hasNext()) {
          v2 v = values.next();
          ...
     }
 
     Reduce()方法還使用OutputCollector來蒐集其鍵/值的輸出,它們的類型爲k3/v3。在reduce()方法中能夠調用:
     output.collect((k3) k, (v3) v);
 
     除了在Mapper和Reducer之間保持k2和v2的類型一致,還須要確保在Mapper和Reducer中使用的鍵值類型與在driver中設置的輸入格式、輸出鍵的類,以及輸出值的類保持一致。
 
 
三、計數
 
能夠修改獲取反向引用索引的程序來輸出技術結果,只須要修改Reducer。若是選擇讓計數結果輸出的類型爲IntWritable,就須要在Reducer代碼中的3個地方進行聲明:
 
public static class Reduce extends MapReduceBase
     implements Reducer<Text, Text, Text, IntWritable> {
     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output,
                                    Reporter reporter) throws IOException {
          int count = 0;
          while (values.hasNext()) {
          values.next();
          count++;
          }
          output.collect(key, new IntWritable(count));
     }
}
 
  1. 編寫MapReduce程序的第一步是瞭解數據流;
  2. 基於對數據流的理解,能夠爲輸入、中間結果、輸出的鍵/值對k一、v一、k二、v二、k3和v3設定類型;
  3. 根據數據流河數據類型,很容易可以理解程序代碼。
 

代碼清單 CitationHistogram.java
 
 1 import java.io.IOException;
 2 import java.util.Iterator;
 3  
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.conf.Configured;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapred.FileInputFormat;
10 import org.apache.hadoop.mapred.FileOutputFormat;
11 import org.apache.hadoop.mapred.JobClient;
12 import org.apache.hadoop.mapred.JobConf;
13 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
14 import org.apache.hadoop.mapred.MapReduceBase;
15 import org.apache.hadoop.mapred.Mapper;
16 import org.apache.hadoop.mapred.OutputCollector;
17 import org.apache.hadoop.mapred.Reducer;
18 import org.apache.hadoop.mapred.Reporter;
19 import org.apache.hadoop.mapred.TextOutputFormat;
20 import org.apache.hadoop.util.Tool;
21 import org.apache.hadoop.util.ToolRunner;
22  
23 public class CitationHistogram extends Configured implements Tool {
24  
25     public static class MapClass extends MapReduceBase
26         implements Mapper<Text, Text, IntWritable, IntWritable> {
27  
28         private final static IntWritable uno = new IntWritable(1);
29         private IntWritable citationCount = new IntWritable();
30  
31         public void map(Text key, Text value,
32                         OutputCollector<IntWritable, IntWritable> output,
33                         Reporter reporter) throws IOException {
34  
35             citationCount.set(Integer.parseInt(value.toString()));
36             output.collect(citationCount, uno);
37         }
38     }
39  
40     public static class Reduce extends MapReduceBase
41         implements Reducer<IntWritable,IntWritable,IntWritable,IntWritable>
42     {
43  
44         public void reduce(IntWritable key, Iterator<IntWritable> values,
45                            OutputCollector<IntWritable, IntWritable>output,
46                            Reporter reporter) throws IOException {
47  
48             int count = 0;
49             while (values.hasNext()) {
50                 count += values.next().get();
51             }
52             output.collect(key, new IntWritable(count));
53         }
54     }
55  
56     public int run(String[] args) throws Exception {
57         Configuration conf = getConf();
58  
59         JobConf job = new JobConf(conf, CitationHistogram.class);
60  
61         Path in = new Path(args[0]);
62         Path out = new Path(args[1]);
63         FileInputFormat.setInputPaths(job, in);
64         FileOutputFormat.setOutputPath(job, out);
65  
66         job.setJobName("CitationHistogram");
67         job.setMapperClass(MapClass.class);
68         job.setReducerClass(Reduce.class);
69  
70         job.setInputFormat(KeyValueTextInputFormat.class);
71         job.setOutputFormat(TextOutputFormat.class);
72         job.setOutputKeyClass(IntWritable.class);
73         job.setOutputValueClass(IntWritable.class);
74  
75         JobClient.runJob(job);
76  
77         return 0;
78     }
79  
80     public static void main(String[] args) throws Exception { 
81         int res = ToolRunner.run(new Configuration(), 
82                                  new CitationHistogram(), 
83                                  args);
84  
85         System.exit(res);
86     }
87 }
 

 
四、適應Hadoop API的改變
 
     (1)首先值得注意的是,在新的API中org.apache.hadoop.mapred的許多類都被移走了。多數被放入org.apache.hadoop.mapreduce,並且類庫都放在org.apache.hadoop.mapreduce.lib的一個包中。當轉爲使用新API時,org.apache.hadoop.mapred下全部類的import聲明就不存在了,它們都被棄用。
 
     (2)新API中最有益的變化是引入了上下文對象context。最直接的影響在於替換了map()和reduce()方法中使用的OutputCollector和Reporter對象。深遠的影響是統一了應用代碼和MapReduce框架之間的通訊,並固定了Mapper和Reduce的API,使得添加新功能時不會改變基本方法簽名。
 
     (3)新的map()和reduce()方法分別被包含在新的抽象類Mapper和Reducer中。它們取代了原始API中的Mapper和Reducer接口。新的抽象類也替換了MapReduceBase類,使之被棄用。
 
     (4)新的map()和Reduce()方法多了一兩處細微的改變。它們能夠拋出InterruptedException而非單一的IOException。並且,reduce()方法再也不以Iterator而以Iterable來接受一個值的列表,這樣更容易使用Java的foreach語義來實現迭代。
 
原始API中的簽名
 
public static class MapClass extends MapReduceBase
     implements Mapper<k1, v1, k2, v2> {
     public void map(k1 key, v1 value, OutputCollector<k2, v2> output, Reporter reporter) throws IOException { }
}
 
public static class Reducer extends MapReduceBase
     implements Mapper<k2, v2, k3, v3> {
     public void map(k2 key, Iterator<v2> values, OutputCollector<k3, v3> output, Reporter reporter) throws IOException { }
}
 
新API必定程度上對它們作了簡化
 
public static class MapClass extends Mapper<k1, v1, k2, v2> {
     public void map(k1 key, v1 value, Context context) throws IOException, InterruptedException { }
}
 
public static class Reduce extends Reducer<k2, v2, k3, v3> {
     public void map(k2 key, Iterable<v2> value, Context context) throws IOException, InterruptedException { }
}
 
     (5)還須要改變driver中的一些內容來支持新的API。在新的API中JobConf和JobClient被替換了。它們的功能已經被放入Configuration類和一個新的類Job中。Configuration類純粹爲了配置做業而設,而Job類負責定義和控制一個做業的執行。做業的構造和提交執行如今放在Job中。
 
原API
 
JobConf job = new JobConf(conf, MyJob.calss);
job.setJobName(「MyJob");
 
JobClient.runJob(job);
 
新API
 
Job job = new Job(conf, 「MyJob」);
job.setJarByClass(MyJob.class);
 
System.exit(job.waitForCompletion(true)?0:1);
 

代碼清單 基於版本0.20新API重寫的hadoop基礎程序模版
 
 1 import java.io.IOException;
 2 import java.util.Iterable;
 3  
 4 import org.apache.hadoop.conf.Configuration;
 5 import org.apache.hadoop.conf.Configured;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16 import org.apache.hadoop.util.Tool;
17 import org.apache.hadoop.util.ToolRunner;
18  
19 public class MyJob extends Configured implements Tool {
20  
21     public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
22  
23         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
24  
25             String[] citation = value.toString().split(",");
26             context.write(new Text(citation[1]), new Text(citation[0]));
27         }
28     }
29  
30     public static class Reduce extends Reducer<Text, Text, Text, Text> {
31  
32         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
33  
34             String csv = "";
35             for (Text val:values) {    //Iterable類型容許foreach循環
36                 if (csv.length() > 0) csv += ",";
37                 csv += val.toString();
38             }
39  
40             context.write(key, new Text(csv));
41         }
42     }
43  
44     public int run(String[] args) throws Exception {
45         Configuration conf = getConf();
46  
47         Job job = new Job(conf, "MyJob");
48         job.setJarByClass(MyJob.class);
49  
50         Path in = new Path(args[0]);
51         Path out = new Path(args[1]);
52         FileInputFormat.setInputPaths(job, in);
53         FileOutputFormat.setOutputPath(job, out);
54  
55         job.setMapperClass(MapClass.class);
56         job.setReducerClass(Reduce.class);
57  
58         job.setInputFormatClass(TextInputFormat.class);    //兼容的InputFormat類
59         job.setOutputFormatClass(TextOutputFormat.class);
60         job.setOutputKeyClass(Text.class);
61         job.setOutputValueClass(Text.class);
62  
63         System.exit(job.waitForCompletion(true)?0:1);
64  
65         return 0;
66     }
67  
68     public static void main(String[] args) throws Exception { 
69         int res = ToolRunner.run(new Configuration(), new MyJob(), args);
70  
71         System.exit(res);
72     }
73 }
 

 
五、Hadoop的Streaming
 
  1. 經過Unix命令使用Streaming
  2. 經過腳本使用Streaming
  3. 用Streaming處理鍵/值對
  4. 經過Aggregate包使用Streaming
 
六、使用combiner提高性能
 
hadoop經過擴展MapReduce框架,在mapper和reducer之間增長了一個combiner。你能夠將combiner視爲reducer的助手。它致力於減小mapper的輸出以下降網絡和reducer上的壓力。爲了使combiner工做,它在數據的轉換上必須與reducer等價。
 
 
 [轉載請註明] http://www.cnblogs.com/zhengrunjian/ 
相關文章
相關標籤/搜索