【大數據分析經常使用算法】6.共同好友

簡介

給定一個包含上千萬用戶的社交網絡,咱們會實現一個MapReduce、Spark程序,在全部用戶對中找出「共同好友」。java

令 $$ {U_1,U_2,...,U_n} $$ 爲包含一個全部用戶列表的集合。咱們的目標是爲每一個$(U_i,U_j)$對$i\ne j$找出共同好友。redis

咱們本章提出3個解決方案:算法

  • MapReduce/Hadoop解決方案,使用基本數據類型apache

  • Spark解決方案,使用彈性數據集RDD。api

一、共同好友的概念

現在大多數社交網絡網站都提供了有關的服務,能夠幫助咱們與好友共享信息、圖片和視頻。數組

有些網站樹森之還提供了視頻聊天服務,幫助你與好友保持聯繫。根據定義,「好友」是指你認識、喜歡和信任的一我的。緩存

好比咱們QQ好友列表,這個列表上好友關係都是雙向的。若是我是你的好友,那麼你也是個人好友,注意這個特色,咱們的程序中運用了這個特色,將這種關係進行合併求解交集,便可獲得A,B的共同好友。網絡

有不少辦法能夠找到共同好友:app

  • 使用緩存策略,將共同好友保存在一個緩存中(redis、memcached)
  • 使用Mapreduce離線計算,每隔一段時間(例如一天)計算一次每一個人的共同好友並存儲這些結果;

一、POJO共同好友解決方案

令${A_1,A_2,...A_n}$是$user_1$的好友集合,${B_1,B_2,...,B_n}$是$user_2$的好友集合,那麼,$user_1,user_2$的共同好友集合就能夠定義爲 $$ A \cap B $$ 即兩個集合的交集。less

POJO的簡單實現以下所示:

package com.sunrun.movieshow.algorithm.friend;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class POJOFriend {
    public static Set<String> intersection(Set<String> A, Set<String> B){
        if(A == null || B == null){
            return null;
        }

        if(A.isEmpty() || B. isEmpty()){
            return null;
        }

        Set<String> result = new HashSet<>();
        result.addAll(A);
        result.retainAll(B);
        return result;
    }

    public static void main(String[] args) {
        Set<String> A = new HashSet<>();
        Set<String> B = new HashSet<>();

        A.add("A");
        A.add("B");
        A.add("C");

        B.add("B");
        B.add("C");
        B.add("D");

        System.out.println(intersection(A,B));
        /**
         * [B, C]
         */

    }
}

二、MapReduce解決方案

映射器接受一個$(k_1,v_1)$,其中$k_1$是一個用戶,$v_1$是這個用戶的好友列表。

映射器發出一組新的$(k_2,v_2)$,$k_2$是一個$Tuple2(k1,f_i)$,其中$f_i \in v_1$,即會迭代全部的好友列表和$k_1$進行兩兩組合。

歸約器的key是一個用戶對,value則是一個好友集合列表。reduce函數獲得全部好友集合的交集,從而找出$(u_i,u_j)$對的共同好友。

至於在Mapper過程當中的數據傳輸,會關聯到數組類型,咱們有兩個方案:

一、依然使用文本形式,在Driver節點進行解析;

二、若是你的信息須要解析爲非String,例如Long等,可使用ArrayListOfLongsWritable類。

他是一個實現了Hadoop串行化協議的類,咱們沒必要本身去實現這些接口,能夠直接從第三方組件裏面抽取使用

<dependency>
    <groupId>edu.umd</groupId>
    <artifactId>cloud9</artifactId>
    <version>1.3.2</version>
</dependency>

該組件包含了豐富的實現了hadoop序列化的協議提供使用,好比PairOfStrings以及ArrayListOfLongWritable等。

2.一、Mapper

package com.sunrun.movieshow.algorithm.friend.mapreduce;

import edu.umd.cloud9.io.pair.PairOfStrings;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FriendMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Text KEY = new Text();
    private static final Text VALUE = new Text();

    // 獲取朋友列表
    static String getFriends(String[] tokens) {
        // 不可能再有共同好友
        if (tokens.length == 2) {
            return "";
        }
        StringBuilder builder = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
            builder.append(tokens[i]);
            if (i < (tokens.length - 1)) {
                builder.append(",");
            }
        }
        return builder.toString();
    }


    // 使key有序,這裏的有序只的是key的兩個用戶id有序,和總體數據無關
    static String buildSortedKey(String user, String friend) {
        long p = Long.parseLong(user);
        long f = Long.parseLong(friend);
        if (p < f) {
            return user + "," + friend;
        } else {
            return friend + "," + user;
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokes = value.toString().split(" ");

        // user
        String user = tokes[0];

        // value
        VALUE.set(getFriends(tokes));

        // rescue keys
        for (int i = 1; i < tokes.length ; i++) {
            String otherU = tokes[i];
            KEY.set(buildSortedKey(user,otherU));
            context.write(KEY,VALUE);
        }
    }
}

2.二、Reducer

package com.sunrun.movieshow.algorithm.friend.mapreduce;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

public class FriendReducer extends Reducer<Text, Text,Text, Text> {

