Spark從入門到入土(二):任務提交

spark任務提交有三種方式mysql

1:經過local方式提交linux

2:經過spark-submit腳本提交到集羣spring

3:經過spark提交的API SparkLauncher提交到集羣,這種方式能夠將提交過程集成到咱們的spring工程中,更加靈活sql

先來看一下spark架構,能夠幫助理解任務的提交

任務提交

驅動程序:執行應用程序main方法的進程mongodb

集羣管理器:啓動執行器節點,有Mesos、YARN(Hadoop)、獨立集羣管理器(spark自帶的集羣管理器),在standalone模式中即爲Master主節點。bash

執行器節點:工做進程,負責在spark做業中運行任務服務器

過程大概以下架構

①:執行器節點(工做節點)在啓動時會向驅動器註冊本身app

②:用戶提交任務,驅動器調用main方法,驅動器與集羣管理器通訊申請執行器資源ide

③:集羣管理器爲驅動器程序啓動執行器節點

④:驅動器程序執行應用程序,將任務發送到工做節點

⑤:工做節點進行計算並保存結果

⑥:驅動器main方法退出,經過集羣管理器釋放資源

注意:在客戶端模式下,spark-submit 會將驅動器程序運行 在 spark-submit 被調用的這臺機器上。在集羣模式下,驅動器程序會被傳輸並執行 於集羣的一個工做節點上

1、本地方式提交
//該代碼是對企業架構下的不一樣級別的海量告警信息進行離線統計,按部門、日期、級別進行分組統計
public static void main(String[] args) {
        logger.info("開始執行spark任務");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

        SparkConf conf = new SparkConf()
                .setAppName("離線統計")
                //.setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                .setMaster("local")
                .set("spark.mongodb.input.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant.ALARM_SOURCE_TABLE)
                .set("spark.mongodb.output.uri", MongoConfig.SPARK_MONGODB_URL_PREFIX + BaseConstant
                        .ALARM_TARGET_TABLE);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaMongoRDD<Document> lines = MongoSpark.load(jsc).withPipeline(
                singletonList(
                        getCondition()
                )
        );

        //按部門時間分組計算
        JavaPairRDD<String, AlarmStatisticBean> pairs =
                lines.filter((Function<Document, Boolean>) line -> {
                    //代碼略掉
                    //過濾函數,true:不過濾,false:過濾
                    return true;
                }).mapToPair( 
                        //對RDD中的每一個元素調用指定函數,並返回<String, AlarmStatisticBean>類型的對象
                        //鍵值對, key: orgId_day  value: level[]   例:1_20190101, [0,1,0]
                        (PairFunction<Document, String, AlarmStatisticBean>) line -> {
                            Long orgId = line.getLong("orgId");
                            String statisticDate = sdf.format(line.getLong("createTimestamp") * 1000);
                            AlarmStatisticBean bean = new AlarmStatisticBean();
                            Long level = line.getLong("levelDictId");
                            if (level == AlarmTypeEnum.LEVEL1.getType()) {
                                bean.setLevel1Count(1);
                            }
                            if (level == AlarmTypeEnum.LEVEL2.getType()) {
                                bean.setLevel2Count(1);
                            }
                            if (level == AlarmTypeEnum.LEVEL3.getType()) {
                                bean.setLevel3Count(1);
                            }
                            bean.setOrgId(orgId.intValue());
                            bean.setDay(statisticDate);
                            String code = orgId + "_" + statisticDate;
                            return new Tuple2<>(code, bean);
                        }
                        //分組多列求和 1_20190101, [x,y,z]
                ).reduceByKey((Function2<AlarmStatisticBean, AlarmStatisticBean, AlarmStatisticBean>) (v1, v2) -> {
                   //reduceByKey的做用是合併具備相同鍵的值
                    v1.setLevel1Count(v1.getLevel1Count() + v2.getLevel1Count());
                    v1.setLevel2Count(v1.getLevel2Count() + v2.getLevel2Count());
                    v1.setLevel3Count(v1.getLevel3Count() + v2.getLevel3Count());
                    return v1;
                });
        logger.info("------------------------->>>>>" + pairs.count());
        List<Document> documents = new ArrayList<>();

        //類型轉換,以便持久化到DB(mongo或mysql)
        for (Tuple2<String, AlarmStatisticBean> tuple2 : pairs.collect()) {
            Document document = new Document("day", tuple2._2.getDay())
                    .append("level1Count", tuple2._2.getLevel1Count())
                    .append("level2Count", tuple2._2.getLevel2Count())
                    .append("level3Count", tuple2._2.getLevel3Count())
                    .append("orgId", tuple2._2.getOrgId());
            documents.add(document);
        }

        MongoManager.saveToMongo(documents, BaseConstant.ALARM_TARGET_TABLE);
    }
複製代碼

直接運行上述main方法會將AlarmStatisticBean對象保存到MongoDB中

2、spark-submit腳本

提交到集羣時,須要註釋掉local

SparkConf conf = new SparkConf()
                .setAppName("離線統計")
                .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                //.setMaster("local")
...
複製代碼
//帶*的是公司名或項目名稱,不影響閱讀
spark-submit --class com.*.*.meter.alarm.AlarmStatisticService --master spark://master:7077 /opt/middleware/*-alarm-task-1.0-jar-with-dependencies.ja
複製代碼

--master spark:// 表示會使用獨立模式,也就是使用spark自帶的獨立集羣管理器。提交時使用的主機名和端口精確匹配用戶頁面中的URL,這裏建議直接從http://172.*.*.6:8080頁面上覆制URL,避免沒必要要的麻煩。

3、SparkLauncher提交

SparkLauncher也提供了兩種方式提交任務

3.一、launch

SparkLauncher其實是根據JDK自帶的ProcessBuilder構造了一個UNIXProcess子進程提交任務,提交的形式跟spark-submit同樣。這個子進程會以阻塞的方式等待程序的運行結果。簡單來看就是拼接spark-submit命令,並以子進程的方式啓動。 代碼中的process.getInputStream()實際上對應linux進程的標準輸出stdout process.getErrorStream()實際上對應linux進程的錯誤信息stderr process.getOutputStream()實際上對應linux進程的輸入信息stdin

@Scheduled(fixedRate = 5000 * 60)
    public void alarmStatistic() {
        logger.info("=====>>>>>離線統計定時任務!", System.currentTimeMillis());

        try {
            HashMap env = new HashMap();
            //這兩個屬性必須設置
            env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
            env.put("JAVA_HOME", CommonConfig.JAVA_HOME);

            SparkLauncher handle = new SparkLauncher(env)
                    .setSparkHome(SparkConfig.SPARK_HOME)
                    .setAppResource(CommonConfig.ALARM_JAR_PATH)
                    .setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
                    .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
                    .setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
                    .setVerbose(SparkConfig.SPARK_VERBOSE)
                    .setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
                    .setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
                    .setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
                    .setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
                    .setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
                    .setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
                    .setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
                    .setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
                    .setVerbose(true);

            Process process = handle.launch();
            InputStreamRunnable inputStream = new InputStreamRunnable(process.getInputStream(), "alarm task input");
            ExecutorUtils.getExecutorService().submit(inputStream);
            InputStreamRunnable errorStream = new InputStreamRunnable(process.getErrorStream(), "alarm task error");
            ExecutorUtils.getExecutorService().submit(errorStream);

            logger.info("Waiting for finish...");
            int exitCode = process.waitFor();
            logger.info("Finished! Exit code:" + exitCode);
        } catch (Exception e) {
            logger.error("submit spark task error", e);
        }
    }
複製代碼
運行過程示意圖

運行過程
1:用戶程序啓動(SparkLauncher,非驅動程序)時會在當前節點上啓動一個SparkSubmit進程,並將驅動程序(即spark任務)發送到任意一個工做節點上,在工做節點上啓動DriverWrapper進程

2:驅動程序會從集羣管理器(standalone模式下是master服務器)申請執行器資源

3:集羣管理器反饋執行器資源給驅動器

4:驅動器Driver將任務發送到執行器節點執行

spark首頁監控

能夠看到啓動的Driver

首頁監控-驅動器
進一步能夠查看到執行器狀況
首頁監控-執行器

也可經過服務器進程查看各進程之間的關係

Master節點
工做節點-驅動器
工做節點-執行器1
工做節點-執行器2

3.二、startApplication()方式
@Scheduled(fixedRate = 5000 * 60)
    public void alarmStatistic() {
        logger.info("=====>>>>>告警離線統計定時任務!", System.currentTimeMillis());

        try {
            HashMap env = new HashMap();
            //這兩個屬性必須設置
            env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR);
            env.put("JAVA_HOME", CommonConfig.JAVA_HOME);

            CountDownLatch countDownLatch = new CountDownLatch(1);
            SparkAppHandle handle = new SparkLauncher(env)
                    .setSparkHome(SparkConfig.SPARK_HOME)
                    .setAppResource(CommonConfig.ALARM_JAR_PATH)
                    .setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS)
                    .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT)
