【總結】Spark優化(1)-多Job併發執行

Spark程序中一個Job的觸發是經過一個Action算子,好比count(), saveAsTextFile()等
java

在此次Spark優化測試中,從Hive中讀取數據,將其另外保存四份,其中兩個Job採用串行方式,另外兩個Job採用並行方式。將任務提交到Yarn中執行。可以明顯看出串行與兵線處理的性能。sql


每一個Job執行時間:apache

JobID 開始時間 結束時間 耗時
Job 0 16:59:45 17:00:34 49s
Job 1 17:00:34 17:01:13 39s
Job 2 17:01:15 17:01:55
40s
Job 3 17:01:16 17:02:12 56s

四個Job都是自執行相同操做,Job0,Job1一組採用串行方式,Job2,Job3採用並行方式。api

Job0,Job1串行方式耗時等於兩個Job耗時之和 49s+39s=88s多線程

Job2,Job3並行方式耗時等於最早開始和最後結束時間只差17:02:12-17:01:15=57side


wKioL1mmgtqQS23TAADBhyjHUwY778.png

代碼:oop

package com.cn.ctripotb;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;


import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2016/9/12.
 */
public class HotelTest {
    static ResourceBundle rb = ResourceBundle.getBundle("filepath");
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MultiJobWithThread")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());  //測試真實數據時要把這裏放開

        final DataFrame df = getHotelInfo(hiveContext);
        //沒有多線程處理的狀況,連續執行兩個Action操做,生成兩個Job
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",com.hadoop.compression.lzo.LzopCodec.class);
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",com.hadoop.compression.lzo.LzopCodec.class);

        //用Executor實現多線程方式處理Job
        java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call(){
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call(){
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });

        executorService.shutdown();
    }
    public static DataFrame getHotelInfo(HiveContext hiveContext){
        String sql = "select * from common.dict_hotel_ol";
        return  hiveContext.sql(sql);
    }
}
相關文章
相關標籤/搜索