MapReduce Programming Examples

A collection of common MapReduce programming examples.

  1. WordCount (word count)
  2. Data deduplication
  3. Inverted index

1. WordCount (Word Count)

(1) Input and output

Input data:

file1.csv contents:
hello world
file2.csv contents:
hello hadoop

Output:

hadoop    1
hello    2
world    1

(2) Code and analysis

package com.hadoop.kwang;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    /**
     * Mapper class
     * 
     * Object and Text are the <key, value> types of the input data;
     * Text and IntWritable are the <key, value> types of the output data.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            
            // read one line of text and split it into tokens
            StringTokenizer itr = new StringTokenizer(value.toString());
            
            // iterate over the tokens and emit each word
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                
                // every emitted <key, value> pair has the form <"word", 1>
                context.write(word, one);    
            }
        }
    }

    /**
     * Reducer class
     *
     * Sums all the counts emitted for each word.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // total number of occurrences of this word
            int sum = 0; 
            
            // values is the collection of values associated with one key, i.e. <key, value-list>, e.g. <hello, <1, 1>>
            for (IntWritable val : values) {
                // accumulate all the values
                sum += val.get();        
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();

        // configure the input and output paths
        String input = "hdfs://0.0.0.0:xxx/hadoop/wordcount/input/";
        String output = "hdfs://0.0.0.0:xxx/hadoop/wordcount/output/";

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);        // set the Mapper class for the job
        job.setCombinerClass(IntSumReducer.class);        // set the Combiner class (the reducer doubles as a combiner here, since summation is associative and commutative)
        job.setReducerClass(IntSumReducer.class);         // set the Reducer class for the job

        job.setOutputKeyClass(Text.class);                // set the output key type
        job.setOutputValueClass(IntWritable.class);       // set the output value type

        FileInputFormat.addInputPath(job, new Path(input));        // set the input path
        FileOutputFormat.setOutputPath(job, new Path(output));     // set the output path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
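To run the example, the class is typically packaged into a jar and submitted with the hadoop command (the jar name below is only a placeholder). The input and output paths are hard-coded in main(), so no arguments are needed; note that this job does not delete an existing output directory, so the output path must not exist before the run.

hadoop jar wordcount.jar com.hadoop.kwang.WordCount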

2. Data Deduplication

(1) Input and output

Input data:

file1.csv contents:
2017-12-09 a
2017-12-10 a
2017-12-11 a
2017-12-12 b
2017-12-13 b
file2.csv contents:
2017-12-09 b
2017-12-10 b
2017-12-11 b
2017-12-12 b
2017-12-13 b

Output:

2017-12-09 a
2017-12-09 b
2017-12-10 a
2017-12-10 b
2017-12-11 a
2017-12-11 b
2017-12-12 b
2017-12-13 b 

(2) Code and analysis

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class DedupClean {
    
    /*
     * Mapper class
     */
    public static class DedupCleanMapper extends Mapper<LongWritable, Text, Text, Text> {
        
        private static Text line = new Text();
        private static Text nullString = new Text("");
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // use the entire input line directly as the key
            line = value;
            
            // emit the line as the key with an empty string as the value
            context.write(line, nullString);
        }
    }
    
    
    /*
     * Reducer class
     */
    public static class DedupCleanReducer extends Reducer<Text, Text, Text, Text> {
        
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // emit each distinct key once with an empty value; duplicate lines were grouped into a single key by the shuffle
            context.write(key, new Text(""));
        }
    }
    
    public static void main(String[] args) throws Exception {
        
        final String FILE_IN_PATH = "hdfs://0.0.0.0:XXX/hadoop/dedupclean/input/";
        final String FILE_OUT_PATH = "hdfs://0.0.0.0:XXX/hadoop/dedupclean/output/";
        
        Configuration conf = new Configuration();
        
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(FILE_OUT_PATH), conf);
        if (fs.exists(new Path(FILE_OUT_PATH))) {
            fs.delete(new Path(FILE_OUT_PATH), true);
        }
        
        Job job = Job.getInstance(conf, "DedupClean");
        job.setJarByClass(DedupClean.class);
        job.setMapperClass(DedupCleanMapper.class);
        job.setReducerClass(DedupCleanReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
        FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
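The deduplication itself is performed by the shuffle phase: every input line becomes a map output key, identical keys are grouped together, and the reducer writes each distinct key exactly once, in sorted key order. For intuition, the job computes the same result as the following single-machine sketch (plain Java, assuming file1.csv and file2.csv are readable from the local working directory):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.TreeSet;

public class LocalDedup {
    public static void main(String[] args) throws IOException {
        // a TreeSet keeps a single copy of every line and iterates in sorted order,
        // mirroring the grouped-and-sorted keys that the reducer sees
        TreeSet<String> distinct = new TreeSet<>();
        for (String file : new String[] {"file1.csv", "file2.csv"}) {
            distinct.addAll(Files.readAllLines(Paths.get(file)));
        }
        distinct.forEach(System.out::println);
    }
}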

3. Inverted Index

(1) Introduction

A document consists of many words; a word may appear multiple times within the same document, and the same word may of course also appear in different documents.

 

Forward index: viewed from the document side, it records, for each document (identified by a document ID), which words the document contains, how many times each word occurs (term frequency), and where it occurs (offset from the beginning of the document).

Inverted index: viewed from the word side, it records, for each word, which documents it appears in (document IDs), how many times it occurs in each of those documents (term frequency), and where it occurs (offset from the beginning of that document).

 

In short:

Forward index: document ——> words

Inverted index: word ——> documents

 

Typical applications include search engines, large-scale database indexing, document retrieval, and information retrieval in general; the inverted index is one of the most important indexing mechanisms in the retrieval field.
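To make the distinction concrete, the following minimal sketch (plain Java, independent of Hadoop) builds an inverted index with per-document term frequencies for the two sample documents used in the next subsection:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class TinyInvertedIndex {
    public static void main(String[] args) {
        // sample documents: file name -> content (the same data as in the MapReduce example below)
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("a.txt", "hello you hello");
        docs.put("b.txt", "hello hans");

        // inverted index: word -> (file name -> term frequency)
        Map<String, Map<String, Integer>> index = new TreeMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String word : doc.getValue().split("\\s+")) {
                index.computeIfAbsent(word, w -> new LinkedHashMap<>())
                     .merge(doc.getKey(), 1, Integer::sum);
            }
        }

        // prints: hans={b.txt=1}, hello={a.txt=2, b.txt=1}, you={a.txt=1}
        index.forEach((word, postings) -> System.out.println(word + "\t" + postings));
    }
}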

(2) Input, output, and data flow

Input data:

a.txt contents:
hello you hello
b.txt contents:
hello hans

Output:

hans    b.txt:1
hello    b.txt:1;a.txt:2
you    a.txt:1

Processing flow: the Mapper emits a <word:filename, 1> pair for every token; the Combiner sums these counts per file and rewrites each pair as <word, filename:count>; the Reducer then concatenates the per-file counts of each word into filename1:count1;filename2:count2.

(3) Code and analysis

import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {

    /*
     * Mapper class
     * 
     *     emits pairs in <word:filename, value> form, e.g. <hello:a.txt, 1>
     *                                                      <hello:a.txt, 1>
     *                                                      <hello:b.txt, 1>
     */
    public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {

            // get the file name of the current input split
            // split.getPath() returns the full path, e.g. hdfs://10.20.14.47:8020/hadoop/invertedindex/input/a.txt
            FileSplit split = (FileSplit) context.getInputSplit();
            // fileName: a.txt
            String fileName = StringUtil.getShortPath(split.getPath().toString());

            // emit <word:filename, value> pairs so the Combiner can count identical words within the same file
            StringTokenizer st = new StringTokenizer(value.toString());
            while(st.hasMoreTokens()) {
                String word = st.nextToken().toLowerCase();
                word = word + ":" + fileName;
                context.write(new Text(word), new Text("1"));
            }
        }
    }

    /*
     * Combiner class
     * 
     *     input : <word:filename, value> pairs, e.g. <hello:a.txt, 1>
     *                                                <hello:a.txt, 1>
     *                                                <hello:b.txt, 1>
     * 
     *     output: <word, filename:count> pairs, e.g. <hello, a.txt:2>
     *                                                <hello, b.txt:1>
     */
    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            
            long sum = 0;
            // count how many times this word occurs within a single file
            for(Text val : values) {
                sum += Integer.valueOf(val.toString());
            }
            
            // split the key (hello:a.txt) into newKey (hello) and fileKey (a.txt)
            String newKey = StringUtil.getSplitByIndex(key.toString(), ":", 0);
            String fileKey = StringUtil.getSplitByIndex(key.toString(), ":", 1);
            
            context.write(new Text(newKey), new Text(fileKey + ":" + String.valueOf(sum)));
        }
    }

    /*
     * Reducer class
     * 
     *     input : <word, filename:count> pairs, e.g. <hello, a.txt:2>
     *                                                <hello, b.txt:1>
     * 
     *     output: <word, filename1:count1;filename2:count2> pairs, e.g. <hello, a.txt:2;b.txt:1>
     */
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            
            StringBuilder sb = new StringBuilder();
            
            // concatenate, for this word, every file it appears in together with its count in that file
            for (Text val : values) {
                sb.append(val.toString()).append(";");
            }
            // drop the trailing ";" so the value reads filename1:count1;filename2:count2
            if (sb.length() > 0) {
                sb.setLength(sb.length() - 1);
            }
            context.write(key, new Text(sb.toString()));

        }
    }
    
    // input and output paths
    private static final String FILE_IN_PATH  = "hdfs://0.0.0.0:xxx/hadoop/invertedindex/input";
    private static final String FILE_OUT_PATH = "hdfs://0.0.0.0:xxx/hadoop/invertedindex/output"; 
    
    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        
        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(FILE_OUT_PATH), conf);
        if (fs.exists(new Path(FILE_OUT_PATH))) {
            fs.delete(new Path(FILE_OUT_PATH), true);
        }
        
        Job job = Job.getInstance(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job, new Path(FILE_IN_PATH));
        FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        
    }
}

/*
 * Utility class
 *     string helpers for extracting file names and splitting keys
 */
class StringUtil {
    
    /*
     * extract the file name from a full path
     */
    public static String getShortPath(String filePath) {
        if (filePath.length() == 0) {
            return filePath;
        }
        return filePath.substring(filePath.lastIndexOf("/") + 1);
    }
    
    /*
     * split str by regex and return the element at position index
     */
    public static String getSplitByIndex(String str, String regex, int index) {
        String[] splits = str.split(regex);
        if (splits.length <= index) {
            return "";
        }
        return splits[index];
    }
}