joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
There is a friends.txt file in which each line has the format:
username,friendname
1) Requirement
Count how many pairs of friends there are.
2) Analysis
Given the file format and content above, there may be two records in which the username and the friend name simply swap positions, and those have to be deduplicated.
For example:
joe, jon
jon, joe
For data like this we can keep only one record.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class DuplicateData_0010 extends Configured implements Tool {

    static class DuplicateDataMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text key = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            if (split.length == 2) {
                String name_1 = split[0].trim();
                String name_2 = split[1].trim();
                if (!name_1.equals(name_2)) {
                    // put the two names in a fixed order so "joe,jon" and "jon,joe" produce the same key
                    this.key.set(name_1.compareTo(name_2) > 0
                            ? name_1 + "," + name_2
                            : name_2 + "," + name_1);
                    context.write(this.key, NullWritable.get());
                }
            }
        }
    }

    static class DuplicateDataReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // identical keys are grouped together, so writing each key once removes the duplicates
            context.write(key, NullWritable.get());
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName() + "Lance");
        job.setJarByClass(this.getClass());

        job.setMapperClass(DuplicateDataMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DuplicateDataReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new DuplicateData_0010(), args));
    }
}
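Because run() reads the paths from the configuration (conf.get("input") and conf.get("output")) and the job is launched through ToolRunner, the paths can be passed in with -D options. A possible run command, with a placeholder jar name and paths (use the fully qualified class name if the class lives in a package):

yarn jar dedup.jar DuplicateData_0010 -Dinput=friends.txt -Doutput=dedup_out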
Suppose there are four pieces of raw text data:
Text 1: the weather is good
Text 2: today is good
Text 3: good weather is good
Text 4: today has good weather
1) Requirement
Count how many times each word appears in these articles.
2) Analysis
First: in the map phase each line is split into words, and every word is emitted as a (word, 1) pair.
Second: after the shuffle, all the 1s belonging to the same word arrive at the same reducer, which adds them up into the word's total count.
The expected result:
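With all four texts placed in the job's input directory, the code below sums each word's occurrences over the whole input, so the output would look roughly like this (one word and its total per line):

good 5
has 1
is 3
the 1
today 2
weather 3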
1) Write a WordCountMapper class that extends Mapper
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Our own Mapper, written by extending org.apache.hadoop.mapreduce.Mapper
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1); // the count emitted for every word
    private Text word = new Text();                            // the word being emitted

    /**
     * key:     byte offset of the current line
     * value:   the current line
     * context: the context in which the map method runs
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        StringTokenizer words = new StringTokenizer(value.toString());
        while (words.hasMoreTokens()) {
            word.set(words.nextToken());
            context.write(word, one);
        }
    }
}
2) Write a WordCountReducer class that extends Reducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Our own Reducer, written by extending org.apache.hadoop.mapreduce.Reducer
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * key:     the word being counted
     * values:  all the count markers emitted for this word
     * context: the context in which the reduce method runs
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable one : values) {
            count += one.get();
        }
        context.write(key, new IntWritable(count));
    }
}
3) Write WordCountDriver, the driver program that configures and submits the WordCount job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver program that configures and submits the WordCount job
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        // build a new job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Word Count");
        job.setJarByClass(WordCountDriver.class);

        // set the Mapper and Reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set the input and output directories
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job, wait for it to finish, and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
1) Preparation
Package the program into a jar: wordcount.jar
Have the Text 1-4 files ready.
2) Run
yarn jar wordcount.jar com.briup.WordCount input output
chinese.txt
a|李一|88
a|王二|26
a|張三|99
a|劉四|58
a|陳五|45
a|楊六|66
a|趙七|78
a|黃八|100
a|周九|62
a|吳十|12
english.txt
b|李一|36
b|王二|66
b|張三|86
b|劉四|56
b|陳五|43
b|楊六|86
b|趙七|99
b|黃八|80
b|周九|70
b|吳十|33
math.txt
c|李一|83
c|王二|36
c|張三|92
c|劉四|58
c|陳五|75
c|楊六|66
c|趙七|63
c|黃八|60
c|周九|62
c|吳十|72
Looking at chinese.txt, the data format is as follows:
a stands for the subject (here Chinese), 李一 is the student's name, and 88 is that student's Chinese score.
Based on the data above, we want to compute each student's scores in the following format:
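As a rough illustration (derived from the three files above and the reducer shown further below), 李一's output line would look something like:

李一    語文:88    英語:36    數學:83    總分:207.0    平均分:69.0

The order of the three subject columns depends on the order in which 李一's records reach the reducer.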
1) Write a parser class that parses each subject's records shown above
ScoreRecordParser
import org.apache.hadoop.io.Text;

public class ScoreRecordParser {

    private String id;
    private String name;
    private String score;

    public boolean parse(String line) {
        String[] strs = line.split("[|]");
        if (strs.length < 3) {
            return false;
        }
        id = strs[0].trim();
        name = strs[1].trim();
        score = strs[2].trim();
        return id.length() > 0 && name.length() > 0 && score.length() > 0;
    }

    public boolean parse(Text line) {
        return parse(line.toString());
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getScore() { return score; }
    public void setScore(String score) { this.score = score; }
}
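As a quick standalone sanity check (a throwaway snippet, not part of the job itself), the parser can be exercised like this:

public class ScoreRecordParserDemo {
    public static void main(String[] args) {
        ScoreRecordParser parser = new ScoreRecordParser();
        if (parser.parse("a|李一|88")) {
            // prints: a 李一 88
            System.out.println(parser.getId() + " " + parser.getName() + " " + parser.getScore());
        }
    }
}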
2) Implement the requirement
CalculateScore_0010
import com.briup.bd1702.hadoop.mapred.utils.ScoreRecordParser;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CalculateScore_0010 extends Configured implements Tool {

    private static ScoreRecordParser parser = new ScoreRecordParser();

    static class CalculateScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text key = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (parser.parse(value)) {
                // group the records by student name
                this.key.set(parser.getName());
                context.write(this.key, value);
            }
        }
    }

    static class CalculateScoreReducer extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer buffer = new StringBuffer();
            double sum = 0;
            for (Text text : values) {
                if (parser.parse(text)) {
                    String id = parser.getId();
                    String score = parser.getScore();
                    switch (id) {
                        case "a": {
                            buffer.append("語文:" + score + "\t");
                            break;
                        }
                        case "b": {
                            buffer.append("英語:" + score + "\t");
                            break;
                        }
                        case "c": {
                            buffer.append("數學:" + score + "\t");
                            break;
                        }
                    }
                    sum += Double.parseDouble(score);
                }
            }
            buffer.append("總分:" + sum + "\t平均分:" + (sum / 3));
            this.value.set(buffer.toString());
            context.write(key, this.value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(CalculateScoreMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(CalculateScoreReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CalculateScore_0010(), args));
    }
}
Because there are three input files here, we put them all in one directory and simply pass that directory to -Dinput.
Each of the three files above is very small, so each file still occupies its own block and is processed by its own map task, which means three map tasks handle the three files.
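For example (the jar name and paths below are only placeholders, and the class name may need its package prefix), the three files can be uploaded into a single HDFS directory and that directory passed via -Dinput:

hdfs dfs -mkdir scores
hdfs dfs -put chinese.txt english.txt math.txt scores
yarn jar score.jar CalculateScore_0010 -Dinput=scores -Doutput=score_out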
First, what is an inverted index?
Suppose we have four documents, file_1 through file_4, and for every word we need to find out how many times it appears in each document.
For example, the output format looks like this:
word  file_1:count,file_2:count,file_3:count,file_4:count
file_1
Welcome to MapReduce World
file_2
MapReduce is simple
file_3
MapReduce is powerful is simple
file_4
hello MapReduce Bye MapReduce
1) Requirement
Produce an output file in the following format:
word  file_1:count,file_2:count,file_3:count,file_4:count
2) Analysis
Let's walk through it with the word MapReduce.
On the map side, every word is emitted together with the name of the file it was read from (below, f1, f2, f3 and f4 stand for the file names).
After the shuffle, the reducer for a given word receives that word's list of file names, one entry per occurrence.
What does the reducer do with it? It builds a Map collection that records how many times each file name appears in the list, and from those counts it can finally produce the output file we want.
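Concretely, for the word MapReduce the data looks like this:

map output:     <MapReduce, f1>  <MapReduce, f2>  <MapReduce, f3>  <MapReduce, f4>  <MapReduce, f4>
reduce input:   <MapReduce, [f1, f2, f3, f4, f4]>
final output:   MapReduce    ,f1:1,f2:1,f3:1,f4:2

The value starts with a comma because the reducer below prefixes every file entry with one, and the order of the file entries depends on the HashMap's iteration order. The full implementation: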
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertIndex_0010 extends Configured implements Tool {

    static class InvertIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();
        private Text file = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // remember the name of the file this map task is reading
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            file.set(fileName);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer words = new StringTokenizer(value.toString(), " ");
            String item = null;
            while (words.hasMoreTokens()) {
                item = words.nextToken();
                if (item.trim().length() >= 1) {
                    word.set(item.trim());
                    // emit (word, file name)
                    context.write(word, file);
                }
            }
        }
    }

    static class InvertIndexReducer extends Reducer<Text, Text, Text, Text> {
        private Map<String, Integer> index = new HashMap<>();
        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            index.clear();
            // count how many times each file name occurs for this word
            for (Text file : values) {
                String fileName = file.toString();
                if (index.get(fileName) != null) {
                    index.put(fileName, index.get(fileName) + 1);
                } else {
                    index.put(fileName, 1);
                }
            }
            StringBuffer buffer = new StringBuffer();
            Set<Entry<String, Integer>> entries = index.entrySet();
            for (Entry<String, Integer> entry : entries) {
                buffer.append("," + entry.getKey().toString() + ":" + entry.getValue().toString());
            }
            this.value.set(buffer.toString());
            context.write(key, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(InvertIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(InvertIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new InvertIndex_0010(), args));
    }
}
Note:
A StringTokenizer is used here to split each line into words:
StringTokenizer words = new StringTokenizer(value.toString(), " ");
String item = null;
while (words.hasMoreTokens()) {
    item = words.nextToken();
    if (item.trim().length() >= 1) {
        word.set(item.trim());
        context.write(word, file);
    }
}
First, what do we mean by co-occurrence count?
Let's work through some user data to explain it:
joe, jon
joe , kia
joe, bob
joe ,ali
kia, joe
kia ,jim
kia, dee
dee ,kia
dee, ali
ali ,dee
ali, jim
ali ,bob
ali, joe
ali ,jon
jon, joe
jon ,ali
bob, joe
bob ,ali
bob, jim
jim ,kia
jim, bob
jim ,ali
In the data above, the left side of each line is a user and the right side is one of that user's friends.
From this data we can list every user's friend list:
ali,bob,jim,dee,jon,joe
bob,jim,ali,joe
dee,kia,ali
jim,ali,bob,kia
joe,ali,bob,kia,jon
jon,joe,ali
kia,dee,jim,joe
Next, within each user's friend list we pair the friends up two by two, and then count, across all the friend lists, how many times each such pair appears together.
For example:
bob,jim form one pair; we then go through the remaining friend lists (with each list's owner left out), i.e. the data below, and count how many times this pair shows up together.
dee,jon,joe
jim,ali,joe
kia,ali
ali,bob,kia
ali,bob,kia,jon
joe,ali
dee,jim,joe
Next comes the pair jim,dee, then dee,jon, and so on...
From the analysis above we can derive the expected result:
ali,bob 2
ali,jim 1
ali,joe 2
ali,jon 1
ali,kia 3
bob,dee 1
bob,jim 1
bob,joe 1
bob,jon 2
bob,kia 2
dee,jim 2
dee,joe 2
dee,jon 1
jim,joe 3
jim,jon 1
joe,jon 1
jon,kia 1
We can do this in two steps, i.e. with two MapReduce jobs: the first computes the friend lists, and the second forms every pair within each list and counts how many times each pair occurs.
1) Compute the friend lists
import com.briup.bd1702.hadoop.mapred.utils.FriendRecordParser;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FriendList_0010 extends Configured implements Tool {

    static class FriendListMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text userName = new Text();
        private Text friendName = new Text();
        private FriendRecordParser parser = new FriendRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            if (parser.isValid()) {
                userName.set(parser.getUserName());
                friendName.set(parser.getFriendName());
                context.write(userName, friendName);
                System.out.println("----" + userName + "----" + friendName + "----");
            }
        }
    }

    static class FriendListReducer extends Reducer<Text, Text, Text, Text> {
        private Text friendsNames = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // concatenate all of this user's friends into one comma-separated list
            StringBuffer buffer = new StringBuffer();
            for (Text name : values) {
                buffer.append("," + name);
            }
            System.out.println("++++" + buffer.toString() + "++++");
            friendsNames.set(buffer.toString());
            context.write(key, friendsNames);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(FriendListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(FriendListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FriendList_0010(), args));
    }
}
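FriendRecordParser is imported from com.briup.bd1702.hadoop.mapred.utils but its source is not listed in this post. A minimal sketch of what it might look like, modeled on ScoreRecordParser above and assuming each input line is "user, friend" separated by a comma:

import org.apache.hadoop.io.Text;

// hypothetical reconstruction -- the real class is not shown here
public class FriendRecordParser {

    private String userName;
    private String friendName;
    private boolean valid;

    public void parse(String line) {
        valid = false;
        String[] strs = line.split(",");
        if (strs.length == 2) {
            userName = strs[0].trim();
            friendName = strs[1].trim();
            valid = userName.length() > 0 && friendName.length() > 0;
        }
    }

    public void parse(Text line) {
        parse(line.toString());
    }

    public boolean isValid() { return valid; }

    public String getUserName() { return userName; }

    public String getFriendName() { return friendName; }
}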
2) Count the co-occurrences
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Cooccurence_0010 extends Configured implements Tool {

    static class CooccurenceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text key = new Text();
        private IntWritable value = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(",");
            // strs[0] is the list owner; strs[1..] are the friends, so pair up every two friends
            for (int i = 1; i < strs.length - 1; i++) {
                for (int j = i + 1; j < strs.length; j++) {
                    // put the two names in a fixed order so every pair always produces the same key
                    this.key.set(strs[i].compareTo(strs[j]) < 0
                            ? strs[i] + "," + strs[j]
                            : strs[j] + "," + strs[i]);
                    context.write(this.key, this.value);
                }
            }
        }
    }

    static class CooccurenceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path input = new Path(conf.get("input"));
        Path output = new Path(conf.get("output"));

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(CooccurenceMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(CooccurenceReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, input);
        TextOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Cooccurence_0010(), args));
    }
}
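The two jobs are chained by hand: the output directory of FriendList_0010 becomes the input of Cooccurence_0010. Assuming both classes are packed into a jar called friends.jar (the jar name and paths are only placeholders, and the class names may need their package prefix), a run looks roughly like:

yarn jar friends.jar FriendList_0010 -Dinput=friends.txt -Doutput=friend_list
yarn jar friends.jar Cooccurence_0010 -Dinput=friend_list -Doutput=cooccurrence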
If you liked this post, please give it a "recommend"!