In a Spark program, a Job is triggered by an Action operator, such as count() or saveAsTextFile().
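To make this concrete, here is a minimal sketch (the dataset and output path are hypothetical): transformations such as map() are lazy and only record lineage, while each Action submits a separate Job to the scheduler.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class ActionDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("ActionDemo"));

        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        // Transformation: lazy, no Job is submitted yet
        JavaRDD<Integer> doubled = nums.map(x -> x * 2);

        System.out.println(doubled.count());        // Action: triggers Job 0
        doubled.saveAsTextFile("/tmp/action-demo"); // Action: triggers Job 1

        sc.stop();
    }
}
```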
In this Spark optimization test, data is read from Hive and written back out as four separate copies: two of the Jobs run serially, the other two run in parallel. The application is submitted to YARN, and the performance difference between serial and parallel processing is clearly visible.
Execution time of each Job:
| JobID | Start Time | End Time | Duration |
| ----- | ---------- | -------- | -------- |
| Job 0 | 16:59:45   | 17:00:34 | 49s      |
| Job 1 | 17:00:34   | 17:01:13 | 39s      |
| Job 2 | 17:01:15   | 17:01:55 | 40s      |
| Job 3 | 17:01:16   | 17:02:12 | 56s      |
All four Jobs perform the same operation. Job 0 and Job 1 form the serial group; Job 2 and Job 3 form the parallel group.
Serial (Job 0, Job 1): total time is the sum of the two Jobs' durations, 49s + 39s = 88s.
Parallel (Job 2, Job 3): total time is the difference between the earliest start and the latest end, 17:02:12 - 17:01:15 = 57s.
Code:
```java
package com.cn.ctripotb;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

import java.util.ResourceBundle;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by Administrator on 2016/9/12.
 */
public class HotelTest {
    static ResourceBundle rb = ResourceBundle.getBundle("filepath");

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("MultiJobWithThread")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());

        // Enable this when testing against real data
        final DataFrame df = getHotelInfo(hiveContext);

        // Without multithreading: two consecutive Actions produce two serial Jobs
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",
                com.hadoop.compression.lzo.LzopCodec.class);
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",
                com.hadoop.compression.lzo.LzopCodec.class);

        // With an Executor: submit the two Actions from separate threads
        // so their Jobs run in parallel
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(new Callable<Void>() {
            @Override
            public Void call() {
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",
                        com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });

        executorService.submit(new Callable<Void>() {
            @Override
            public Void call() {
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",
                        com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });

        // Wait for both parallel Jobs to finish before stopping the context
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        sc.stop();
    }

    public static DataFrame getHotelInfo(HiveContext hiveContext) {
        String sql = "select * from common.dict_hotel_ol";
        return hiveContext.sql(sql);
    }
}
```
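One related knob, offered as a suggestion rather than something the original test changed: within a single SparkContext, concurrently submitted Jobs are scheduled FIFO by default, so a large Job at the head of the queue can still delay the others even when they come from different threads. Spark's FAIR scheduler (spark.scheduler.mode=FAIR) lets Jobs from different threads share executor resources round-robin:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class FairSchedulerDemo {
    public static void main(String[] args) {
        // spark.scheduler.mode defaults to FIFO; FAIR round-robins tasks
        // across Jobs so threads submitting concurrent Jobs share executors.
        SparkConf conf = new SparkConf()
                .setAppName("MultiJobWithThread")
                .set("spark.scheduler.mode", "FAIR");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // ... submit Jobs from multiple threads as in the example above ...

        sc.stop();
    }
}
```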