springboot基於spark-launcher構建rest api遠程提交spark任務

參考文章:使用springboot構建rest api遠程提交spark任務html

spark-submit動態提交的辦法(SparkLauncher實戰)java

用java提交一個Spark應用程序mysql

Spark-利用SparkLauncher 類以JAVA API 編程的方式提交spark job--imptgit

官網API參考: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.htmlgithub

github代碼連接:github地址web

1. spark集羣及版本信息
服務器版本:centos7
hadoop版本:2.8.3
spark版本:2.3.3
使用springboot構建rest api遠程提交spark任務,將數據庫中的表數據存儲到hdfs上,任務單獨起一個項目,解除與springboot項目的耦合



spring

2. 構建springboot項目
1. pom配置
    <properties>
        <java.version>1.8</java.version>
        <spark.version>2.3.3</spark.version>
        <scala.version>2.11</scala.version>
    </properties>





sql

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-launcher_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>


























數據庫

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>




apache

    <build>
        <finalName>spark</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.hrong.springbootspark.SpringbootSparkApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2. 項目結構




















3. 編寫代碼
1. 建立spark任務實體
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;





import java.util.Map;

/**
 * @Author hrong
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SparkApplicationParam {
    /**
     * 任務的主類
     */
    private String mainClass;
    /**
     * jar包路徑
     */
    private String jarPath;
    @Value("${spark.master:yarn}")
    private String master;
    @Value("${spark.deploy.mode:cluster}")
    private String deployMode;
    @Value("${spark.driver.memory:1g}")
    private String driverMemory;
    @Value("${spark.executor.memory:1g}")
    private String executorMemory;
    @Value("${spark.executor.cores:1}")
    private String executorCores;
    /**
     * 其餘配置:傳遞給spark job的參數
     */
    private Map<String, String> otherConfParams;



























    /**
     * 調用該方法可獲取spark任務的設置參數
     * @return SparkApplicationParam
     */
    public SparkApplicationParam getSparkApplicationParam(){
        return new SparkApplicationParam(mainClass, jarPath, master, deployMode, driverMemory, executorMemory, executorCores, otherConfParams);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2. 任務參數對象
每一個任務執行的時候都必須指定運行參數,因此要繼承SparkApplicationParam對象





















































import com.hrong.springbootspark.entity.SparkApplicationParam;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


/**
 * @Author hrong
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DataBaseExtractorVo extends SparkApplicationParam {
    /**
     * 數據庫鏈接地址
     */
    private String url;
    /**
     * 數據庫鏈接帳號
     */
    private String userName;
    /**
     * 數據庫密碼
     */
    private String password;
    /**
     * 指定的表名
     */
    private String table;
    /**
     * 目標文件類型
     */
    private String targetFileType;
    /**
     * 目標文件保存路徑
     */
    private String targetFilePath;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
3. 定義spark提交方法
1. 定義interface
每一個spark任務運行時都須要指定運行參數,可是任務內部所需的參數不同,因此第一個參數爲通用的參數對象,第二個參數爲可變參數,根據不一樣的任務來進行傳值






































































import com.hrong.springbootspark.entity.SparkApplicationParam;

import java.io.IOException;

/**
 * @Author hrong
 * @description spark任務提交service
 **/
public interface ISparkSubmitService {
    /**
     * 提交spark任務入口
     * @param sparkAppParams spark任務運行所需參數
     * @param otherParams 單獨的job所需參數
     * @return 結果
     * @throws IOException          io
     * @throws InterruptedException 線程等待中斷異常
     */
    String submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws IOException, InterruptedException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2. 具體實現
import com.alibaba.fastjson.JSONObject;
import com.hrong.springbootspark.entity.SparkApplicationParam;
import com.hrong.springbootspark.service.ISparkSubmitService;
import com.hrong.springbootspark.util.HttpUtil;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;











































import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
 * @Author hrong
 **/
@Service
public class SparkSubmitServiceImpl implements ISparkSubmitService {



    private static Logger log = LoggerFactory.getLogger(SparkSubmitServiceImpl.class);

    @Value("${driver.name:n151}")
    private String driverName;


    @Override
    public String submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws IOException, InterruptedException {
        log.info("spark任務傳入參數:{}", sparkAppParams.toString());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Map<String, String> confParams = sparkAppParams.getOtherConfParams();
        SparkLauncher launcher = new SparkLauncher()
                .setAppResource(sparkAppParams.getJarPath())
                .setMainClass(sparkAppParams.getMainClass())
                .setMaster(sparkAppParams.getMaster())
                .setDeployMode(sparkAppParams.getDeployMode())
                .setConf("spark.driver.memory", sparkAppParams.getDriverMemory())
                .setConf("spark.executor.memory", sparkAppParams.getExecutorMemory())
                .setConf("spark.executor.cores", sparkAppParams.getExecutorCores());
        if (confParams != null && confParams.size() != 0) {
            log.info("開始設置spark job運行參數:{}", JSONObject.toJSONString(confParams));
            for (Map.Entry<String, String> conf : confParams.entrySet()) {
                log.info("{}:{}", conf.getKey(), conf.getValue());
                launcher.setConf(conf.getKey(), conf.getValue());
            }
        }
        if (otherParams.length != 0) {
            log.info("開始設置spark job參數:{}", otherParams);
            launcher.addAppArgs(otherParams);
        }
        log.info("參數設置完成,開始提交spark任務");
        SparkAppHandle handle = launcher.setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        log.info("stateChanged:{}", sparkAppHandle.getState().toString());
                    }
































                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        log.info("infoChanged:{}", sparkAppHandle.getState().toString());
                    }
                });
        log.info("The task is executing, please wait ....");
        //線程等待任務結束
        countDownLatch.await();
        log.info("The task is finished!");
        //經過Spark原生的監測api獲取執行結果信息,須要在spark-default.xml、spark-env.sh、yarn-site.xml進行相應的配置
        String estUrl = "http://"+driverName+":18080/api/v1/applications/" + handle.getAppId();
        return HttpUtil.httpGet(restUrl, null);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
