給定一個包含上千萬用戶的社交網絡,咱們會實現一個MapReduce、Spark程序,在全部用戶對中找出「共同好友」。java
令 $$ {U_1,U_2,...,U_n} $$ 爲包含一個全部用戶列表的集合。咱們的目標是爲每一個$(U_i,U_j)$對$i\ne j$找出共同好友。redis
咱們本章提出3個解決方案:算法
MapReduce/Hadoop解決方案,使用基本數據類型apache
Spark解決方案,使用彈性數據集RDD。api
現在大多數社交網絡網站都提供了有關的服務,能夠幫助咱們與好友共享信息、圖片和視頻。數組
有些網站樹森之還提供了視頻聊天服務,幫助你與好友保持聯繫。根據定義,「好友」是指你認識、喜歡和信任的一我的。緩存
好比咱們QQ好友列表,這個列表上好友關係都是雙向的。若是我是你的好友,那麼你也是個人好友,注意這個特色,咱們的程序中運用了這個特色,將這種關係進行合併求解交集,便可獲得A,B的共同好友。網絡
有不少辦法能夠找到共同好友:app
令${A_1,A_2,...A_n}$是$user_1$的好友集合,${B_1,B_2,...,B_n}$是$user_2$的好友集合,那麼,$user_1,user_2$的共同好友集合就能夠定義爲 $$ A \cap B $$ 即兩個集合的交集。less
POJO的簡單實現以下所示:
package com.sunrun.movieshow.algorithm.friend; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; public class POJOFriend { public static Set<String> intersection(Set<String> A, Set<String> B){ if(A == null || B == null){ return null; } if(A.isEmpty() || B. isEmpty()){ return null; } Set<String> result = new HashSet<>(); result.addAll(A); result.retainAll(B); return result; } public static void main(String[] args) { Set<String> A = new HashSet<>(); Set<String> B = new HashSet<>(); A.add("A"); A.add("B"); A.add("C"); B.add("B"); B.add("C"); B.add("D"); System.out.println(intersection(A,B)); /** * [B, C] */ } }
映射器接受一個$(k_1,v_1)$,其中$k_1$是一個用戶,$v_1$是這個用戶的好友列表。
映射器發出一組新的$(k_2,v_2)$,$k_2$是一個$Tuple2(k1,f_i)$,其中$f_i \in v_1$,即會迭代全部的好友列表和$k_1$進行兩兩組合。
歸約器的key是一個用戶對,value則是一個好友集合列表。reduce函數獲得全部好友集合的交集,從而找出$(u_i,u_j)$對的共同好友。
至於在Mapper過程當中的數據傳輸,會關聯到數組類型,咱們有兩個方案:
一、依然使用文本形式,在Driver節點進行解析;
二、若是你的信息須要解析爲非String,例如Long等,可使用ArrayListOfLongsWritable類。
他是一個實現了Hadoop串行化協議的類,咱們沒必要本身去實現這些接口,能夠直接從第三方組件裏面抽取使用
<dependency> <groupId>edu.umd</groupId> <artifactId>cloud9</artifactId> <version>1.3.2</version> </dependency>
該組件包含了豐富的實現了hadoop序列化的協議提供使用,好比PairOfStrings
以及ArrayListOfLongWritable
等。
package com.sunrun.movieshow.algorithm.friend.mapreduce; import edu.umd.cloud9.io.pair.PairOfStrings; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class FriendMapper extends Mapper<LongWritable, Text, Text, Text> { private static final Text KEY = new Text(); private static final Text VALUE = new Text(); // 獲取朋友列表 static String getFriends(String[] tokens) { // 不可能再有共同好友 if (tokens.length == 2) { return ""; } StringBuilder builder = new StringBuilder(); for (int i = 1; i < tokens.length; i++) { builder.append(tokens[i]); if (i < (tokens.length - 1)) { builder.append(","); } } return builder.toString(); } // 使key有序,這裏的有序只的是key的兩個用戶id有序,和總體數據無關 static String buildSortedKey(String user, String friend) { long p = Long.parseLong(user); long f = Long.parseLong(friend); if (p < f) { return user + "," + friend; } else { return friend + "," + user; } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokes = value.toString().split(" "); // user String user = tokes[0]; // value VALUE.set(getFriends(tokes)); // rescue keys for (int i = 1; i < tokes.length ; i++) { String otherU = tokes[i]; KEY.set(buildSortedKey(user,otherU)); context.write(KEY,VALUE); } } }
package com.sunrun.movieshow.algorithm.friend.mapreduce; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.*; public class FriendReducer extends Reducer<Text, Text,Text, Text> { static void addFriends(Map<String, Integer> map, String friendsList) { String[] friends = StringUtils.split(friendsList, ","); for (String friend : friends) { Integer count = map.get(friend); if (count == null) { map.put(friend, 1); } else { map.put(friend, ++count); } } } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String, Integer> map = new HashMap<String, Integer>(); Iterator<Text> iterator = values.iterator(); int numOfValues = 0; while (iterator.hasNext()) { String friends = iterator.next().toString(); if (friends.equals("")) { context.write(key, new Text("[]")); return; } addFriends(map, friends); numOfValues++; } // now iterate the map to see how many have numOfValues List<String> commonFriends = new ArrayList<String>(); for (Map.Entry<String, Integer> entry : map.entrySet()) { //System.out.println(entry.getKey() + "/" + entry.getValue()); if (entry.getValue() == numOfValues) { commonFriends.add(entry.getKey()); } } // sen it to output context.write(key, new Text(commonFriends.toString())); } }
package com.sunrun.movieshow.algorithm.friend.spark; import avro.shaded.com.google.common.collect.ImmutableCollection; import com.google.common.collect.Sets; import com.sunrun.movieshow.algorithm.common.SparkHelper; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import shapeless.Tuple; import java.util.*; /** * 共同好友的Spark解決方案 * 輸入: * [root@h24 ~]# hadoop fs -cat /friend/input/* * A B C D E * B A C D * C A B D E * D A B C * E A C * F A * 即第一列表明當前用戶,後面的數據表明其好友列表。 * * 輸出 * (A,B) => C,D * 兩個好友之間的公共好友。 */ public class FriendSpark { // 構建有序key,避免重複 static Tuple2<String,String> buildSortedKey(String u1, String u2){ if(u1.compareTo(u2) < 0){ return new Tuple2<>(u1,u2); }else{ return new Tuple2<>(u2,u1); } } public static void main(String[] args) { // 1.讀取配置文件 String hdfsUrl = "hdfs://10.21.1.24:9000/friend/"; JavaSparkContext sc = SparkHelper.getSparkContext("CommonFriend"); JavaRDD<String> rdd = sc.textFile(hdfsUrl + "input"); // 2.解析內容 /** * A B C * ((A,B),(B,C)) * ((A,C),(B,C)) */ JavaPairRDD<Tuple2<String, String>, List<String>> pairs = rdd.flatMapToPair(line -> { String[] tokes = line.split(" "); // 當前處理的用戶 String user = tokes[0]; // 該用戶的好友列表 List<String> friends = new ArrayList<>(); for (int i = 1; i < tokes.length; i++) { friends.add(tokes[i]); } List<Tuple2<Tuple2<String, String>, List<String>>> result = new ArrayList<>(); // 算法處理,注意順序,依次抽取每個好友和當前用戶配對做爲key,好友列表做爲value輸出, // 但若是該用戶只有一個好友的話,那麼他們的共同好友應該設置爲空集 if (friends.size() == 1) { result.add(new Tuple2<>(buildSortedKey(user, friends.get(0)), new ArrayList<>())); } else { for (String friend : friends) { Tuple2<String, String> K = buildSortedKey(user, friend); result.add(new Tuple2<>(K, friends)); } } return result.iterator(); }); /** * pairs.saveAsTextFile(hdfsUrl + "output1"); * ((A,B),[B, C, D, E]) * ((A,C),[B, C, D, E]) * ((A,D),[B, C, D, E]) * ((A,E),[B, C, D, E]) * ((A,B),[A, C, D]) * ((B,C),[A, C, D]) * ((B,D),[A, C, D]) * ((A,C),[A, B, D, E]) * ((B,C),[A, B, D, E]) * ((C,D),[A, B, D, E]) * ((C,E),[A, B, D, E]) * ((A,D),[A, B, C]) * ((B,D),[A, B, C]) * ((C,D),[A, B, C]) * ((A,E),[A, C]) * ((C,E),[A, C]) * ((A,F),[]) */ // 3.直接計算共同好友,步驟是group以及reduce的合併過程。 JavaPairRDD<Tuple2<String, String>, List<String>> commonFriends = pairs.reduceByKey((a, b) -> { List<String> intersection = new ArrayList<>(); for (String item : b) { if (a.contains(item)) { intersection.add(item); } } return intersection; }); commonFriends.saveAsTextFile(hdfsUrl + "commonFriend"); /** * [root@h24 ~]# hadoop fs -cat /friend/commonFriend/p* * ((A,E),[C]) * ((C,D),[A, B]) * ((A,D),[B, C]) * ((C,E),[A]) * ((A,F),[]) * ((A,B),[C, D]) * ((B,C),[A, D]) * ((A,C),[B, D, E]) * ((B,D),[A, C]) */ } }