//                    .setMaster("yarn")
                    .setDeployMode(SparkConfig.SPARK_DEPLOY_MODE)
                    .setVerbose(SparkConfig.SPARK_VERBOSE)
                    .setConf("spark.app.id", CommonConfig.ALARM_APP_ID)
                    .setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY)
                    .setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE)
                    .setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY)
                    .setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES)
                    .setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES)
                    .setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM)
                    .setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
                    .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                        //這裏監放任務狀態,當任務結束時(無論是什麼緣由結束),isFinal()方法會返回true,不然返回false
                        @Override
                        public void stateChanged(SparkAppHandle sparkAppHandle) {
                            if (sparkAppHandle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            System.out.println("state:" + sparkAppHandle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle sparkAppHandle) {
                            System.out.println("Info:" + sparkAppHandle.getState().toString());
                        }
                    });
            logger.info("The task is executing, please wait ....");
            //線程等待任務結束
            countDownLatch.await();
            logger.info("The task is finished!");
        } catch (Exception e) {
            logger.error("submit spark task error", e);
        }
    }
複製代碼

這種模式下,據本人親自測試,只有在yarn工做模式下能夠提交成功,在standalone模式下老是提交失敗,若是有人知道的能夠告訴我

yarn模式須要安裝hadoop集羣,提交任務的流程基本和上面是同樣的,不一樣的是集羣管理器不在是spark自帶的集羣管理器,而是由yarn來管理,這也是官方推薦的提交方式,比較麻煩的就是須要安裝hadoop集羣,hadoop的安裝參加另外一篇 Hadoop集羣搭建

從監控頁面能夠看到application的執行狀況

yarn監控頁面
相關文章
相關標籤/搜索