Hadoop Notes 7: MapReduce Program Examples

1. Counting friend pairs

1) Data

joe,    jon    
joe , kia    
joe, bob    
joe ,ali
kia,    joe    
kia ,jim    
kia, dee
dee    ,kia    
dee, ali
ali    ,dee    
ali, jim    
ali ,bob    
ali, joe    
ali ,jon
jon,    joe    
jon ,ali
bob,    joe    
bob ,ali    
bob, jim
jim    ,kia    
jim, bob    
jim ,ali

2) Analysis

Given the file format and content above, the same friendship can appear as two records with the user name and friend name swapped, so such duplicates must be removed.

For example:

joe, jon

jon, joe

Only one of these two records should be kept. The mapper achieves this by ordering the two names and emitting the ordered pair as the key, so both records map to the same key (here, jon,joe) and the reducer writes each key exactly once.

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendMapReduceData extends Configured implements Tool {

	static class FriendMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		Text key = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String[] strs = value.toString().split(",");
			if (strs.length == 2) {
				// Order the two names on each line and use the ordered pair as the output key
				String name1 = strs[0].replaceAll(" ", "");
				String name2 = strs[1].replaceAll(" ", "");
				this.key = new Text(name1.compareTo(name2) > 0 ? name1 + "," + name2 : name2 + "," + name1);
				context.write(this.key, NullWritable.get());
			}

		}
	}

	static class FriendReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values,
				Reducer<Text, NullWritable, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, "FriendMapReduceData");
		job.setJarByClass(this.getClass());

		job.setMapperClass(FriendMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setReducerClass(FriendReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new FriendMapReduceData(), args));
	}
}

4) Execution and results

Upload the data file to the file system:

bin/hadoop fs -put /home/softwares/test/friend input/friend

Run the job (ToolRunner's GenericOptionsParser turns the -Dinput and -Doutput options into Configuration entries, which run() reads back via conf.get("input") and conf.get("output")):

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.FriendMapReduceData -Dinput=input/friend -Doutput=output/friend

Results
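The result screenshot from the original post is not preserved. Assuming a single reducer, the output derived from the input data above should be the following eleven deduplicated pairs (the value is NullWritable, so only the keys appear):

bob,ali
dee,ali
jim,ali
jim,bob
joe,ali
joe,bob
jon,ali
jon,joe
kia,dee
kia,jim
kia,joe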

2. Word count

1) Data

Text 1: the weather is good
Text 2: today is good
Text 3: good weather is good
Text 4: today has good weather

2) Analysis

Count how many times each word occurs across the input.

3) Implementation

package com.jf.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordMapReduceData extends Configured implements Tool {

	static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		private Text key = null;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// Tokenize the line and emit (word, 1) for each token
			StringTokenizer words = new StringTokenizer(value.toString());
			while (words.hasMoreTokens()) {
				this.key = new Text(words.nextToken());
				context.write(this.key, new IntWritable(1));
			}
		}
	}

	static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable intWritable : values) {
				count += intWritable.get();
			}
			context.write(key, new IntWritable(count));
		}
	}

	public int run(String[] args) throws Exception {
		// Build the job input and output paths
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, "WordMapReduceData");
		job.setJarByClass(this.getClass());
		// Configure the mapper
		job.setMapperClass(WordMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// Configure the reducer
		job.setReducerClass(WordReducer.class);
		// Configure the output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// Set the input and output directories
		FileInputFormat.addInputPath(job, input);
		FileOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new WordMapReduceData(), args));
	}
}
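Because the reduce function here is an associative, commutative sum, the same WordReducer class could also be registered as a combiner to pre-aggregate on the map side and cut shuffle traffic. This is an optional addition, not part of the original job; it would go next to the other job.set* calls in run():

job.setCombinerClass(WordReducer.class); // pre-aggregate (word, 1) pairs before the shuffle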

4) Execution and results

Upload the file to the file system:

bin/hadoop fs -put /home/softwares/test/words input/words

Run the job:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.WordMapReduceData -Dinput=input/words -Doutput=output/words

Results
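The result screenshot is not preserved. Assuming the input file contains just the four sentences (without the "Text N:" labels), the output should be:

good	5
has	1
is	3
the	1
today	2
weather	3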

3. Aggregating scores

1) Data

chinese.txt

a|李一|88
a|王二|26
a|張三|99
a|劉四|58
a|陳五|45
a|楊六|66
a|趙七|78
a|黃八|100
a|周九|62
a|吳十|12

math.txt

c|李一|83
c|王二|36
c|張三|92
c|劉四|58
c|陳五|75
c|楊六|66
c|趙七|63
c|黃八|60
c|周九|62
c|吳十|72

english.txt

b|李一|36
b|王二|66
b|張三|86
b|劉四|56
b|陳五|43
b|楊六|86
b|趙七|99
b|黃八|80
b|周九|70
b|吳十|33

2) Analysis

There are three data files, one per subject; the goal is to combine them and produce one line per student in the format shown below.
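The original post showed the target format as a screenshot, which is not preserved. Judging from the reducer code below, each output line should look roughly like this (subject order within a line may vary, since the order in which values reach the reducer is not guaranteed):

李一	語文:88	英文:36	數學:83	總分:207.0	平均分:69.0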

3) Implementation

Parser class

package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class ScoreRecordParser {

	private String id;
	private String name;
	private String score;

	/**
	 * Parse one record line of the form id|name|score.
	 * @param line the raw record
	 * @return true if the line splits into three non-empty fields
	 */
	public boolean parse(String line) {
		String[] strs = line.split("\\|");
		if (strs.length != 3) {
			return false;
		}
		id = strs[0].trim();
		name = strs[1].trim();
		score = strs[2].trim();
		return id.length() > 0 && name.length() > 0 && score.length() > 0;
	}

	public boolean parse(Text value) {
		return parse(value.toString());
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getScore() {
		return score;
	}

	public void setScore(String score) {
		this.score = score;
	}
}
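A quick way to sanity-check the parser locally (a hypothetical snippet, not part of the original code):

ScoreRecordParser parser = new ScoreRecordParser();
if (parser.parse("a|李一|88")) {
	// prints: id=a, name=李一, score=88
	System.out.println("id=" + parser.getId() + ", name=" + parser.getName() + ", score=" + parser.getScore());
}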

MapReduce class

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreMapReduceData extends Configured implements Tool {

	static class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
		
		private ScoreRecordParser parser = new ScoreRecordParser();

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// Group records by student name: the name becomes the map output key
			if (parser.parse(value)) {
				System.out.println("map: key=" + parser.getName() + ", value=" + value.toString());
				context.write(new Text(parser.getName()), value);
			}
		}
	}

	static class ScoreReducer extends Reducer<Text, Text, Text, Text> {
		
		private ScoreRecordParser parser = new ScoreRecordParser();
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringBuffer buffer = new StringBuffer();
			String id = null;
			int score = 0;
			double sum = 0;
			double avg = 0;
			for (Text value : values) {
				if(parser.parse(value)){
					id= parser.getId();
					score = Integer.parseInt(parser.getScore());
					if(id.equals("a")){
						buffer.append("語文:"+score+"\t");
						sum+=score;
					}else if(id.equals("b")){
						buffer.append("英文:"+score+"\t");
						sum+=score;
					}else if(id.equals("c")){
						buffer.append("數學:"+score+"\t");
						sum+=score;
					}
				}
			}
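			// Note: dividing by 3 assumes every student has a record in all three subject files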
			avg = sum/3;
			buffer.append("總分:"+sum+"\t平均分:"+avg);
			System.out.println("reduce:key="+key.toString()+",value="+buffer.toString());
			context.write(key, new Text(buffer.toString()));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(ScoreMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(ScoreReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new ScoreMapReduceData(), args));
	}

}

4) Execution and results

Because there are three input files, upload all three into a single directory; at run time, point -Dinput at that directory:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.ScoreMapReduceData -Dinput=input/score -Doutput=output/score

Because the three files are stored as three separate blocks, each file gets its own input split, so three map tasks are launched.

Results

4. Inverted index

1) Data

file1

Welcome to MapReduce World

file2

MapReduce is simple

file3

MapReduce is powerful is simple

file4

hello MapReduce Bye MapReduce

2) Analysis

Given the four data files above, the goal is output of the form:

word  file_1:count,file_2:count,file_3:count,file_4:count

That is, count how many times each word occurs in each file. Since we are counting words, the map output key can be the word itself; and since the count is per file, the natural map output value is the name of the file the word came from, e.g. (is, file2), (is, file3), (is, file3).

After the shuffle, the reducer for "is" receives file2, file3, file3 as its values, which makes it easy to tally each word's occurrences per file.

3) Code

package com.jf.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndexMapReduce extends Configured implements Tool {

	/**
	 * Hadoop instantiates mapper and reducer classes via reflection, so these
	 * inner classes must be static to be instantiable without an enclosing
	 * instance.
	 */
	static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {

		private Text fileName = null;

		@Override
		protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
			this.fileName = new Text(fileName);
		}

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
			String tmp = null;
			while (tokenizer.hasMoreTokens()) {
				tmp = tokenizer.nextToken().trim();
				if (tmp.length() > 0) {
					context.write(new Text(tmp), this.fileName);
				}
			}
		}
	}

	static class IndexReducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String tmp = null;
			Map<String, Integer> map = new HashMap<String, Integer>();
			for (Text text : values) {
				tmp = text.toString();
				if (map.get(tmp) != null) {
					map.put(tmp, map.get(tmp) + 1);
				} else {
					map.put(tmp, 1);
				}
			}
			StringBuffer buffer = new StringBuffer();
			for (String mk : map.keySet()) {
				if (buffer.length() > 0) {
					buffer.append("," + mk + ":" + map.get(mk));
				} else {
					buffer.append(mk + ":" + map.get(mk));
				}
			}
			context.write(key, new Text(buffer.toString()));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, "InvertIndexMapReduce");
		job.setJarByClass(this.getClass());

		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new InvertIndexMapReduce(), args));
	}

}

4) Test results

Upload the files:

Run the command:

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.InvertIndexMapReduce -Dinput=input/index -Doutput=output/index

Job counters:

File System Counters
		FILE: Number of bytes read=249
		FILE: Number of bytes written=698080
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=549
		HDFS: Number of bytes written=168
		HDFS: Number of read operations=18
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Job Counters 
		Launched map tasks=4
		Launched reduce tasks=2
		Data-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=131026
		Total time spent by all reduces in occupied slots (ms)=31874
		Total time spent by all map tasks (ms)=131026
		Total time spent by all reduce tasks (ms)=31874
		Total vcore-seconds taken by all map tasks=131026
		Total vcore-seconds taken by all reduce tasks=31874
		Total megabyte-seconds taken by all map tasks=134170624
		Total megabyte-seconds taken by all reduce tasks=32638976
	Map-Reduce Framework
		Map input records=4
		Map output records=16
		Map output bytes=205
		Map output materialized bytes=285
		Input split bytes=444
		Combine input records=0
		Combine output records=0
		Reduce input groups=9
		Reduce shuffle bytes=285
		Reduce input records=16
		Reduce output records=9
		Spilled Records=32
		Shuffled Maps =8
		Failed Shuffles=0
		Merged Map outputs=8
		GC time elapsed (ms)=2090
		CPU time spent (ms)=7370
		Physical memory (bytes) snapshot=1040441344
		Virtual memory (bytes) snapshot=5057056768
		Total committed heap usage (bytes)=597049344
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=105
	File Output Format Counters 
		Bytes Written=168

Result file
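The result-file screenshot is not preserved. Derived from the four input files, the output should contain the following nine lines, which agrees with the Reduce output records=9 counter above (the order of file:count entries within a line follows HashMap iteration order and may differ):

Bye	file4:1
MapReduce	file1:1,file2:1,file3:1,file4:2
Welcome	file1:1
World	file1:1
hello	file4:1
is	file2:1,file3:2
powerful	file3:1
simple	file2:1,file3:1
to	file1:1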

5. Co-occurrence matrix

Step 1: build each person's friend list

1) Data

friendList.txt

joe,    jon    
joe , kia    
joe, bob    
joe ,ali
kia,    joe    
kia ,jim    
kia, dee
dee    ,kia    
dee, ali
ali    ,dee    
ali, jim    
ali ,bob    
ali, joe    
ali ,jon
jon,    joe    
jon ,ali
bob,    joe    
bob ,ali    
bob, jim
jim    ,kia    
jim, bob    
jim ,ali

2) Analysis

From each friend pair, derive each person's complete friend list.

3) Code

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] names = line.replaceAll(" ", "").split(",");
            if (names.length == 2) {
                context.write(new Text(names[0]), new Text(names[1]));
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
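            // Keep the leading comma: step 2 splits each line on "," and skips names[0], which holds the owner plus the tab separator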
            for (Text text : values) {
                buffer.append("," + text.toString());
            }
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendListMapReduceData(), args));
    }
}

4) Result

The output of this job is the friend-list file (friendList2.txt) used as the input for step 2 below.

Step 2: pair up every two names in each person's friend list and count how many times each pair occurs

1) Data

friendList2.txt

bob	,joe,jim,ali
jon	,ali,joe
kia	,dee,jim,joe
ali	,jon,joe,bob,jim,dee
dee	,ali,kia
jim	,ali,bob,kia
joe	,ali,bob,kia,jon

2) Analysis

Excluding the owner of each list, every two names in a friend list form a friend pair; a pair's total count across all lists is therefore the number of common friends the two people share.

For example, the list A: B,C,D yields the pairs B-C, B-D, and C-D, as the sketch below illustrates.
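A minimal standalone sketch of the pair-generation loop used in the mapper below (hypothetical test code; as in the mapper, names[0] holds the list owner and is skipped):

String[] names = "ali,jon,joe,bob,jim,dee".split(",");
for (int i = 1; i < names.length - 1; i++) {
	for (int j = 1; j < names.length - i; j++) {
		// order the two names so that A-B and B-A yield the same key
		String pair = names[i].compareTo(names[i + j]) >= 0
				? names[i] + ":" + names[i + j]
				: names[i + j] + ":" + names[i];
		System.out.println(pair); // jon:joe, jon:bob, ..., jim:dee (10 pairs in all)
	}
}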

3) Code

package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData2 extends Configured implements Tool {

	static class FriendListMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] names = line.replaceAll(" ", "").split(",");
			// Note the double loop: every two names in the friend list get a chance to form a pair
			for (int i = 1; i < names.length - 1; i++) {
				for (int j = 1; j < names.length - i; j++) {
					// Order the two names so each pair yields a unique key (avoids emitting both A-B and B-A)
					if (names[i].compareTo(names[i + j]) >= 0) {
						System.out.println(names[i] + ":" + names[i + j]);
						context.write(new Text(names[i] + ":" + names[i + j]), new IntWritable(1));
					} else {
						context.write(new Text(names[i + j] + ":" + names[i]), new IntWritable(1));
					}
				}

			}
		}
	}

	static class FriendListReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}
			context.write(key, new IntWritable(count));
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Path input = new Path(conf.get("input"));
		Path output = new Path(conf.get("output"));

		Job job = Job.getInstance(conf, this.getClass().getSimpleName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(FriendListMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setReducerClass(FriendListReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		TextInputFormat.addInputPath(job, input);
		TextOutputFormat.setOutputPath(job, output);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new FriendListMapReduceData2(), args));
	}
}

4) Result
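The result screenshot is not preserved. Each output line has the form pair<TAB>count, where the count is the pair's number of common friends; for example, joe:jim should come out as 3, because bob, kia, and ali each list both joe and jim.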
