spark三種鏈接join

本文主要介紹spark join相關操做。java

講述spark鏈接相關的三個方法join,left-outer-join,right-outer-join,在這以前,咱們用hiveSQL先跑出告終果以方便進行對比。sql

咱們以實例來進行說明。個人實現步驟記錄以下。app

 

一、數據準備ide

二、HSQL描述函數

三、Spark描述post

 

一、數據準備spa

咱們準備兩張Hive表,分別是orders(訂單表)和drivers(司機表),經過driver_id字段進行關聯。數據以下:code

ordersblog

orders表有兩個字段,訂單id:order_id和司機id:driver_id。司機id將做爲鏈接鍵。ci

經過select能夠看到三條數據。

hive (gulfstream_test)> select * from orders;
OK
orders.order_id orders.driver_id
1000    5000
1001    5001
1002    5002
Time taken: 0.387 seconds, Fetched: 3 row(s)

 

drivers

drivers表由兩個字段,司機id:driver_id和車輛id:car_id。司機id將做爲鏈接鍵。

經過select能夠看到兩條數據。

hive (gulfstream_test)> select * from drivers;
OK
drivers.driver_id       drivers.car_id
5000    100
5003    103
Time taken: 0.036 seconds, Fetched: 2 row(s)

 

 

二、HSQL描述

JOIN

天然鏈接,輸出鏈接鍵匹配的記錄。

能夠看到,經過driver_id匹配的數據只有一條。

hive (gulfstream_test)> select * from orders t1 join drivers t2 on (t1.driver_id = t2.driver_id) ;
OK
t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
1000    5000    5000    100
Time taken: 36.079 seconds, Fetched: 1 row(s)

 

LEFT OUTER JOIN

左外連接,輸出鏈接鍵匹配的記錄,左側的表不管匹配與否都輸出。

能夠看到,經過driver_id匹配的數據只有一條,不過全部orders表中的記錄都被輸出了,drivers中未能匹配的字段被置爲空。

hive (gulfstream_test)> select * from orders t1 left outer join drivers t2 on (t1.driver_id = t2.driver_id) ;
OK
t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
1000    5000    5000    100
1001    5001    NULL    NULL
1002    5002    NULL    NULL
Time taken: 36.063 seconds, Fetched: 3 row(s)

 

RIGHT OUTER JOIN

右外鏈接,輸出鏈接鍵匹配的記錄,右側的表不管匹配與否都輸出。

能夠看到,經過driver_id匹配的數據只有一條,不過全部drivers表中的記錄都被輸出了,orders中未能匹配的字段被置爲空。

hive (gulfstream_test)> select * from orders t1 right outer join drivers t2 on (t1.driver_id = t2.driver_id) ;
OK
t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
1000    5000    5000    100
NULL    NULL    5003    103
Time taken: 30.089 seconds, Fetched: 2 row(s)

 

 

三、Spark描述

spark實現join的方式也是經過RDD的算子,spark一樣提供了三個算子join,leftOuterJoin,rightOuterJoin。

在下面給出的例子中,咱們經過spark-hive讀取了Hive中orders表和drivers表中的數據,這時候數據的表現形式是DataFrame,若是要使用Join操做:

1)首先須要先將DataFrame轉化成了JavaRDD。

2)不過,JavaRDD實際上是沒有join算子的,下面還須要經過mapToPair算子將JavaRDD轉換成JavaPairRDD,這樣就可使用Join了。 

下面例子中給出了三種join操做的實現方式,在join以後,經過collect()函數把數據拉到Driver端本地,並經過標準輸出打印。

須要指出的是

1)join算子(join,leftOuterJoin,rightOuterJoin)只能經過PairRDD使用;

2)join算子操做的Tuple2<Object1, Object2>類型中,Object1是鏈接鍵,我只試過Integer和String,Object2比較靈活,甚至能夠是整個Row。

這裏咱們使用driver_id做爲鏈接鍵。 因此在輸出Tuple2的時候,咱們將driver_id放在了前面。

 

Join.java

/*
*   spark-submit --queue=root.zhiliangbu_prod_datamonitor spark-join-1.0-SNAPSHOT-jar-with-dependencies.jar
* */
public class Join implements Serializable {

    private transient JavaSparkContext javaSparkContext;
    private transient HiveContext hiveContext;

    /*
    *   初始化Load
    *   建立sparkContext, sqlContext, hiveContext
    * */
    public Join() {
        initSparckContext();
        initHiveContext();
    }

    /*
    *   建立sparkContext
    * */
    private void initSparckContext() {
        String warehouseLocation = System.getProperty("user.dir");
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark-join")
                .set("spark.sql.warehouse.dir", warehouseLocation)
                .setMaster("yarn-client");
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    /*
    *   建立hiveContext
    *   用於讀取Hive中的數據
    * */
    private void initHiveContext() {
        hiveContext = new HiveContext(javaSparkContext);
    }


    public void join() {
        /*
        *   生成rdd1
        * */
        String query1 = "select * from gulfstream_test.orders";
        DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id");
        JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {
            @Override
            public Tuple2<String, String> call(Row row) throws Exception {
                String orderId = (String)row.get(0);
                String driverId = (String)row.get(1);
                return new Tuple2<String, String>(driverId, orderId);
            }
        });
        /*
        *   生成rdd2
        * */
        String query2 = "select * from gulfstream_test.drivers";
        DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id");
        JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {
            @Override
            public Tuple2<String, String> call(Row row) throws Exception {
                String driverId = (String)row.get(0);
                String carId = (String)row.get(1);
                return new Tuple2<String, String>(driverId, carId);
            }
        });
        /*
        *   join
        * */
        System.out.println(" ****************** join *******************");
        JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2);
        Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator();
        while (it1.hasNext()) {
            Tuple2<String, Tuple2<String, String>> item = it1.next();
            System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
        }

        /*
        *   leftOuterJoin
        * */
        System.out.println(" ****************** leftOuterJoin *******************");
        JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2);
        Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator();
        while (it2.hasNext()) {
            Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next();
            System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
        }

        /*
        *   rightOuterJoin
        * */
        System.out.println(" ****************** rightOuterJoin *******************");
        JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2);
        Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator();
        while (it3.hasNext()) {
            Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next();
            System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
        }
    }

    public static void main(String[] args) {
        Join sj = new Join();
        sj.join();
    }

}

 

執行結果

其中Optional.absent()表示的就是null,能夠看到和HSQL是一致的。

複製代碼
Application ID is application_1508228032068_2746260, trackingURL: http://10.93.21.21:4040
 ****************** join *******************
driver_id:5000, order_id:1000, car_id:100                                       
 ****************** leftOuterJoin *******************
driver_id:5001, order_id:1001, car_id:Optional.absent()
driver_id:5002, order_id:1002, car_id:Optional.absent()
driver_id:5000, order_id:1000, car_id:Optional.of(100)
 ****************** rightOuterJoin *******************
driver_id:5003, order_id:Optional.absent(), car_id:103
driver_id:5000, order_id:Optional.of(1000), car_id:100
複製代碼

因爲數據量不大,我沒有從執行效率上進行考量。

根據經驗,通常在數據量較大的狀況下,HSQL的執行效率會高一些,若是數據量較小,Spark會快。 

相關文章
相關標籤/搜索