測試數據:前面表明QQ用戶,:後面表明用戶QQ好友
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
1.先根據用戶QQ好友做爲key,用戶做爲value來輸出:
好比第一行:B->A C->A D->A F->A E->A O->A
第二行:A->B C->B E->B K->B
而後根據相同的key使用reduce來聚合
好比輸出:B->A B->C B->D....
reduce會根據相同的key分發給相應的reduce task來執行
結果輸出爲:B->A C D....
再寫代碼組合輸出:
A-C B
A-D B
C-D B
.....
代碼實現
package com.xuyu.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Step 1 of the "common friends" job.
 *
 * Input lines look like {@code A:B,C,D,F,E,O} — user A has friends B..O.
 * The mapper inverts each line to (friend, user) pairs; the reducer then
 * receives, for each friend F, every user who has F in their list, and
 * emits every unordered user pair with F as the common friend, e.g.
 * {@code A-B  F}.
 */
public class CommonFriendsOne {

    /**
     * Emits (friend, user) for every friend of a user.
     * Example: input {@code A:B,C,D} produces B->A, C->A, D->A.
     */
    public static class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused output buffers — avoids allocating a Text per record.
        private final Text k = new Text();
        private final Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            int sep = line.indexOf(':');
            // Guard against blank/malformed lines: the original indexed
            // split(":")[1] unconditionally, which throws
            // ArrayIndexOutOfBoundsException and fails the whole task.
            if (sep <= 0 || sep == line.length() - 1) {
                return;
            }
            String user = line.substring(0, sep);
            String[] friends = line.substring(sep + 1).split(",");
            v.set(user);
            for (String friend : friends) {
                if (friend.isEmpty()) {
                    continue; // tolerate stray commas like "A:B,,C"
                }
                k.set(friend);
                // key = friend, value = user who has that friend
                context.write(k, v);
            }
        }
    }

    /**
     * For one friend, collects all users that listed them, then emits every
     * unordered user pair keyed as "smaller-larger" with the friend as value.
     * Sorting first guarantees A-B and B-A collapse to the same key.
     */
    public static class CommonFriendsOneReduce extends Reducer<Text, Text, Text, Text> {

        // Reused output buffer, consistent with the mapper's style.
        private final Text pair = new Text();

        @Override
        protected void reduce(Text friend, Iterable<Text> users, Context context)
                throws IOException, InterruptedException {
            List<String> userList = new ArrayList<>();
            for (Text user : users) {
                // Must copy: Hadoop reuses the Text instance across the iterator.
                userList.add(user.toString());
            }
            // Canonical ordering so each pair is always written as "X-Y" with X < Y.
            Collections.sort(userList);
            for (int i = 0; i < userList.size() - 1; i++) {
                for (int j = i + 1; j < userList.size(); j++) {
                    pair.set(userList.get(i) + "-" + userList.get(j));
                    context.write(pair, friend);
                }
            }
        }
    }

    /**
     * Configures and submits the job. Input/output paths may optionally be
     * supplied as {@code args[0]}/{@code args[1]}; the original hard-coded
     * local paths remain the defaults, so existing invocations still work.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Lets Hadoop locate the jar containing this class at runtime.
        job.setJarByClass(CommonFriendsOne.class);

        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsOneReduce.class);

        // Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        String input = args.length > 0 ? args[0] : "F:\\mrdata\\friends\\input";
        String output = args.length > 1 ? args[1] : "F:\\mrdata\\friends\\output";
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}
結果輸出
part-r-00000文件
B-C A B-D A B-F A B-G A B-H A B-I A B-K A B-O A C-D A C-F A C-G A C-H A C-I A C-K A C-O A D-F A D-G A D-H A D-I A D-K A D-O A F-G A F-H A F-I A F-K A F-O A G-H A G-I A G-K A G-O A H-I A H-K A H-O A I-K A I-O A K-O A A-E B A-F B A-J B E-F B E-J B F-J B A-B C A-E C A-F C A-G C A-H C A-K C B-E C B-F C B-G C B-H C B-K C E-F C E-G C E-H C E-K C F-G C F-H C F-K C G-H C G-K C H-K C A-C D A-E D A-F D A-G D A-H D A-K D A-L D C-E D C-F D C-G D C-H D C-K D C-L D E-F D E-G D E-H D E-K D E-L D F-G D F-H D F-K D F-L D G-H D G-K D G-L D H-K D H-L D K-L D A-B E A-D E A-F E A-G E A-H E A-L E A-M E B-D E B-F E B-G E B-H E B-L E B-M E D-F E D-G E D-H E D-L E D-M E F-G E F-H E F-L E F-M E G-H E G-L E G-M E H-L E H-M E L-M E A-C F A-D F A-G F A-L F A-M F C-D F C-G F C-L F C-M F D-G F D-L F D-M F G-L F G-M F L-M F C-O I D-E L E-F M A-F O A-H O A-I O A-J O F-H O F-I O F-J O H-I O H-J O I-J O
輸出結果應該爲這種:第一行表示:A和B用戶都有C和E這兩個共同好友
A-B [C, E] A-C [D, F] A-D [E, F] A-E [B, C, D] A-F [B, C, D, E, O] A-G [C, D, E, F]
代碼實現
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; public class CommonFriendsOne2 { public static class CommonFriendsMapper extends Mapper<LongWritable,Text,Text,Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] friendsAndUser = value.toString().split("\t"); context.write(new Text(friendsAndUser[0]),new Text(friendsAndUser[1])); } } public static class CommonFriendsOneReduce extends Reducer<Text,Text,Text,Text>{ @Override protected void reduce(Text friends, Iterable<Text> users, Context context) throws IOException, InterruptedException { ArrayList<String> userList=new ArrayList<String>(); for (Text user:users){ userList.add(user.toString()); } //排序 Collections.sort(userList); context.write(friends,new Text(userList.toString())); } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //動態獲取jar包在哪裏 job.setJarByClass(CommonFriendsOne2.class); //2.封裝參數:本次job所要調用的mapper實現類 job.setMapperClass(CommonFriendsMapper.class); job.setReducerClass(CommonFriendsOneReduce.class); //3.封裝參數:本次job的Mapper實現類產生的數據key,value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //4.封裝參數:本次Reduce返回的key,value數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\friends\\output")); FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\friends\\output2")); boolean res = 
job.waitForCompletion(true); System.exit(res ? 0:-1); } }
統計輸出結果爲
A-B [C, E] A-C [D, F] A-D [E, F] A-E [B, C, D] A-F [B, C, D, E, O] A-G [C, D, E, F] A-H [C, D, E, O] A-I [O] A-J [B, O] A-K [C, D] A-L [D, E, F] A-M [E, F] B-C [A] B-D [A, E] B-E [C] B-F [A, C, E] B-G [A, C, E] B-H [A, C, E] B-I [A] B-K [A, C] B-L [E] B-M [E] B-O [A] C-D [A, F] C-E [D] C-F [A, D] C-G [A, D, F] C-H [A, D] C-I [A] C-K [A, D] C-L [D, F] C-M [F] C-O [A, I] D-E [L] D-F [A, E] D-G [A, E, F] D-H [A, E] D-I [A] D-K [A] D-L [E, F] D-M [E, F] D-O [A] E-F [B, C, D, M] E-G [C, D] E-H [C, D] E-J [B] E-K [C, D] E-L [D] F-G [A, C, D, E] F-H [A, C, D, E, O] F-I [A, O] F-J [B, O] F-K [A, C, D] F-L [D, E] F-M [E] F-O [A] G-H [A, C, D, E] G-I [A] G-K [A, C, D] G-L [D, E, F] G-M [E, F] G-O [A] H-I [A, O] H-J [O] H-K [A, C, D] H-L [D, E] H-M [E] H-O [A] I-J [O] I-K [A] I-O [A] K-L [D] K-O [A] L-M [E, F]