複雜倒排索引Java實現

一、定義數據和簡單倒排索引稍做修改,以便驗證結果的正確性。定義初始數據如圖:算法

                

指望結果(PS:須要對outputValue按照文件名排序)app

二、在學習以前首先要掌握Mapper與Reducer層輸入輸出類型關係,咱們知道Mapper的輸出類型和Reducer層輸入類型要對應。ide

Combiner層至關因而Reducer層本地化,通常兩個類中實現代碼一致。例如:找出全國身高最高一位人士,能夠先找出每個省的身高最高的(Combiner層),而後再去帝都一塊兒比較找出全國最高的(Reducer層)。oop

值得注意的是Combiner層的輸入輸出類型,或許你會覺得是Combiner的輸入類型和Mapper的輸出類型一致,Combiner的輸出和Reducer的輸入一致便可。那麼我以爲你徹底能夠試試,畢竟遇到坑並填上坑是咱們學習路上不可少的。學習

實際上Combiner層的輸入類型不只與Mapper的輸出類型要一致,Combiner的輸出類型也要與Mapper的輸出類型一致,由於Combiner的下一階段是Reducer層,因此Combiner的輸出類型要與Mapper的輸出類型和Reducer的輸入類型要一致。spa

三、Hadoop自帶一個默認的分區類HashPartitioner。Partitioner的做用就是將Mapper(若定義了Combiner類則爲Combiner)輸出的Key-Value拆分紅分片,每個Reducer對應一個分片,而後根據key的hash值,均勻分佈到Reduce Tasks上,使key相同的被分發到同一個Reducer。.net

四、知識點都瞭解的差很少了,直接上代碼了!orm

/**
 * 統計每一個單詞在哪些文檔中出現了,並對在文檔中出現的次數作統計和按照文檔名排序
 * @author ZD
 *
 */
public class ComplexIndex {
    //key爲單詞;value爲存在該單詞的全部文檔,格式爲(文檔名:數量)
    private static Map<String, String> map = new HashMap<String, String>();

    private static class ComplexIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {排序

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            //獲取文件名,首先需強轉爲FileSplit類型
            String fileName = ((FileSplit)context.getInputSplit()).getPath().getName();
            String[] values = value.toString().trim().split(" ");
            for(int i=0; i<values.length; i++){
                //傳((關鍵字:文件名),數量)形式
                context.write(new Text(values[i]+":"+fileName), new IntWritable(1));
            }
        }
    }索引

    private static class ComplexIndexCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text value, Iterable<IntWritable> datas, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            //統計關鍵字在文檔中出現的次數
            int sum=0;
            for (IntWritable data : datas) {
                sum++;
            }
            String[] strs = value.toString().split(":");
            //將全部關鍵字放入map
            if(!map.containsKey(strs[0])){
                map.put(strs[0], "");
            }
            context.write(new Text(value), new IntWritable(sum));
        }
    }
    
    private static class ComplexIndexReducer extends Reducer<Text, IntWritable, Text, Text> {
        @Override
        protected void reduce(Text value, Iterable<IntWritable> datas, Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
            System.out.println("reducer===========");
            String[] values = value.toString().split(":");
            String str="";
            for (IntWritable data : datas) {
                str+="("+values[1]+","+data.get()+")";
            }
            if(map.containsKey(values[0])){
                map.put(values[0], map.get(values[0])+str);
            }
            //此處中不向文件中輸出結果,而在cleanup()方法中輸出結果
        }

        @Override
        protected void cleanup(Reducer<Text, IntWritable, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for(String key:map.keySet()){
                context.write(new Text(key), new Text(map.get(key)));
            }
        }
    }
    
    /** 自定義HashPartitioner,保證 <word:docid>格式的key值按照word均勻分佈給Reduce Tasks **/
    public static class NewPartitioner extends HashPartitioner<Text, IntWritable> {
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String word = new String();
            word = key.toString().split(":")[0]; // <word:docid>=>word
            //將word爲key,均勻分佈在Reduce Task上(傳入Reducer的inputKey值未變)
            return super.getPartition(new Text(word), value, numReduceTasks);
        }
    }

    public static void main(String[] args) {
        try {
            Configuration cfg = HadoopCfg.getConfigration();
            Job job = Job.getInstance(cfg);
            job.setJobName("ComplexIndex");
            job.setJarByClass(ComplexIndex.class);
            job.setMapperClass(ComplexIndexMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setCombinerClass(ComplexIndexCombiner.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setReducerClass(ComplexIndexReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setPartitionerClass(NewPartitioner.class);
            
            FileInputFormat.addInputPath(job, new Path("/input/index"));
            FileOutputFormat.setOutputPath(job, new Path("/complexIndex/"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

寫在最後:Mapper、Combiner、Reducer三者的輸入輸出類型很重要以及執行過程須要瞭解清楚以後再寫代碼,這樣會減小不少沒必要要的困擾。本人也是剛接觸Hadoop不久,如有錯誤,望指出。 下一次將會與你們分享PageRank算法的分析與實現,但願有興趣的同窗先去大概瞭解一下該算法。

相關文章
相關標籤/搜索