    static void addFriends(Map<String, Integer> map, String friendsList) {
        String[] friends = StringUtils.split(friendsList, ",");
        for (String friend : friends) {
            Integer count = map.get(friend);
            if (count == null) {
                map.put(friend, 1);
            } else {
                map.put(friend, ++count);
            }
        }
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Map<String, Integer> map = new HashMap<String, Integer>();
        Iterator<Text> iterator = values.iterator();
        int numOfValues = 0;
        while (iterator.hasNext()) {
            String friends = iterator.next().toString();
            if (friends.equals("")) {
                context.write(key, new Text("[]"));
                return;
            }
            addFriends(map, friends);
            numOfValues++;
        }

        // now iterate the map to see how many have numOfValues
        List<String> commonFriends = new ArrayList<String>();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            //System.out.println(entry.getKey() + "/" + entry.getValue());
            if (entry.getValue() == numOfValues) {
                commonFriends.add(entry.getKey());
            }
        }

        // sen it to output
        context.write(key, new Text(commonFriends.toString()));
    }
}

三、Spark解決方案

package com.sunrun.movieshow.algorithm.friend.spark;

import avro.shaded.com.google.common.collect.ImmutableCollection;
import com.google.common.collect.Sets;
import com.sunrun.movieshow.algorithm.common.SparkHelper;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import shapeless.Tuple;

import java.util.*;

/**
 * 共同好友的Spark解決方案
 * 輸入:
 * [root@h24 ~]# hadoop fs -cat /friend/input/*
 * A B C D E
 * B A C D
 * C A B D E
 * D A B C
 * E A C
 * F A
 * 即第一列表明當前用戶,後面的數據表明其好友列表。
 *
 * 輸出
 * (A,B) => C,D
 *  兩個好友之間的公共好友。
 */
public class FriendSpark {
    // 構建有序key,避免重複
    static Tuple2<String,String> buildSortedKey(String u1, String u2){
        if(u1.compareTo(u2) < 0){
            return new Tuple2<>(u1,u2);
        }else{
            return new Tuple2<>(u2,u1);
        }
    }

    public static void main(String[] args) {
        // 1.讀取配置文件
        String hdfsUrl = "hdfs://10.21.1.24:9000/friend/";
        JavaSparkContext sc = SparkHelper.getSparkContext("CommonFriend");
        JavaRDD<String> rdd = sc.textFile(hdfsUrl + "input");

        // 2.解析內容
        /**
         * A B C
         * ((A,B),(B,C))
         * ((A,C),(B,C))
         */
        JavaPairRDD<Tuple2<String, String>, List<String>> pairs = rdd.flatMapToPair(line -> {
            String[] tokes = line.split(" ");

            // 當前處理的用戶
            String user = tokes[0];

            // 該用戶的好友列表
            List<String> friends = new ArrayList<>();
            for (int i = 1; i < tokes.length; i++) {
                friends.add(tokes[i]);
            }

            List<Tuple2<Tuple2<String, String>, List<String>>> result = new ArrayList<>();
            // 算法處理,注意順序,依次抽取每個好友和當前用戶配對做爲key,好友列表做爲value輸出,
            // 但若是該用戶只有一個好友的話,那麼他們的共同好友應該設置爲空集
            if (friends.size() == 1) {
                result.add(new Tuple2<>(buildSortedKey(user, friends.get(0)), new ArrayList<>()));
            } else {
                for (String friend : friends) {
                    Tuple2<String, String> K = buildSortedKey(user, friend);
                    result.add(new Tuple2<>(K, friends));
                }
            }

            return result.iterator();
        });

        /**
         *  pairs.saveAsTextFile(hdfsUrl + "output1");
         * ((A,B),[B, C, D, E])
         * ((A,C),[B, C, D, E])
         * ((A,D),[B, C, D, E])
         * ((A,E),[B, C, D, E])
         * ((A,B),[A, C, D])
         * ((B,C),[A, C, D])
         * ((B,D),[A, C, D])
         * ((A,C),[A, B, D, E])
         * ((B,C),[A, B, D, E])
         * ((C,D),[A, B, D, E])
         * ((C,E),[A, B, D, E])
         * ((A,D),[A, B, C])
         * ((B,D),[A, B, C])
         * ((C,D),[A, B, C])
         * ((A,E),[A, C])
         * ((C,E),[A, C])
         * ((A,F),[])
         */

        // 3.直接計算共同好友,步驟是group以及reduce的合併過程。
        JavaPairRDD<Tuple2<String, String>, List<String>> commonFriends = pairs.reduceByKey((a, b) -> {
            List<String> intersection = new ArrayList<>();
            for (String item : b) {
                if (a.contains(item)) {
                    intersection.add(item);
                }
            }
            return intersection;
        });

        commonFriends.saveAsTextFile(hdfsUrl + "commonFriend");
        /**
         * [root@h24 ~]# hadoop fs -cat /friend/commonFriend/p*
         * ((A,E),[C])
         * ((C,D),[A, B])
         * ((A,D),[B, C])
         * ((C,E),[A])
         * ((A,F),[])
         * ((A,B),[C, D])
         * ((B,C),[A, D])
         * ((A,C),[B, D, E])
         * ((B,D),[A, C])
         */
    }
}
相關文章
相關標籤/搜索