【大數據分析經常使用算法】2.電影推薦

簡介

基於電影的評分的推薦系統,本章節使用三個類似度算法,求解最佳電影的推薦。java

一、測試數據集

user1	movie1	1
user1	movie2	2
user1	movie3	3
user2	movie1	1
user2	movie2	3
user2	movie2	3
user2	movie5	3

每一條記錄表明用戶對某個電影的評分,採用5分制度。算法

二、Spark實現

public Map<Tuple2<String, String>, Tuple3<Double, Double, Double>> startCompute(String inputFile) throws Exception {
        HashMap<String,Object> returnData = new HashMap<>();
        // handler file
        String sparkFile = CommonUtils.handlerFile(inputFile);
        // 3.Get input file DataSet.
        JavaRDD<String> firstRDD = sc.textFile(inputFile);
        System.out.println("====== debug1: line: K=V=line ======");
        firstRDD.foreach(line -> System.out.println("debug 1: " + line));

        // 4. 找出誰對電影進行了評論:Tuple(movie,Tuple(user,score))
        JavaPairRDD<String, Tuple2<String, Integer>> moviesRDD = firstRDD.mapToPair(
                line -> {
                    // 解析列
                    String[] tokens = line.split("\t");
                    String user = tokens[0];
                    String movie = tokens[1];
                    Integer score = new Integer(tokens[2]);
                    // 返回Tuple(movie,Tuple(user,score))
                    return new Tuple2<String, Tuple2<String, Integer>>(movie, new Tuple2<>(user, score));
                }
        );
        System.out.println("====== debug2: moviesRDD: K = <movie>, V = Tuple2<user,score> ======");
        moviesRDD.foreach( debug2 -> {
            System.out.println("debug2: key = " + debug2._1 + "\t value = " + debug2._2);
        });

        // 5.按movie對movieRDD進行分組:Tuple2<movie, Iterate<Tuple<user, score>>>
        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> moviesGroupRDD = moviesRDD.groupByKey();
        System.out.println("===== debug3: moviesGroupRDD: K = <movie>, V = Iterable<Tuple2<user,score>> ======");
        moviesGroupRDD.foreach(debug3 -> {
            System.out.println("debug2: key = " + debug3._1 + "\t" + debug3._2);
        });

        // 6.找出每一個電影的評分數量(即評論人數):Tuple2<user, Tuple3<movie,score, numberOfScorer>>
        JavaPairRDD<String, Tuple3<String, Integer, Integer>> usersRDD = moviesGroupRDD.flatMapToPair(mg -> {
            // 當前處理的電影
            String movie = mg._1;
            // 統計評分人數
            int countScores = 0;
            // 從同一個電影對應的多個組中收集全部用戶-評分信息(user,score)
            List<Tuple2<String, Integer>> listUserAndScores = new ArrayList<>();
            // (user,score)
            for (Tuple2<String, Integer> userAndScore : mg._2) {
                countScores++;
                // put to list.
                listUserAndScores.add(userAndScore);
            }
            // 返回結果 List(<user,<movie,score,numberOfScores>>)
            List<Tuple2<String, Tuple3<String, Integer, Integer>>> result = new ArrayList<>();
            for (Tuple2<String, Integer> listUserAndScore : listUserAndScores) {
                String user = listUserAndScore._1;
                Integer score = listUserAndScore._2;
                // 組合爲T3(movie,score,numberOfScores)
                Tuple3<String, Integer, Integer> t3 = new Tuple3<>(movie, score, countScores);
                // 寫入結果
                result.add(new Tuple2<>(user, t3));
            }
            // iterator扁平化
            return result.iterator();
        });
        System.out.println("===== debug4: usersRDD: K = <user>, V = Tuple3<movie,score,numberOfScores> ======");
        usersRDD.foreach(u -> {
            System.out.println("debug4: key = " + u._1 + "\t" + "value = " + u._2);
        });

        // 7.進行自鏈接:以user爲鍵
        JavaPairRDD<String, Tuple2<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>> joinRDD = usersRDD.join(usersRDD);
        System.out.println("===== debug5: joinRDD: K = <user>, V = Tuple2<Tuple3<movie,score,numberOfScores>,Tuple3<movie,score,numberOfScores>> ======");
        joinRDD.foreach(join -> {
            System.out.println("debug5: key = " + join._1 + "\t" + "value = " + join._2);
        });

        // 8.刪除重複的(movie1,movie2)對,確保對於任意的鍵值對,都有movie1 < movie2
        JavaPairRDD<String, Tuple2<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>> filterJoinRDD = joinRDD.filter(j -> {
            // (movie, score, numberOfScore)
            Tuple3<String, Integer, Integer> v1 = j._2._1;
            Tuple3<String, Integer, Integer> v2 = j._2._2;
            // get movie name
            String movieName1 = v1._1();
            String movieName2 = v2._1();
            if (movieName1.compareTo(movieName2) < 0) {
                return true;
            } else {
                return false;
            }
        });
        System.out.println("===== debug6: filterJoinRDD: K = <user>, V = Tuple2<Tuple3<movie,score,numberOfScores>,Tuple3<movie,score,numberOfScores>> ======");
        filterJoinRDD.foreach(filterJoin -> {
            System.out.println("debug6: key = " + filterJoin._1 + "\t" + "value = " + filterJoin._2);
        });

        // 9.生成全部的(movie1,movie2)組合
        JavaPairRDD<
                Tuple2<String, String>,
                Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>> moviePairsRDD = filterJoinRDD.mapToPair(fj -> {
            // key = user去掉
            // String user = fj._1;

            // 得到兩個電影的信息: Tuple(movie,score,numberOfScores)
            Tuple3<String, Integer, Integer> m1 = fj._2._1;
            Tuple3<String, Integer, Integer> m2 = fj._2._2;

            // 組合兩個電影的名字,做爲返回的鍵
            Tuple2<String, String> m1m2Key = new Tuple2<>(m1._1(), m2._1());

            // 獲取兩個電影的分數計算係數,做爲返回的V
            Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer> value = new Tuple7<>(
                    m1._2(), //m1 score
                    m1._3(), //m1 hot(number of m1 scores)
                    m2._2(), // m2 score
                    m2._3(), // m2 hot(number of m2 scores)
                    m1._2() * m2._2(), // m1.score * m2.score
                    m1._2() * m1._2(), // square of m1.score
                    m2._2() * m2._2() // square of m2.score
            );
            return new Tuple2<>(m1m2Key, value);
        });
        System.out.println("===== debug7: moviePairsRDD: K = Tuple<m1,m2>, V = <m1_score,number_of_m1_scores" +
                ",m2_score,number_of_m2_scores,m1m2Product, square(m1_score), square(m2_score)> ======");
        moviePairsRDD.foreach(moviePair -> {
            System.out.println("debug7: key = " + moviePair._1 + "\t" + "value = " + moviePair._2);
        });

        // 10.電影對分組: (movie1,movie2) -> list(<m1_score,number_score,....>)
        JavaPairRDD<Tuple2<String, String>,
                Iterable<Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>> moviePairGroupRDD = moviePairsRDD.groupByKey();
        System.out.println("===== debug8: moviePairs: K = Tuple<m1,m2>, V = <m1_score,number_of_m1_scores" +
                "                ,m2_score,number_of_m2_scores,m1m2Product, square(m1_score), square(m2_score)> ====== ======");
        moviePairGroupRDD.foreach(moviePairGroup -> {
            System.out.println("debug8: key = " + moviePairGroup._1 + "\t" + "value = " + moviePairGroup._2);
        });

        // end.計算類似度數據
        JavaPairRDD<Tuple2<String, String>, Tuple3<Double, Double, Double>> correlations = moviePairGroupRDD.mapValues(value -> {
            return CalculateSimilarity.calculateCorRelation(value);
        });
        System.out.println("====== Movie Similarity result is:(pearson,cosine,jaccard) ======");
        correlations.mapToPair(c ->{
            String key = "(" + c._1._1 + "," +  c._1._2 + ")";
            return new Tuple2<>(key, c._2);
        }).sortByKey().foreach(r -> {
            System.out.println(r._1  + "\t => \t" + r._2);
        });
        return correlations.collectAsMap();
    }
相關文章
相關標籤/搜索