基於電影評分的推薦系統:本章節使用三種相似度算法(皮爾遜相關係數、餘弦相似度、Jaccard 相似度)計算電影之間的相似度,從而求解最佳的電影推薦。
user1 movie1 1
user1 movie2 2
user1 movie3 3
user2 movie1 1
user2 movie2 3
user2 movie2 3
user2 movie5 3
每一條記錄代表一個用戶對某部電影的評分,欄位依序為「用戶、電影、評分」(實際輸入檔中以 Tab 分隔),評分採用 5 分制。

算法:
public Map<Tuple2<String, String>, Tuple3<Double, Double, Double>> startCompute(String inputFile) throws Exception { HashMap<String,Object> returnData = new HashMap<>(); // handler file String sparkFile = CommonUtils.handlerFile(inputFile); // 3.Get input file DataSet. JavaRDD<String> firstRDD = sc.textFile(inputFile); System.out.println("====== debug1: line: K=V=line ======"); firstRDD.foreach(line -> System.out.println("debug 1: " + line)); // 4. 找出誰對電影進行了評論:Tuple(movie,Tuple(user,score)) JavaPairRDD<String, Tuple2<String, Integer>> moviesRDD = firstRDD.mapToPair( line -> { // 解析列 String[] tokens = line.split("\t"); String user = tokens[0]; String movie = tokens[1]; Integer score = new Integer(tokens[2]); // 返回Tuple(movie,Tuple(user,score)) return new Tuple2<String, Tuple2<String, Integer>>(movie, new Tuple2<>(user, score)); } ); System.out.println("====== debug2: moviesRDD: K = <movie>, V = Tuple2<user,score> ======"); moviesRDD.foreach( debug2 -> { System.out.println("debug2: key = " + debug2._1 + "\t value = " + debug2._2); }); // 5.按movie對movieRDD進行分組:Tuple2<movie, Iterate<Tuple<user, score>>> JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> moviesGroupRDD = moviesRDD.groupByKey(); System.out.println("===== debug3: moviesGroupRDD: K = <movie>, V = Iterable<Tuple2<user,score>> ======"); moviesGroupRDD.foreach(debug3 -> { System.out.println("debug2: key = " + debug3._1 + "\t" + debug3._2); }); // 6.找出每一個電影的評分數量(即評論人數):Tuple2<user, Tuple3<movie,score, numberOfScorer>> JavaPairRDD<String, Tuple3<String, Integer, Integer>> usersRDD = moviesGroupRDD.flatMapToPair(mg -> { // 當前處理的電影 String movie = mg._1; // 統計評分人數 int countScores = 0; // 從同一個電影對應的多個組中收集全部用戶-評分信息(user,score) List<Tuple2<String, Integer>> listUserAndScores = new ArrayList<>(); // (user,score) for (Tuple2<String, Integer> userAndScore : mg._2) { countScores++; // put to list. 
listUserAndScores.add(userAndScore); } // 返回結果 List(<user,<movie,score,numberOfScores>>) List<Tuple2<String, Tuple3<String, Integer, Integer>>> result = new ArrayList<>(); for (Tuple2<String, Integer> listUserAndScore : listUserAndScores) { String user = listUserAndScore._1; Integer score = listUserAndScore._2; // 組合爲T3(movie,score,numberOfScores) Tuple3<String, Integer, Integer> t3 = new Tuple3<>(movie, score, countScores); // 寫入結果 result.add(new Tuple2<>(user, t3)); } // iterator扁平化 return result.iterator(); }); System.out.println("===== debug4: usersRDD: K = <user>, V = Tuple3<movie,score,numberOfScores> ======"); usersRDD.foreach(u -> { System.out.println("debug4: key = " + u._1 + "\t" + "value = " + u._2); }); // 7.進行自鏈接:以user爲鍵 JavaPairRDD<String, Tuple2<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>> joinRDD = usersRDD.join(usersRDD); System.out.println("===== debug5: joinRDD: K = <user>, V = Tuple2<Tuple3<movie,score,numberOfScores>,Tuple3<movie,score,numberOfScores>> ======"); joinRDD.foreach(join -> { System.out.println("debug5: key = " + join._1 + "\t" + "value = " + join._2); }); // 8.刪除重複的(movie1,movie2)對,確保對於任意的鍵值對,都有movie1 < movie2 JavaPairRDD<String, Tuple2<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>> filterJoinRDD = joinRDD.filter(j -> { // (movie, score, numberOfScore) Tuple3<String, Integer, Integer> v1 = j._2._1; Tuple3<String, Integer, Integer> v2 = j._2._2; // get movie name String movieName1 = v1._1(); String movieName2 = v2._1(); if (movieName1.compareTo(movieName2) < 0) { return true; } else { return false; } }); System.out.println("===== debug6: filterJoinRDD: K = <user>, V = Tuple2<Tuple3<movie,score,numberOfScores>,Tuple3<movie,score,numberOfScores>> ======"); filterJoinRDD.foreach(filterJoin -> { System.out.println("debug6: key = " + filterJoin._1 + "\t" + "value = " + filterJoin._2); }); // 9.生成全部的(movie1,movie2)組合 JavaPairRDD< Tuple2<String, String>, Tuple7<Integer, Integer, Integer, 
Integer, Integer, Integer, Integer>> moviePairsRDD = filterJoinRDD.mapToPair(fj -> { // key = user去掉 // String user = fj._1; // 得到兩個電影的信息: Tuple(movie,score,numberOfScores) Tuple3<String, Integer, Integer> m1 = fj._2._1; Tuple3<String, Integer, Integer> m2 = fj._2._2; // 組合兩個電影的名字,做爲返回的鍵 Tuple2<String, String> m1m2Key = new Tuple2<>(m1._1(), m2._1()); // 獲取兩個電影的分數計算係數,做爲返回的V Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer> value = new Tuple7<>( m1._2(), //m1 score m1._3(), //m1 hot(number of m1 scores) m2._2(), // m2 score m2._3(), // m2 hot(number of m2 scores) m1._2() * m2._2(), // m1.score * m2.score m1._2() * m1._2(), // square of m1.score m2._2() * m2._2() // square of m2.score ); return new Tuple2<>(m1m2Key, value); }); System.out.println("===== debug7: moviePairsRDD: K = Tuple<m1,m2>, V = <m1_score,number_of_m1_scores" + ",m2_score,number_of_m2_scores,m1m2Product, square(m1_score), square(m2_score)> ======"); moviePairsRDD.foreach(moviePair -> { System.out.println("debug7: key = " + moviePair._1 + "\t" + "value = " + moviePair._2); }); // 10.電影對分組: (movie1,movie2) -> list(<m1_score,number_score,....>) JavaPairRDD<Tuple2<String, String>, Iterable<Tuple7<Integer, Integer, Integer, Integer, Integer, Integer, Integer>>> moviePairGroupRDD = moviePairsRDD.groupByKey(); System.out.println("===== debug8: moviePairs: K = Tuple<m1,m2>, V = <m1_score,number_of_m1_scores" + " ,m2_score,number_of_m2_scores,m1m2Product, square(m1_score), square(m2_score)> ====== ======"); moviePairGroupRDD.foreach(moviePairGroup -> { System.out.println("debug8: key = " + moviePairGroup._1 + "\t" + "value = " + moviePairGroup._2); }); // end.計算類似度數據 JavaPairRDD<Tuple2<String, String>, Tuple3<Double, Double, Double>> correlations = moviePairGroupRDD.mapValues(value -> { return CalculateSimilarity.calculateCorRelation(value); }); System.out.println("====== Movie Similarity result is:(pearson,cosine,jaccard) ======"); correlations.mapToPair(c ->{ String key = "(" + 
c._1._1 + "," + c._1._2 + ")"; return new Tuple2<>(key, c._2); }).sortByKey().foreach(r -> { System.out.println(r._1 + "\t => \t" + r._2); }); return correlations.collectAsMap(); }