02Spark的左鏈接

兩個文件,一個是用戶的數據,一個是交易的數據。java

用戶:apache

 

 交易:api

 

 

流程以下:ide

分爲如下幾個步驟: (1)分別讀取user文件和transform文件,並轉爲兩個RDD.函數

* (2)對上面兩個RDD執行maptopair操做。生成userpairRdd和transformpairRddspa

* (3)對transformpairRdd和userpairRdd執行union操做,就是把上面的數據放在一塊兒,生成allRddscala

* (4)而後把allRdd用groupBykey分組,把同一個UserID的數據都放在一塊兒。生成groupRdd。debug

* (5)對grouprdd處理,生成productLoctionRdd:(p1,UT),(p2,UT)這種productlistRdd。3d

* (6)productlistRdd這裏面有數據重複,須要去重。調試

 

 

代碼結構:

 

 

代碼:

package com.test.book;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class LeftJoinCmain {

    /*
     * 分爲如下幾個步驟: (1)分別讀取user文件和transform文件,並轉爲RDD.
     * (2)對上面兩個RDD執行maptopair操做。生成userpairRdd和transformpairRdd
     * (3)對transformpairRdd和userpairRdd執行union操做,就是把上面的數據放在一塊兒,生成allRdd
     * (4)而後把allRdd用groupBykey分組,把同一個UserID的數據都放在一塊兒。生成groupRdd。
     * (5)對grouprdd處理,生成productLoctionRdd:(p1,UT),(p2,UT)這種productlistRdd。
     * (6)productlistRdd這裏面有數據重複,須要去重。
     * 
     */

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("LeftJoinCmain").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 導入user的數據
        JavaRDD<String> user = sc.textFile("/Users/mac/Desktop/user.txt");
        // 導入transform的數據
        JavaRDD<String> transform = sc.textFile("/Users/mac/Desktop/transactions.txt");

        // 生成一個JavaPairRDD,KEY是uerID,Value是Tuple的形式,("L",地址)
        JavaPairRDD<String, Tuple2<String, String>> userpairRdd = user
                .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {

                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception {
                        String[] args = line.split(" ");
                        return new Tuple2<String, Tuple2<String, String>>(args[0],
                                new Tuple2<String, String>("L", args[1]));
                    }

                });

        // 生成一個transform,
        JavaPairRDD<String, Tuple2<String, String>> transformpairRdd = transform
                .mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {

                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Tuple2<String, String>> call(String line) throws Exception {
                        String[] args = line.split(" ");

                        return new Tuple2<String, Tuple2<String, String>>(args[2],
                                new Tuple2<String, String>("P", args[1]));
                    }
                });

        /**
         * allRdd的格式是: { (userID,Tuple("L","UT")), (userID,Tuple("P","p3")) . . . }
         */
        JavaPairRDD<String, Tuple2<String, String>> allRdd = userpairRdd.union(transformpairRdd);

        /**
         * 這一步就是把同一個uerID的數據放在一塊兒,結果是: (userID1,List[(Tuple2("L","UT"),//一個用戶地址信息
         * Tuple2("P","p1"),//其餘的都是商品信息 Tuple2("P","p2") ] )
         */
        JavaPairRDD<String, Iterable<Tuple2<String, String>>> groupRdd = allRdd.groupByKey();

        /**
         * 這一步就是從groupRdd中去掉userID,生成productLoctionRdd:(p1,UT),(p2,UT)這種。
         * 
         */

        JavaPairRDD<String, String> productlistRdd = groupRdd.flatMapToPair(
                new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() {

                    @Override
                    public Iterable<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> t)
                            throws Exception {

                        String location = "UNKNOWN";
                        Iterable<Tuple2<String, String>> pairs = t._2;
                        List<String> products = new ArrayList<String>();
                        for (Tuple2<String, String> pair : pairs) {

                            if (pair._1.equals("L"))
                                location = pair._2;
                            if (pair._1.equals("P")) {
                                products.add(pair._2);
                            }

                        }

                        List<Tuple2<String, String>> kvList = new ArrayList<Tuple2<String, String>>();

                        for (String product : products) {
                            kvList.add(new Tuple2<String, String>(product, location));

                        }
                        return kvList;
                    }
                });

        // 把一個商品的全部地址都查出來

        JavaPairRDD<String, Iterable<String>> productbylocation = productlistRdd.groupByKey();
        List<Tuple2<String, Iterable<String>>> debug3 = productbylocation.collect();

        for (Tuple2<String, Iterable<String>> value : debug3) {

            Iterator<String> iterator = value._2.iterator();

            while (iterator.hasNext()) {
                System.out.println(value._1 + ":" + iterator.next());
            }

        }

        /**
         * 上述代碼通過調試, 結果以下: p2:GA p4:GA p4:UT p4:CA p1:UT p1:UT p1:GA p3:UT
         * 
         * 
         * 發現有相同的商品和地址。咱們須要把這個重複的結果去除。
         */
        // 處理以下:咱們用mapvalues()函數

        JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByuniqueLocation = productbylocation
                .mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {

                    @Override
                    public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
                        Set<String> uniquelocations = new HashSet<String>();

                        Iterator<String> iterator = v1.iterator();

                        while (iterator.hasNext()) {

                            String value = iterator.next();
                            uniquelocations.add(value);

                        }

                        // 返回一個商品的全部地址,以及地址的個數。
                        return new Tuple2<Set<String>, Integer>(uniquelocations, uniquelocations.size());
                    }
                });

        List<Tuple2<String, Tuple2<Set<String>, Integer>>> finalresult = productByuniqueLocation.collect();
        for (Tuple2<String, Tuple2<Set<String>, Integer>> vTuple2 : finalresult) {

            String aa=vTuple2._1;
            Iterator<String> iterator=vTuple2._2._1.iterator();
            while(iterator.hasNext())
            {
                System.out.println("商品的名字:"+aa+"全部的地址"+iterator.next());
                
                
            }
            
            
        }

    }

}

 

 

運行結果:

 

 

 

去重後的結果:

相關文章
相關標籤/搜索