joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
Given the file format and content above, there can be two records in which the user name and friend name are simply swapped, so the data needs to be deduplicated.
For example:
joe, jon
jon, joe
For data like this, only one of the two records should be kept.
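The trick is to normalize each pair into an order-independent key, so that "joe, jon" and "jon, joe" collapse into the same key during the shuffle and the reducer writes it only once. A minimal sketch of that normalization (an illustrative helper only; the full job below inlines the same logic in its mapper):

// Build a canonical key for one "name1, name2" line so swapped duplicates share a key.
public static String pairKey(String line) {
    String[] names = line.replaceAll(" ", "").split(",");
    if (names.length != 2) {
        return null; // malformed line
    }
    // Always put the lexicographically larger name first, as the mapper below does
    return names[0].compareTo(names[1]) > 0
            ? names[0] + "," + names[1]
            : names[1] + "," + names[0];
}

// pairKey("joe, jon") and pairKey("jon, joe") both return "jon,joe".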
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendMapReduceData extends Configured implements Tool {

    static class FriendMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        Text key = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split(",");
            if (strs.length == 2) {
                // Order the two names on each line and use the result as the output key
                String name1 = strs[0].replaceAll(" ", "");
                String name2 = strs[1].replaceAll(" ", "");
                this.key = new Text(name1.compareTo(name2) > 0 ? name1 + "," + name2 : name2 + "," + name1);
                context.write(this.key, NullWritable.get());
            }
        }
    }

    static class FriendReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Reducer<Text, NullWritable, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, "FriendMapReduceData");
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(FriendReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendMapReduceData(), args));
    }
}
Upload the data file to the file system
bin/hadoop fs -put /home/softwares/test/friend input/friend
Run
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.FriendMapReduceData -Dinput=input/friend -Doutput=output/friend
Result
Text 1: the weather is good
Text 2: today is good
Text 3: good weather is good
Text 4: today has good weather
Count how many times each word appears
package com.jf.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordMapReduceData extends Configured implements Tool {

    static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text key = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Split each line into words and emit (word, 1)
            StringTokenizer words = new StringTokenizer(value.toString());
            while (words.hasMoreElements()) {
                this.key = new Text(words.nextToken());
                context.write(this.key, new IntWritable(1));
            }
        }
    }

    static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable intWritable : values) {
                count += intWritable.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        // Build job input and output paths
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, "WordMapReduceData");
        job.setJarByClass(this.getClass());

        // Set the mapper
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the reducer
        job.setReducerClass(WordReducer.class);

        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output directories
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WordMapReduceData(), args));
    }
}
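Since WordReducer only sums integer counts, it could also be registered as a combiner so that each map task pre-aggregates its own output before the shuffle. This is an optional tweak, not part of the job above; one extra line in run() would be enough:

// Optional: pre-aggregate (word, 1) pairs on the map side to shrink the shuffle
job.setCombinerClass(WordReducer.class);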
Upload the file to the file system
bin/hadoop fs -put /home/softwares/test/words input/words
Run
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.WordMapReduceData -Dinput=input/words -Doutput=output/words
Result
chinese.txt
a|李一|88
a|王二|26
a|張三|99
a|劉四|58
a|陳五|45
a|楊六|66
a|趙七|78
a|黃八|100
a|周九|62
a|吳十|12
math.txt
c|李一|83
c|王二|36
c|張三|92
c|劉四|58
c|陳五|75
c|楊六|66
c|趙七|63
c|黃八|60
c|周九|62
c|吳十|72
english.txt
b|李一|36
b|王二|66
b|張三|86
b|劉四|56
b|陳五|43
b|楊六|86
b|趙七|99
b|黃八|80
b|周九|70
b|吳十|33
There are three data files. For each person, combine the scores into a single line: the reducer below produces the name followed by the 語文/英文/數學 scores, the 總分 (total) and the 平均分 (average).
The parser class
package com.jf.mapreduce;

import org.apache.hadoop.io.Text;

public class ScoreRecordParser {

    private String id;
    private String name;
    private String score;

    /**
     * Parse one line of data in the form id|name|score
     * @param line
     * @return
     */
    public boolean parse(String line) {
        String[] strs = line.split("\\|");
        if (strs.length != 3) {
            return false;
        }
        id = strs[0].trim();
        name = strs[1].trim();
        score = strs[2].trim();
        if (id.length() > 0 && name.length() > 0 && score.length() > 0) {
            return true;
        } else {
            return false;
        }
    }

    public boolean parse(Text value) {
        return parse(value.toString());
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getScore() {
        return score;
    }

    public void setScore(String score) {
        this.score = score;
    }
}
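As a quick sanity check, the parser can be exercised on a single record from the score files. The main method below is only an illustrative snippet, not part of the job:

// Illustrative usage of ScoreRecordParser on one line of chinese.txt
public static void main(String[] args) {
    ScoreRecordParser parser = new ScoreRecordParser();
    if (parser.parse("a|李一|88")) {
        // prints: id=a, name=李一, score=88
        System.out.println("id=" + parser.getId()
                + ", name=" + parser.getName()
                + ", score=" + parser.getScore());
    }
}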
The MapReduce class
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ScoreMapReduceData extends Configured implements Tool {

    static class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {

        private ScoreRecordParser parser = null;

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            parser = new ScoreRecordParser();
            System.out.println("map解析");
            // Group by name: the student's name becomes the key, the whole record the value
            if (parser.parse(value)) {
                System.out.println("map:key=" + parser.getName() + ",value=" + value.toString());
                context.write(new Text(parser.getName()), value);
            }
        }
    }

    static class ScoreReducer extends Reducer<Text, Text, Text, Text> {

        private ScoreRecordParser parser = null;

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            parser = new ScoreRecordParser();
            String id = null;
            int score = 0;
            double sum = 0;
            double avg = 0;
            for (Text value : values) {
                if (parser.parse(value)) {
                    id = parser.getId();
                    score = Integer.parseInt(parser.getScore());
                    if (id.equals("a")) {
                        buffer.append("語文:" + score + "\t");
                        sum += score;
                    } else if (id.equals("b")) {
                        buffer.append("英文:" + score + "\t");
                        sum += score;
                    } else if (id.equals("c")) {
                        buffer.append("數學:" + score + "\t");
                        sum += score;
                    }
                }
            }
            avg = sum / 3;
            buffer.append("總分:" + sum + "\t平均分:" + avg);
            System.out.println("reduce:key=" + key.toString() + ",value=" + buffer.toString());
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(ScoreMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new ScoreMapReduceData(), args));
    }
}
Because the job reads three files, upload all three into a single directory and point -Dinput at that directory when running.
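For example, assuming the three files sit under /home/softwares/test on the local disk (the local paths here are an assumption), they could be uploaded like this:

bin/hadoop fs -mkdir -p input/score
bin/hadoop fs -put /home/softwares/test/chinese.txt input/score
bin/hadoop fs -put /home/softwares/test/math.txt input/score
bin/hadoop fs -put /home/softwares/test/english.txt input/score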
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.ScoreMapReduceData -Dinput=input/score -Doutput=output/score
Because the three files are stored as three separate blocks, three map tasks are launched.
Result
file1
Welcome to MapReduce World
file2
MapReduce is simple
file3
MapReduce is powerful is simple
file4
hello MapReduce Bye MapReduce
Given the four data files above, we want a result of the form:
word    file_1: count, file_2: count, file_3: count, file_4: count
That is, count how many times each word appears in each file. Since we are counting words, the map output key can be the word itself. And because the count is per file, the mapper can also emit, for every word, the name of the file it came from, e.g. (is : file2), (is : file3), (is : file3).
After the shuffle, the reducer's input for "is" is then is : file2, file3, file3, which makes it easy to count, for each word, how many times it appears in each file.
package com.jf.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndexMapReduce extends Configured implements Tool {

    /**
     * Hadoop instantiates the Mapper and Reducer classes via reflection,
     * so these inner classes must be static so they can be instantiated.
     * @author Administrator
     */
    static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text fileName = null;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Remember the name of the file this split belongs to
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            this.fileName = new Text(fileName);
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
            String tmp = null;
            while (tokenizer.hasMoreTokens()) {
                tmp = tokenizer.nextToken().trim();
                if (tmp.length() > 0) {
                    // Emit (word, file name)
                    context.write(new Text(tmp), this.fileName);
                }
            }
        }
    }

    static class IndexReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String tmp = null;
            // Count how many times this word appeared in each file
            Map<String, Integer> map = new HashMap<String, Integer>();
            for (Text text : values) {
                tmp = text.toString();
                if (map.get(tmp) != null) {
                    map.put(tmp, map.get(tmp) + 1);
                } else {
                    map.put(tmp, 1);
                }
            }
            StringBuffer buffer = new StringBuffer();
            for (String mk : map.keySet()) {
                if (buffer.length() > 0) {
                    buffer.append("," + mk + ":" + map.get(mk));
                } else {
                    buffer.append(mk + ":" + map.get(mk));
                }
            }
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, "InvertIndexMapReduce");
        job.setJarByClass(this.getClass());

        job.setMapperClass(IndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(IndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new InvertIndexMapReduce(), args));
    }
}
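One small caveat: IndexReducer collects the per-file counts in a HashMap, so the order of the file:count entries in each output line is not deterministic. If a stable order by file name is wanted, a TreeMap could be swapped in (an optional change, not in the code above):

// java.util.TreeMap keeps keys sorted, so each line lists files in name order,
// e.g. something like "is    file2:1,file3:2"
Map<String, Integer> map = new TreeMap<String, Integer>();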
Upload the files
Run the command
bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.InvertIndexMapReduce -Dinput=input/index -Doutput=output/index
Job output
File System Counters
    FILE: Number of bytes read=249
    FILE: Number of bytes written=698080
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=549
    HDFS: Number of bytes written=168
    HDFS: Number of read operations=18
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=4
Job Counters
    Launched map tasks=4
    Launched reduce tasks=2
    Data-local map tasks=4
    Total time spent by all maps in occupied slots (ms)=131026
    Total time spent by all reduces in occupied slots (ms)=31874
    Total time spent by all map tasks (ms)=131026
    Total time spent by all reduce tasks (ms)=31874
    Total vcore-seconds taken by all map tasks=131026
    Total vcore-seconds taken by all reduce tasks=31874
    Total megabyte-seconds taken by all map tasks=134170624
    Total megabyte-seconds taken by all reduce tasks=32638976
Map-Reduce Framework
    Map input records=4
    Map output records=16
    Map output bytes=205
    Map output materialized bytes=285
    Input split bytes=444
    Combine input records=0
    Combine output records=0
    Reduce input groups=9
    Reduce shuffle bytes=285
    Reduce input records=16
    Reduce output records=9
    Spilled Records=32
    Shuffled Maps =8
    Failed Shuffles=0
    Merged Map outputs=8
    GC time elapsed (ms)=2090
    CPU time spent (ms)=7370
    Physical memory (bytes) snapshot=1040441344
    Virtual memory (bytes) snapshot=5057056768
    Total committed heap usage (bytes)=597049344
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=105
File Output Format Counters
    Bytes Written=168
Result files
friendList.txt
joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
From each friend pair, build each person's friend list.
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] names = line.replaceAll(" ", "").split(",");
            if (names.length == 2) {
                context.write(new Text(names[0]), new Text(names[1]));
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            for (Text text : values) {
                buffer.append("," + text.toString());
            }
            context.write(key, new Text(buffer.toString()));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendListMapReduceData(), args));
    }
}
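Note that FriendListReducer prepends a comma before every friend, so each output line has the shape name<TAB>,friend1,friend2,... If the leading comma is unwanted, the loop can follow the same pattern as IndexReducer above (a cosmetic tweak, not in the original code):

StringBuffer buffer = new StringBuffer();
for (Text text : values) {
    if (buffer.length() > 0) {
        buffer.append(",");   // separator only between names
    }
    buffer.append(text.toString());
}
context.write(key, new Text(buffer.toString()));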
friendList2.txt
bob ,joe,jim,ali
jon ,ali,joe
kia ,dee,jim,joe
ali ,jon,joe,bob,jim,dee
dee ,ali,kia
jim ,ali,bob,kia
joe ,ali,bob,kia,jon
Excluding the owner of the list, every two names in a friend list form a friend pair (they share the list's owner as a common friend).
For example, for the list A,B,C,D the pairs formed are B-C, B-D, C-D.
package com.jf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendListMapReduceData2 extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] names = line.replaceAll(" ", "").split(",");
            // Note the nested loop: every two names in the friend list (index 0 is the
            // owner of the list and is skipped) get the chance to form one friend pair
            for (int i = 1; i < names.length - 1; i++) {
                for (int j = 1; j < names.length - i; j++) {
                    // Compare the two names so each pair produces a unique key,
                    // avoiding both A-B and B-A showing up
                    if (names[i].compareTo(names[i + j]) >= 0) {
                        System.out.println(names[i] + ":" + names[i + j]);
                        context.write(new Text(names[i] + ":" + names[i + j]), new IntWritable(1));
                    } else {
                        context.write(new Text(names[i + j] + ":" + names[i]), new IntWritable(1));
                    }
                }
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendListMapReduceData2(), args));
    }
}
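friendList2.txt has exactly the shape produced by FriendListMapReduceData above (the owner first, then a comma-prefixed friend list), so this job can be run on the previous job's output or on the uploaded file. Assuming the file was uploaded to input/friendList2 (the HDFS paths here are an assumption), the command follows the same pattern as the earlier jobs; in the result, the count attached to each name pair is the number of friend lists containing both names, i.e. how many friends the two people have in common.

bin/yarn jar /home/softwares/my_hadoop-0.0.1-SNAPSHOT.jar com.jf.mapreduce.FriendListMapReduceData2 -Dinput=input/friendList2 -Doutput=output/friendList2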