4. Controller寫法
controller主要的職責就是接受頁面的參數,將參數傳遞到service層

























































































import com.hrong.springbootspark.service.ISparkSubmitService;
import com.hrong.springbootspark.vo.DataBaseExtractorVo;
import com.hrong.springbootspark.vo.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;






import javax.annotation.Resource;
import java.io.IOException;

/**
 * @Author hrong
 **/
@Slf4j
@Controller
public class SparkController {
    @Resource
    private ISparkSubmitService iSparkSubmitService;
    /**
     * 調用service進行遠程提交spark任務
     * @param vo 頁面參數
     * @return 執行結果
     */
    @ResponseBody
    @PostMapping("/extract/database")
    public Object dbExtractAndLoad2Hdfs(@RequestBody DataBaseExtractorVo vo){
        try {
            return iSparkSubmitService.submitApplication(vo.getSparkApplicationParam(),
                    vo.getUrl(),
                    vo.getTable(),
                    vo.getUserName(),
                    vo.getPassword(),
                    vo.getTargetFileType(),
                    vo.getTargetFilePath());
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            log.error("執行出錯:{}", e.getMessage());
            return Result.err(500, e.getMessage());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
3. 構建Spark任務項目(Maven項目)
1. pom配置
<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <hadoop.version>2.8.3</hadoop.version>
        <spark.version>2.3.3</spark.version>
        <scala.version>2.11</scala.version>
        <scala-library.version>2.11.8</scala-library.version>
        <mysql.version>5.1.46</mysql.version>
        <oracle.version>11g</oracle.version>
        <codehaus.version>3.0.10</codehaus.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- 下載好了jar包install到本地的,沒法使用maven下載 -->
        <dependency>
            <groupId>com.oracle.driver</groupId>
            <artifactId>jdbc-driver</artifactId>
            <version>${oracle.version}</version>
        </dependency>
        <!--spark相關開始-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>${codehaus.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala-library.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>spark-job</finalName>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
2. 項目結構










































































































































































































































































































































3. spark job代碼
獲取外部參數,鏈接數據庫,並將指定表中的數據根據指定的格式、目錄轉存到hdfs上

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * @Author hrong
 * @Description 將數據庫中的表數據保存到hdfs上
 **/
public class DbTableEtl {
    private static Logger log = LoggerFactory.getLogger(DbTableEtl.class);




    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName(DbTableEtl.class.getSimpleName())
                .getOrCreate();
        String url = args[0];
        String dbtable = args[1];
        String user = args[2];
        String password = args[3];
        String targetFileType = args[4];
        String targetFilePath = args[5];
        Dataset<Row> dbData = spark.read()
                .format("jdbc")
                .option("url", url)
                .option("dbtable", dbtable)
                .option("user", user)
                .option("password", password)
                .load();
        log.info("展現部分樣例數據,即將開始導入到hdfs");
        dbData.show(20, false);
        dbData.write().mode("overwrite").format(targetFileType).save(targetFilePath);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
3. 項目打包
直接使用IDEA自帶打包功能

























































1. springboot項目


2. Spark job項目


4. 上傳至服務器


5. 將spark-job上傳至hdfs


6. 啓動springboot項目


7. 使用postman調用接口
指定jarPath、mainClass、deployMode以及任務所需參數


8. 調用結果
程序開始提交任務

程序執行結束

代碼放在了github上面,連接:github地址  

相關文章
相關標籤/搜索