Hadoop (15): MapReduce Program Examples

1. Counting Friend Pairs (Deduplication)

1.1 Data Preparation

joe,    jon    
joe , kia    
joe, bob    
joe ,ali
kia,    joe    
kia ,jim    
kia, dee
dee    ,kia    
dee, ali
ali    ,dee    
ali, jim    
ali ,bob    
ali, joe    
ali ,jon
jon,    joe    
jon ,ali
bob,    joe    
bob ,ali    
bob, jim
jim    ,kia    
jim, bob    
jim ,ali
friends.txt

  There is a friends.txt file (shown above); the format of each line is:

  username,friend name

1.2 Requirements Analysis

  1) Requirement

    Count how many distinct friend pairs there are.

  2) Analysis

    Given the file format and content above, the same pair can appear as two records with the username and friend name swapped, so the data must be deduplicated.

    For example, of the two records:

      joe,  jon

      jon,  joe

    only one may be kept. A simple trick, used by the mapper below, is to order the two names within each pair consistently so that both records produce the same key, as the short sketch below illustrates.
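
    A minimal illustration of that normalization (the names here are just examples):

String name1 = "joe", name2 = "jon";
// Put the lexicographically larger name first, so "joe,jon" and
// "jon,joe" both become the single key "jon,joe".
String pairKey = name1.compareTo(name2) > 0 ? name1 + "," + name2
                                            : name2 + "," + name1;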

1.3 Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class DuplicateData_0010 extends Configured implements Tool{
    static class DuplicateDataMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
        Text key = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            if (split.length==2){
                String name_1 = split[0].trim();
                String name_2 = split[1].trim();
                if (!name_1.equals(name_2)){
                    // Normalize the pair (larger name first) so that
                    // "joe,jon" and "jon,joe" produce the same key.
                    this.key.set(
                            name_1.compareTo(name_2)>0?
                                    name_1+","+name_2:
                                    name_2+","+name_1);
                    context.write(this.key,NullWritable.get());
                }
            }
        }
    }

    static class DuplicateDataReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName() + "Lance");
        job.setJarByClass(this.getClass());

        job.setMapperClass(DuplicateDataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DuplicateDataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job,output);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new DuplicateData_0010(),args));
    }
}
DuplicateData_0010
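
  To run the job, the input and output paths are passed as -D properties, which ToolRunner parses into the Configuration (the jar name and paths here are illustrative):

yarn jar friends.jar DuplicateData_0010 -Dinput=/data/friends.txt -Doutput=/output/friend_pairs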

2. Word Frequency Count

2.1 Data Preparation

  Suppose we have 4 pieces of raw text data:

    Text 1: the weather is good
    Text 2: today is good
    Text 3: good weather is good
    Text 4: today has good weather

2.2 Requirements Analysis

  1) Requirement

    Count the number of occurrences of each word in the texts.

  2) Analysis

    First: the map phase reads the input line by line and emits a (word, 1) pair for every word; for example, Text 1 produces (the,1), (weather,1), (is,1), (good,1).

    Second: the shuffle phase groups these pairs by word, so each reduce call receives a word together with all of its 1s, e.g. (good, [1,1,1,1,1]).

    The expected result is one line per word with its total count (assuming all four texts are processed together):

      good      5
      has       1
      is        3
      the       1
      today     2
      weather   3

2.3 Implementation

  1) Write a WordCountMapper class extending Mapper

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * A custom Mapper written by extending org.apache.hadoop.mapreduce.Mapper.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1); // reusable count value
    private Text word = new Text();                            // reusable word key
    /**
     * key: byte offset of the current line
     * value: the current line
     * context: the context in which map() executes
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer words = new StringTokenizer(value.toString());
        while (words.hasMoreTokens()) {
            word.set(words.nextToken());
            context.write(word, one);
        }
    }
}
WordCountMapper

  2) Write a WordCountReducer class extending Reducer

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * A custom Reducer written by extending org.apache.hadoop.mapreduce.Reducer.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
     * key: the word being counted
     * values: all count markers emitted for this word
     * context: the context in which reduce() executes
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable one : values) {
            count += one.get();
        }
        context.write(key, new IntWritable(count));
    }
}
WordCountReducer

  3) Write a driver, WordCountDriver, to configure and submit the WordCount job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and submits the WordCount job.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // Build a new job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Count");
        job.setJarByClass(WordCountDriver.class);
        // Set the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output directories
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
WordCountDriver
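
  Because WordCountReducer only sums integers, it can also serve as a combiner to pre-aggregate counts on the map side. This is an optional optimization, not part of the original driver:

// Optional: pre-aggregate (word, 1) pairs on the map side before the shuffle.
job.setCombinerClass(WordCountReducer.class);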

2.4 Running and Testing

  1) Preparation

    Package the program into a jar: wordcount.jar
    Prepare the Text 1-4 input files

  2) Run

    yarn jar wordcount.jar com.briup.WordCount input output

3. Score Statistics

3.1 Data Preparation

  chinese.txt

a|李一|88
a|王二|26
a|張三|99
a|劉四|58
a|陳五|45
a|楊六|66
a|趙七|78
a|黃八|100
a|周九|62
a|吳十|12

  english.txt

b|李一|36
b|王二|66
b|張三|86
b|劉四|56
b|陳五|43
b|楊六|86
b|趙七|99
b|黃八|80
b|周九|70
b|吳十|33

  math.txt

c|李一|83
c|王二|36
c|張三|92
c|劉四|58
c|陳五|75
c|楊六|66
c|趙七|63
c|黃八|60
c|周九|62
c|吳十|72

  Looking at the data format of chinese.txt, e.g. the line a|李一|88:

  'a' stands for the Chinese course, '李一' is the student's name, and '88' is the Chinese score.

3.2 Requirements Analysis

  Based on the data above, compute each student's scores in a format like the following (the field order may vary with shuffle order):

    李一  語文:88  英語:36  數學:83  總分:207.0  平均分:69.0

3.3 Implementation

  1) Write a parser class for the per-course records above

    ScoreRecordParser

import org.apache.hadoop.io.Text;

public class ScoreRecordParser{
    private String id;
    private String name;
    private String score;

    public boolean parse(String line){
        String[] strs=line.split("[|]");
        if(strs.length<3){
            return false;
        }
        id=strs[0].trim();
        name=strs[1].trim();
        score=strs[2].trim();
        return id.length()>0&&name.length()>0&&score.length()>0;
    }

    public boolean parse(Text line){
        return parse(line.toString());
    }

    public String getId(){
        return id;
    }

    public void setId(String id){
        this.id=id;
    }

    public String getName(){
        return name;
    }

    public void setName(String name){
        this.name=name;
    }

    public String getScore(){
        return score;
    }

    public void setScore(String score){
        this.score=score;
    }
}
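  A quick sanity check of the parser (a minimal usage sketch, not from the original post):

ScoreRecordParser parser = new ScoreRecordParser();
if (parser.parse("a|李一|88")) {
    // Prints: a 李一 88
    System.out.println(parser.getId() + " " + parser.getName() + " " + parser.getScore());
}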

  2) Implement the requirement

  CalculateScore_0010

import com.briup.bd1702.hadoop.mapred.utils.ScoreRecordParser;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CalculateScore_0010 extends Configured implements Tool{
    private static ScoreRecordParser parser=new ScoreRecordParser();

    static class CalculateScoreMapper extends Mapper<LongWritable,Text,Text,Text>{
        private Text key=new Text();

        @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            if(parser.parse(value)){
                this.key.set(parser.getName());
                context.write(this.key,value);
            }
        }
    }

    static class CalculateScoreReducer extends Reducer<Text,Text,Text,Text>{
        private Text value=new Text();

        @Override
        protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            StringBuffer buffer=new StringBuffer();
            double sum=0;
            for(Text text:values){
                if(parser.parse(text)){
                    String id=parser.getId();
                    String score=parser.getScore();
                    switch(id){
                        case "a":{
                            buffer.append("語文:"+score+"\t");
                            break;
                        }
                        case "b":{
                            buffer.append("英語:"+score+"\t");
                            break;
                        }
                        case "c":{
                            buffer.append("數學:"+score+"\t");
                            break;
                        }
                    }
                    sum+=Double.parseDouble(score);
                }
            }
            buffer.append("總分:"+sum+"\t平均分:"+(sum/3));
            this.value.set(buffer.toString());
            context.write(key,this.value);
        }
    }
    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));

        Job job=Job.getInstance(conf,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(CalculateScoreMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(CalculateScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new CalculateScore_0010(),args));
    }
}
CalculateScore_0010

3.4 Execution

  Since there are three input files here, we store them in one directory and point -Dinput at that directory.

  All three files are very small, so each is stored in its own block; FileInputFormat creates one input split per file, and three map tasks process the three files.
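
  For example (the jar name and paths are illustrative):

yarn jar score.jar CalculateScore_0010 -Dinput=/data/scores -Doutput=/output/scores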

4. Inverted Index

  First, what is an inverted index?

  Suppose we have four documents, file_1 through file_4, and we want to compute, for each word, the number of times it appears in each document.

  The output format looks like this:

      word  file_1:count,file_2:count,file_3:count,file_4:count

4.1 Data Preparation

  file_1

Welcome to MapReduce World

  file_2 

MapReduce is simple

  file_3

MapReduce is powerful is simple 

  file_4

hello MapReduce Bye MapReduce

4.2 Requirements Analysis

  1) Requirement

    Produce output in the following format:

      word  file_1:count,file_2:count,file_3:count,file_4:count

  2) Analysis

    Take the word MapReduce as an example (f1, f2, f3, f4 stand for the file names):

    The map side emits one (word, filename) pair per occurrence:

      (MapReduce, f1), (MapReduce, f2), (MapReduce, f3), (MapReduce, f4), (MapReduce, f4)

    After the shuffle, the reducer receives the word together with all of its file names grouped:

      (MapReduce, [f1, f2, f3, f4, f4])

    How does the reducer handle this? We build a Map collection that records how many times each file name occurs in that list:

      {f1=1, f2=1, f3=1, f4=2}

    From this map we can produce the line we want in the final file:

      MapReduce  f1:1,f2:1,f3:1,f4:2

4.3 Implementation

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndex_0010 extends Configured implements Tool{
    static class InvertIndexMapper extends Mapper<LongWritable,Text,Text,Text>{
        private Text word=new Text();
        private Text file=new Text();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException{
            String fileName=((FileSplit)context.getInputSplit())
                .getPath().getName();
            file.set(fileName);
        }

        @Override
        protected void map(LongWritable key,
            Text value,Context context) throws IOException, InterruptedException{
            StringTokenizer words=
                new StringTokenizer(value.toString()," ");
            String item=null;
            while(words.hasMoreTokens()){
                item=words.nextToken();
                if(item.trim().length()>=1){
                    word.set(item.trim());
                    context.write(word,file);
                }
            }
        }
    }

    static class InvertIndexReducer extends Reducer<Text,Text,Text,Text>{
        private Map<String,Integer> index=new HashMap<>();
        private Text value=new Text();
        @Override
        protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            index.clear();
            // Count how many times each file name appears for this word.
            for(Text file:values){
                String fileName=file.toString();
                if(index.get(fileName)!=null){
                    index.put(fileName,index.get(fileName)+1);
                }else{
                    index.put(fileName,1);
                }
            }
            StringBuffer buffer=new StringBuffer();
            Set<Entry<String,Integer>> entries=index.entrySet();
            for(Entry<String,Integer> entry:entries){
                // Append "file:count" entries, comma-separated (no leading comma).
                if(buffer.length()>0){
                    buffer.append(",");
                }
                buffer.append(entry.getKey()+":"+entry.getValue());
            }
            this.value.set(buffer.toString());
            context.write(key,value);
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));

        Job job=Job.getInstance(conf,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(InvertIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new InvertIndex_0010(),args));
    }
}
InvertIndex_0010
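
  With the four sample files above, the output should look like the following (key and value are tab-separated; the order of entries within a line depends on HashMap iteration order, so it may differ):

Bye         file_4:1
MapReduce   file_1:1,file_2:1,file_3:1,file_4:2
Welcome     file_1:1
World       file_1:1
hello       file_4:1
is          file_2:1,file_3:2
powerful    file_3:1
simple      file_2:1,file_3:1
to          file_1:1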

  Note:

    A StringTokenizer is used here to split each line into words:

StringTokenizer words = new StringTokenizer(value.toString(), " ");
String item = null;
while (words.hasMoreTokens()) {
    item = words.nextToken();
    if (item.trim().length() >= 1) {
        word.set(item.trim());
        context.write(word, file);
    }
}

 

5. Co-occurrence Matrix (Co-occurrence Counts)

5.1 Requirements Analysis

  First, what is a co-occurrence count?

  Let's explain with the following user data:

joe,    jon    
joe , kia    
joe, bob    
joe ,ali
kia,    joe    
kia ,jim    
kia, dee
dee    ,kia    
dee, ali
ali    ,dee    
ali, jim    
ali ,bob    
ali, joe    
ali ,jon
jon,    joe    
jon ,ali
bob,    joe    
bob ,ali    
bob, jim
jim    ,kia    
jim, bob    
jim ,ali

  In this data, each line contains a user on the left and one of that user's friends on the right.

  From it we can list every user's friend list:

ali,bob,jim,dee,jon,joe
bob,jim,ali,joe
dee,kia,ali
jim,ali,bob,kia
joe,ali,bob,kia,jon
jon,joe,ali
kia,dee,jim,joe

  Next, we form every two-name pair within each user's friend list and count, across all the friend lists, how many times each pair appears.

  For example:

     bob,jim forms one pair; we count how many times it co-occurs in the remaining friend lists (with the list owners themselves excluded), i.e. the data below.

  dee,jon,joe
  jim,ali,joe
  kia,ali
  ali,bob,kia
  ali,bob,kia,jon
  joe,ali
  dee,jim,joe

    The next pair is jim,dee, then dee,jon, and so on...

  From the analysis above, the expected result is:

  ali,bob    2
  ali,jim    1
  ali,joe    2
  ali,jon    1
  ali,kia    3
  bob,dee    1
  bob,jim    1
  bob,joe    1
  bob,jon    2
  bob,kia    2
  dee,jim    2
  dee,joe    2
  dee,jon    1
  jim,joe    3
  jim,jon    1
  joe,jon    1
  jon,kia    1

 

  We can implement this in two steps, i.e. two MapReduce jobs: the first computes each user's friend list, and the second pairs up the names in each list and counts how many times each pair occurs.

5.2 Implementation

  1) Compute the friend lists (the FriendRecordParser used below is not shown in the original post; a minimal sketch follows after the code)

import com.briup.bd1702.hadoop.mapred.utils.FriendRecordParser;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendList_0010 extends Configured implements Tool{

    static class FriendListMapper extends Mapper<LongWritable,Text,Text,Text>{
        private Text userName=new Text();
        private Text friendName=new Text();
        private FriendRecordParser parser=new FriendRecordParser();
        @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            parser.parse(value);
            if(parser.isValid()){
                userName.set(parser.getUserName());
                friendName.set(parser.getFriendName());
                context.write(userName,friendName);
                System.out.println("----"+userName+"----"+friendName+"----");
            }
        }
    }

    static class FriendListReducer extends Reducer<Text,Text,Text,Text>{
        private Text friendsNames=new Text();
        @Override
        protected void reduce(Text key,
                Iterable<Text> values,Context context) throws IOException, InterruptedException{
            StringBuffer buffer=new StringBuffer();
            for(Text name:values){
                // The leading comma is intentional: it keeps the user's own
                // name out of the friend fields when the second job splits
                // this line on ",".
                buffer.append(","+name);
            }
            System.out.println("++++"+buffer.toString()+"++++");
            friendsNames.set(buffer.toString());
            context.write(key,friendsNames);
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));

        Job job=Job.getInstance(conf,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new FriendList_0010(),args));
    }
}
FriendList_0010
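
  The FriendRecordParser class (imported from com.briup.bd1702.hadoop.mapred.utils) is not shown in the original post. Below is a minimal sketch that matches how FriendList_0010 uses it (parse/isValid/getUserName/getFriendName), assuming the same "user,friend" line format as section 1:

import org.apache.hadoop.io.Text;

public class FriendRecordParser{
    private String userName;
    private String friendName;
    private boolean valid;

    public void parse(Text line){
        valid=false;
        String[] strs=line.toString().split(",");
        if(strs.length==2){
            userName=strs[0].trim();
            friendName=strs[1].trim();
            valid=userName.length()>0&&friendName.length()>0;
        }
    }

    public boolean isValid(){
        return valid;
    }

    public String getUserName(){
        return userName;
    }

    public String getFriendName(){
        return friendName;
    }
}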

  2) Count the co-occurrences

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Cooccurence_0010 extends Configured implements Tool{

    static class CooccurenceMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        private Text key=new Text();
        private IntWritable value=new IntWritable(1);
        @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
            String[] strs=value.toString().split(",");
            // Because FriendListReducer writes a leading comma, strs[0] holds
            // only the "user\t" prefix; the friend names start at index 1.
            for(int i=1;i<strs.length-1;i++){
                for(int j=i+1;j<strs.length;j++){
                    // Fix the order inside each pair so that (a,b) and (b,a)
                    // always form the same key.
                    this.key.set(strs[i].compareTo(strs[j])<0?
                        strs[i]+","+strs[j]:
                        strs[j]+","+strs[i]);
                    context.write(this.key,this.value);
                }
            }
        }
    }

    static class CooccurenceReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int count=0;
            for(IntWritable value:values){
                count+=value.get();
            }
            context.write(key,new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception{
        Configuration conf=getConf();
        Path input=new Path(conf.get("input"));
        Path output=new Path(conf.get("output"));

        Job job=Job.getInstance(conf,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(CooccurenceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(CooccurenceReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job,input);
        TextOutputFormat.setOutputPath(job,output);

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{
        System.exit(ToolRunner.run(new Cooccurence_0010(),args));
    }
}
Cooccurence_0010
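
  The two jobs are run one after the other, feeding the first job's output directory to the second (the jar name and paths are illustrative):

yarn jar friends.jar FriendList_0010 -Dinput=/data/friends.txt -Doutput=/tmp/friend_lists
yarn jar friends.jar Cooccurence_0010 -Dinput=/tmp/friend_lists -Doutput=/output/cooccurrence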

 

           
