有兩個文件:一個是用戶的數據,一個是交易的數據。
用戶(user.txt,每行以空格分隔:用戶ID 地址):
交易(transactions.txt,每行以空格分隔,第2列為商品ID,第3列為用戶ID):
流程如下:
分為以下幾個步驟:
* (1)分別讀取user文件和transform(交易)文件,並轉為兩個RDD。
* (2)對上面兩個RDD執行mapToPair操作,生成userpairRdd和transformpairRdd。
* (3)對transformpairRdd和userpairRdd執行union操作,也就是把上面的數據合併在一起,生成allRdd。
* (4)然後用groupByKey對allRdd分組,把同一個userID的數據都放在一起,生成groupRdd。
* (5)處理groupRdd,去掉userID,生成(p1,UT)、(p2,UT)這種(商品, 地址)形式的productlistRdd。
* (6)productlistRdd中有重複數據,需要去重。
代碼結構:
代碼:
package com.test.book; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; public class LeftJoinCmain { /* * 分爲如下幾個步驟: (1)分別讀取user文件和transform文件,並轉爲RDD. * (2)對上面兩個RDD執行maptopair操做。生成userpairRdd和transformpairRdd * (3)對transformpairRdd和userpairRdd執行union操做,就是把上面的數據放在一塊兒,生成allRdd * (4)而後把allRdd用groupBykey分組,把同一個UserID的數據都放在一塊兒。生成groupRdd。 * (5)對grouprdd處理,生成productLoctionRdd:(p1,UT),(p2,UT)這種productlistRdd。 * (6)productlistRdd這裏面有數據重複,須要去重。 * */ public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("LeftJoinCmain").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 導入user的數據 JavaRDD<String> user = sc.textFile("/Users/mac/Desktop/user.txt"); // 導入transform的數據 JavaRDD<String> transform = sc.textFile("/Users/mac/Desktop/transactions.txt"); // 生成一個JavaPairRDD,KEY是uerID,Value是Tuple的形式,("L",地址) JavaPairRDD<String, Tuple2<String, String>> userpairRdd = user .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() { /** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception { String[] args = line.split(" "); return new Tuple2<String, Tuple2<String, String>>(args[0], new Tuple2<String, String>("L", args[1])); } }); // 生成一個transform, JavaPairRDD<String, Tuple2<String, String>> transformpairRdd = transform .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() { /** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Tuple2<String, String>> 
call(String line) throws Exception { String[] args = line.split(" "); return new Tuple2<String, Tuple2<String, String>>(args[2], new Tuple2<String, String>("P", args[1])); } }); /** * allRdd的格式是: { (userID,Tuple("L","UT")), (userID,Tuple("P","p3")) . . . } */ JavaPairRDD<String, Tuple2<String, String>> allRdd = userpairRdd.union(transformpairRdd); /** * 這一步就是把同一個uerID的數據放在一塊兒,結果是: (userID1,List[(Tuple2("L","UT"),//一個用戶地址信息 * Tuple2("P","p1"),//其餘的都是商品信息 Tuple2("P","p2") ] ) */ JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupRdd = allRdd.groupByKey(); /** * 這一步就是從groupRdd中去掉userID,生成productLoctionRdd:(p1,UT),(p2,UT)這種。 * */ JavaPairRDD<String, String> productlistRdd = groupRdd.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() { @Override public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> t) throws Exception { String location = "UNKNOWN"; Iterable<Tuple2<String, String>> pairs = t._2; List<String> products = new ArrayList<String>(); for (Tuple2<String, String> pair : pairs) { if (pair._1.equals("L")) location = pair._2; if (pair._1.equals("P")) { products.add(pair._2); } } List<Tuple2<String, String>> kvList = new ArrayList<Tuple2<String, String>>(); for (String product : products) { kvList.add(new Tuple2<String, String>(product, location)); } return kvList; } }); // 把一個商品的全部地址都查出來 JavaPairRDD<String, Iterable<String>> productbylocation = productlistRdd.groupByKey(); List<Tuple2<String, Iterable<String>>> debug3 = productbylocation.collect(); for (Tuple2<String, Iterable<String>> value : debug3) { Iterator<String> iterator = value._2.iterator(); while (iterator.hasNext()) { System.out.println(value._1 + ":" + iterator.next()); } } /** * 上述代碼通過調試, 結果以下: p2:GA p4:GA p4:UT p4:CA p1:UT p1:UT p1:GA p3:UT * * * 發現有相同的商品和地址。咱們須要把這個重複的結果去除。 */ // 處理以下:咱們用mapvalues()函數 JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByuniqueLocation = productbylocation 
.mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() { @Override public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception { Set<String> uniquelocations = new HashSet<String>(); Iterator<String> iterator = v1.iterator(); while (iterator.hasNext()) { String value = iterator.next(); uniquelocations.add(value); } // 返回一個商品的全部地址,以及地址的個數。 return new Tuple2<Set<String>, Integer>(uniquelocations, uniquelocations.size()); } }); List<Tuple2<String, Tuple2<Set<String>, Integer>>> finalresult = productByuniqueLocation.collect(); for (Tuple2<String, Tuple2<Set<String>, Integer>> vTuple2 : finalresult) { String aa=vTuple2._1; Iterator<String> iterator=vTuple2._2._1.iterator(); while(iterator.hasNext()) { System.out.println("商品的名字:"+aa+"全部的地址"+iterator.next()); } } } }
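運行結果(去重前,同一個商品可能出現重複的地址):p2:GA p4:GA p4:UT p4:CA p1:UT p1:UT p1:GA p3:UT
去重後的結果(每個商品的地址集合):p1 → {UT, GA};p2 → {GA};p3 → {UT};p4 → {GA, UT, CA}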