Hadoop in Action Reading Notes (9)

How to change the Reducer output type of an inverted citation index program to IntWritable

public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, IntWritable> {

    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        // count how many citations this key received
        int count = 0;
        while (values.hasNext()) {
            values.next();
            count++;
        }
        output.collect(key, new IntWritable(count));
    }
}

 

Counting the number of patents for each citation count

The reason the K2, V2 and K3, V3 data are typed as IntWritable is that these values are necessarily integers, and IntWritable is more efficient than Text.
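As a quick side note, here is a minimal sketch (my own illustration, not from the book) of the size difference: IntWritable always serializes to a fixed 4 bytes, while Text writes the UTF-8 digits plus a length prefix, and must be re-parsed back into an int on the receiving side.

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableSizeDemo {
    public static void main(String[] args) throws Exception {
        DataOutputBuffer asInt = new DataOutputBuffer();
        new IntWritable(1234567).write(asInt);    // fixed 4 bytes
        DataOutputBuffer asText = new DataOutputBuffer();
        new Text("1234567").write(asText);        // 1-byte vint length + 7 digit bytes
        System.out.println(asInt.getLength() + " vs " + asText.getLength()); // prints "4 vs 8"
    }
}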

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CitationHistogram extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<Text, Text, IntWritable, IntWritable> {

        // reused across map() calls to avoid per-record allocation (see note 4 below)
        private final static IntWritable uno = new IntWritable(1);
        private IntWritable citationCount = new IntWritable();

        public void map(Text key, Text value,
                        OutputCollector<IntWritable, IntWritable> output,
                        Reporter reporter) throws IOException {
            citationCount.set(Integer.parseInt(value.toString()));
            output.collect(citationCount, uno);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        public void reduce(IntWritable key, Iterator<IntWritable> values,
                           OutputCollector<IntWritable, IntWritable> output,
                           Reporter reporter) throws IOException {
            // sum the 1s emitted for this citation count
            int count = 0;
            while (values.hasNext()) {
                count += values.next().get();
            }
            output.collect(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, CitationHistogram.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("CitationHistogram");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);
        System.exit(res);
    }
}

A few points worth noting:

1. The class is named CitationHistogram

2. The input format is KeyValueTextInputFormat and the output format is TextOutputFormat

3. KeyValueTextInputFormat splits each line at the tab character by default. You can call job.set("key.value.separator.in.input.line", ",") to change the separator to ","; any other separator is set the same way (see the sketch after this list).

4. The MapClass class uses two private member variables, uno and citationCount. Why define them this way?

For efficiency, citationCount and uno are defined in the class rather than inside the method: map() is called once for every record (per JVM, that is the number of records in its input split). Reducing the number of objects created inside map() improves performance and reduces garbage collection. Since citationCount and uno are passed to output.collect(), we rely on the contract that output.collect() does not modify these two objects.

5. The Reducer sums the values for each key. That seems inefficient: we know all the values are 1, so why add them up? The reason is that it will make it much easier later to add a combiner to improve performance (see the sketch after this list).
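To make points 3 and 5 concrete, here is a minimal sketch (mine, not verbatim from the book) of how run() could be extended; only the two added lines differ from the run() method shown above.

public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    JobConf job = new JobConf(conf, CitationHistogram.class);
    // point 3: make KeyValueTextInputFormat split on "," instead of the default tab
    job.set("key.value.separator.in.input.line", ",");
    // point 5: pre-aggregate the 1s on the map side; because the Reducer sums
    // arbitrary partial counts rather than assuming every value is 1, the same
    // class can double as the combiner
    job.setCombinerClass(Reduce.class);
    // ... the rest of the setup is identical to the run() method above ...
    JobClient.runJob(job);
    return 0;
}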

 

Hadoop API是改了又改,變了又變

1. Version 0.20 is viewed as a transition between the old API and a future stable API

2. To preserve backward compatibility, version 0.20 and later releases support the next-generation API while marking the old API as deprecated

 

Switching to the new API immediately is not recommended

1. As of version 0.20, many of Hadoop's own library classes have not been rewritten against the new API, so if your MapReduce code uses the new API in 0.20, those classes cannot be used.

2. Before 0.20, version 0.18.3 was still regarded by many as the most complete and stable release

3. Although the new API changes things, the changes affect only the basic MapReduce template, and we can rewrite that template against the new API for future use.

 

You may wonder why version 0.19 is not mentioned

The general consensus is that its initial release was quite problematic and full of bugs; several point releases tried to fix this, but the community preferred to jump straight to version 0.20

 

What has changed in the new API?

1. In the new API, many classes under org.apache.hadoop.mapred have been moved; most were placed in org.apache.hadoop.mapreduce, and the library classes live in packages under org.apache.hadoop.mapreduce.lib. When switching to the new API, all import declarations (or fully qualified references) of classes under org.apache.hadoop.mapred go away; they are all deprecated.

2. The most beneficial change in the new API is the introduction of the context object. Its most direct effect is to replace the OutputCollector and Reporter objects used in the map() and reduce() methods: key/value pairs are now emitted by calling Context.write() rather than OutputCollector.collect(). The deeper effect is to unify communication between application code and the MapReduce framework and to stabilize the Mapper and Reducer APIs: adding new features no longer changes the basic method signatures, because new functionality appears only as additional methods on the context object. Programs written before those methods were introduced do not notice them and continue to compile and run under newer versions.

3. The new map() and reduce() methods are contained in the new abstract classes Mapper and Reducer, which replace the Mapper and Reducer interfaces of the original API (org.apache.hadoop.mapred.Mapper and org.apache.hadoop.mapred.Reducer). The new abstract classes also replace MapReduceBase, which is now deprecated.

4. The new map() and reduce() methods carry a couple of subtle changes: they can throw InterruptedException rather than only IOException, and reduce() now receives its values as an Iterable rather than an Iterator, making it easier to iterate with Java's foreach syntax.

 

Recall the signatures in the original API

public static class MapClass extends MapReduceBase
        implements Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value,
                    OutputCollector<K2, V2> output,
                    Reporter reporter) throws IOException { }
}

public static class Reduce extends MapReduceBase
        implements Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterator<V2> values,
                       OutputCollector<K3, V3> output,
                       Reporter reporter) throws IOException { }
}

 

The signatures in the new API; note how they differ from the ones above

public static class MapClass extends Mapper<K1, V1, K2, V2> {
    public void map(K1 key, V1 value, Context context)
            throws IOException, InterruptedException { }
}

public static class Reduce extends Reducer<K2, V2, K3, V3> {
    public void reduce(K2 key, Iterable<V2> values, Context context)
            throws IOException, InterruptedException { }
}


You also need to change a few things in the driver to support the new API

1. In the new API, JobConf and JobClient have been replaced; their functionality has been moved into Configuration (originally the parent class of JobConf) and a new class, Job

2. The Configuration class exists purely to configure a job, while the Job class defines and controls the execution of a job

3. For example, methods such as setOutputKeyClass() and setOutputValueClass() have been moved from JobConf to Job

4. Job construction and submission now live in Job. Previously, a job was constructed with JobConf:

JobConf job = new JobConf(conf, MyJob.class);
job.setJobName("MyJob");

Now the same is done through the Job class:

Job job = new Job(conf, "MyJob");
job.setJarByClass(MyJob.class);

Previously, the job was submitted for execution through JobClient:

JobClient.runJob(job);

Now this, too, is done through the Job class:

System.exit(job.waitForCompletion(true) ? 0 : 1);

 

The basic Hadoop program template rewritten against the new API of version 0.20 and later (it also works on Hadoop 1.X)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // invert each "citing,cited" record into (cited, citing)
            String[] citation = value.toString().split(",");
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // the Iterable type allows a foreach loop
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            for (Text val : values) {
                if (csv.length() > 0) csv += ",";
                csv += val.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "MyJob");
        job.setJarByClass(MyJob.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        // KeyValueTextInputFormat is not available in the new API, so use TextInputFormat
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyJob(), args);
        System.exit(res);
    }
}

1. This code implements the inverted citation index

2. The KeyValueTextInputFormat class was not ported to the new API in version 0.20, so in rewriting this template we have to use TextInputFormat instead (a sketch of emulating its behavior follows below).
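If the old tab-splitting behavior is still needed, one option is to reproduce it inside the mapper. A minimal sketch under that assumption (the class name is hypothetical, not from the book):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// hypothetical helper: splits each input line at the first tab, emitting the
// same (key, value) pairs the old KeyValueTextInputFormat would have produced
public class KeyValueEmulatingMapper
        extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String s = line.toString();
        int tab = s.indexOf('\t');
        if (tab < 0) {
            context.write(new Text(s), new Text(""));  // no separator: whole line is the key
        } else {
            context.write(new Text(s.substring(0, tab)),
                          new Text(s.substring(tab + 1)));
        }
    }
}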
