如何將一個反向引用索引的程序的Reducer輸出的類型改成IntWritableapache
public static class Reduce extends MapReduceBaseapp
implements Reducer<Text, Text, Text, IntWritable> {框架
public void reduce(Text key, Iterator<Text> values,oop
OutputCollector<Text, IntWritable> output,性能
Reporter reporter) throws IOException {spa
int count = 0;orm
while (values.hasNext()) {對象
values.next();索引
count++;接口
}
output.collect(key, new IntWritable(count));
}
}
計算不一樣引用次數專利的數目
之因此選擇K2、V2、K3和V3的數據爲IntWritable類型,是由於它們的數據必然爲整數,而且使用IntWritable比Text更高效
public class CitationHistogram extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<Text, Text, IntWritable, IntWritable> {
private final static IntWritable uno = new IntWritable(1);
private IntWritable citationCount = new IntWritable();
public void map(Text key, Text, value,
OutputCollector<IntWritable, IntWritable> output,
Reporter reporter) throws IOException {
citationCount.set(Integer.parseInt(value.toString()));
output.collect(citationCount, uno);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
public void reduce(IntWritable key, Iterator<IntWritable> values,
OutputCollector<IntWritable, IntWritable > output,
Reporter reporter) throws IOException {
int count = 0;
while (values.hasNext()) {
count += values.next().get();
}
output.collect(key, new IntWritable(count));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, CitationHistogram.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("CitationHistogram");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFomrat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);
system.exit(res);
}
}
須要說明的幾點:
1、類名爲CitationHistogram
2、輸入格式爲KeyValueTextInputFormat,輸出格式爲TextOutputFormat
3、KeyValueTextInputFormat默認以"<tab>"製表符進行分割,可使用job.set("key.value.separator.in.input.line", ","),修改分隔符爲",",其餘須要自行修改
4、MapClass類使用了兩個私有成員變量uno 和citationCount,爲何要這樣定義呢?
出於對效率的考慮citationCount和uno的定義被放在類中而不是方法中,有多少記錄,map()方法就會被調用多少次 (對每一個JVM而言,就是一個分片中的記錄數)。減小在map()方法中生成的對象個數能夠提升性能,並減小垃圾回收。因爲citationCount和uno被傳遞給output.collect(),咱們依賴output.collect()方法的約定不會修改這兩個對象。
5、Reducer計算每一個key對應的值的總數,這彷佛並不高效,由於咱們知道全部的值都是1,爲何咱們還要去加它們呢?緣由在於,它會讓咱們在之後能更容易地增長一個combiner來提升性能。
Hadoop API是改了又改,變了又變
1、0.20版本被視爲舊API和將來穩定API之間的過渡
2、爲了保持向後兼容,版本0.20以及以後的版本支持下一代API並將舊API標記爲棄用
並不推薦立刻轉向新的API
1、0.20版本,許多Hadoop自身的類庫尚未基於新的API重寫,若是MapReduce代碼使用0.20中的新API,這些類將沒法被使用。
2、在0.20版本以前,0.18.3仍被不少人認爲是最完備與最穩定的版本
3、雖然在新版本的API中有所改變,可是改變的僅僅影響基礎的MapReduce模板,咱們能夠基於新API所作的改變,重寫這個模板以備未來使用。
你可能會奇怪爲何不提0.19?
通常的意見認爲它的初始版本問題比較多,有許多bug,一些副版本試圖解決這個問題,但社區但願直接跳到版本0.20
新版本的API作了哪些改動?
1、在新的API中org.apahce.hadoop.mapred的許多類都被移走了,多數被放入org.apache.hadoop.mapreduce中,並且類庫都放在org.apache.hadoop.mapreduce.lib的一個包裏。當轉爲使用新API時,org.apache.hadoop.mapred下全部的類的import聲明(或者徹底引用)就不存在了,它們都被棄用
2、新API中最有益的變化是引入了上下文對象context,最直接的影響在於替換了map()和reduce()方法中使用的OutputCollector和Reporter對象。如今將經過調用Context.writer()而不是OutputCollector.collect()輸出鍵/值對。深遠的影響是統一了應用代碼和MapReduce框架之間的通訊,並固定了Mapper和Reducer的API,使得添加新功能時不會改變基本方法簽名,新的功能僅僅時在context對象上增長的方法,在引入這些方法以前寫的程序不會感知到這些新方法,它們能夠在更新的版本中繼續編譯與運行
3、新的map()和reduce()方法分別被包含在新的抽象Mapper和Reducer中,它們取代了原始API中的Mapper和Reducer接口(org.apache.hadoop.mapred.Mapper和org.apache.hadoop.mapred.Reducer)。新的抽象類也替換了MapReudceBase類,使之被棄用
4、新的map()和reduce()方法多了一兩處細微的改變,它們能夠拋出InterruptedException而非單一的IOException,並且,reduce()方法再也不以Iterator而以Iterable來接受一個值的列表,這樣更容易使用Java的foreach語義來實現迭代。
回顧一下原始的API中的簽名
public static class MapClass extends MapReduceBase
implements Mapper<K1, V1, K2, V2> {
public void map(K1 key, V1 value,
OutputCollector<K2, V2> output,
Reporter reporter) throws IOException { }
}
public static class Reduce extends MapReduceBase
implements Reducer<K2, V2, K3, V3> {
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter) throws IOException { }
}
新的API中的簽名,體會與上面簽名的不一樣
public static class MapClass extends Mapper<K1, V2, K2, V2> {
public void map(K1 key, V1 value, Context context)
throws IOException, InterruptedException { }
}
public static class Reduce extends Reducer<K2, V2, K3, V3> {
public void reduce(K2 key, Iterable<V2> values, Context context)
throws IOException, InterruptedException { }
}
你還須要改變driver中的一些內容來支持新的API
1、在新的API中JobConf和JobClient被替換了,它們的功能已經被放入Configuration類(它本來是JobConf的父類)和一個新的類Job中
2、Configuration類純粹爲了配置做業而設,而Job類負責定義和控制一個做業的執行
3、好比,setOutputKeyClass()和setOutputValueClass()等方法被從JobConf轉移到了Job
4、做業的構造和提交執行如今放在Job中,本來須要使用JobConf來構造一個做業:
JobConf job = new JobConf(conf, MyJob.class);
job.setJobName("MyJob");
而如今可經過Job類完成:
Job job = new Job(conf, "MyJob");
job.setJarByClass(MyJob.class);
之前是經過JobClient提交做業去執行:
JobClient.runJob(job);
如今一樣經過Job類來完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
基於版本0.20及其以上的新API重寫的Hadoop基礎程序模板(Hadoop 1.X也適用)
public class MyJob extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWirtable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citation = value.toString().split(",");
context.write(new Text(citation[1]), new Text(citation[0]));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// Iterable類型容許foreach循環
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
if (csv.length() > 0) csv += ",";
csv += val.toString();
}
context.write(key, new Text(csv));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "MyJob");
job.setJarByClass(MyJob.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce,.class);
// 兼容InputFormat類
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(res);
}
}
1、這段代碼實現了反向索引功能
2、KeyValueTextInputFormat類未被移植到版本0.20的新API中,重寫這個模板咱們不得不使用TextInputFormat