Spark (44): Calling spark-submit.sh from Java (supporting both --deploy-mode client and cluster) and obtaining the applicationId

I have previously described using the YARN API to submit Spark jobs and obtain the applicationId from the submission call; for details see 《Spark2.3(四十):如何使用java經過yarn api調度spark app,並根據appId監控任務,關閉任務,獲取任務日誌》.

However, I prefer the approach described in this article: calling the spark-submit.sh shell script from Java and extracting the applicationId from spark-submit.sh's console output. The Hadoop API approach requires a properly configured environment, and different Hadoop versions need different dependency packages.

Notes on calling a shell command from Java:

To call a shell command from Java, use:

Process p = Runtime.getRuntime().exec(cmd);  // cmd is a String[]

Runtime.exec() spawns a native process and returns an instance of a Process subclass; the instance can be used to control the process and query information about it.
Because a child process created by Runtime.exec has no terminal or console of its own, its standard I/O (stdin, stdout, stderr) is redirected to the parent process through the

    p.getOutputStream(),
    p.getInputStream(),
    p.getErrorStream()

methods. The caller must use these streams to feed input to the child process and to read its output.
    For example: Runtime.getRuntime().exec("ls")
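
As a minimal, self-contained sketch of this pattern (my own illustration, not from the original article; the `ls` command is just an example), reading a child process's stdout looks like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ExecLs {
    public static void main(String[] args) throws Exception {
        // Spawn the child process; its stdout is exposed to us via getInputStream().
        Process p = Runtime.getRuntime().exec("ls");
        try (BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println("OUTPUT>" + line);
            }
        }
        System.out.println("ExitValue: " + p.waitFor());
    }
}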

  • Another thing to watch out for: Runtime.getRuntime().exec() can stall (block).

    The caller of Runtime.getRuntime().exec() has to drain stdout and stderr itself, and there is no way to know in advance whether the child will produce error output (stderr) or standard output (stdout) first; reading them in a fixed order can therefore block forever.
    For example: you read stdout first, but the child happens to write stderr first. The child stalls waiting for its stderr to be drained before it ever writes to stdout, the parent stalls waiting for stdout, and the two ends deadlock.

  • Solution:

    Drain stdout and stderr concurrently on two separate threads.

   Reference code:

import java.util.*;
import java.io.*;

class StreamGobbler extends Thread {
    InputStream is;
    String type;

    StreamGobbler(InputStream is, String type) {
        this.is = is;
        this.type = type;
    }

    public void run() {
        try {
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(type + ">" + line);
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

public class ExecRunner {
    public static void main(String args[]) {
        if (args.length < 1) {
            System.out.println("USAGE: java ExecRunner <cmd>");
            System.exit(1);
        }

        try {
            String osName = System.getProperty("os.name");
            String[] cmd;
            if (osName.equals("Windows NT")) {
                cmd = new String[] { "cmd.exe", "/C", args[0] };
            } else if (osName.equals("Windows 95")) {
                cmd = new String[] { "command.com", "/C", args[0] };
            } else {
                // On Unix-like systems, split the command string on spaces.
                StringTokenizer st = new StringTokenizer(args[0], " ");
                cmd = new String[st.countTokens()];
                int token = 0;
                while (st.hasMoreTokens()) {
                    cmd[token++] = st.nextToken();
                }
            }

            Runtime rt = Runtime.getRuntime();
            System.out.println("Execing " + String.join(" ", cmd));
            Process proc = rt.exec(cmd);

            // Drain stderr and stdout on separate threads so neither pipe fills up.
            StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), "ERROR");
            StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), "OUTPUT");

            // kick them off
            errorGobbler.start();
            outputGobbler.start();

            int exitVal = proc.waitFor();
            System.out.println("ExitValue: " + exitVal);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}

Implementing the spark-submit.sh call from Java

The spark-submit submission script, submit_test.sh:

#!/bin/sh
# Build a comma-separated jar list for spark-submit's --jars option.
jarspath=''

for file in `ls /home/dx/djj/sparkjars/*.jar`
do
  jarspath=${file},$jarspath
done
# Strip the trailing comma left over from the first iteration.
jarspath=${jarspath%?}

echo $jarspath

/home1/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.dx.test.BroadcastTest \
--properties-file ./conf/spark-properties-mrs.conf \
--jars $jarspath \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 1 \
--driver-memory 2G \
--driver-java-options "-XX:+TraceClassPaths" \
./test.jar $1 $2 $3 $4

Note: when testing the two YARN submission modes, switch the --deploy-mode flag accordingly:

cluster mode: --deploy-mode cluster \

client mode:  --deploy-mode client \

If we want to obtain the applicationId from spark-submit, we have to filter it out of spark-submit's printed output (that is, the Process object's stdout and stderr). If you have ever submitted a Spark job with spark-submit.sh, you will have seen the applicationId printed on the console during execution.

  • In YARN client mode (--deploy-mode client), the spot where spark-submit.sh prints the applicationId during submission:
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@215a34b4{/static,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2e380628{/,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1eaf1e62{/api,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@652ab8d9{/jobs/job/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51e0301d{/stages/stage/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:31 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
[Opened /usr/java/jdk1.8.0_152/jre/lib/charsets.jar]
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
  • In YARN cluster mode (--deploy-mode cluster), the spot where spark-submit.sh prints the applicationId during submission:
19/04/02 11:40:22 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:23 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:24 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:25 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:26 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:27 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:28 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:29 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
19/04/02 11:40:33 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)

The applicationId filter function is implemented as follows:

 

    /**
     * @param line a single line read from stdout or stderr.
     */
    private String filterApplicationId(String line, boolean isCluster) {
        String applicationId = null;
        line = line.toLowerCase();

        // --deploy-mode client
        // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
        // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
        boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
        // --deploy-mode cluster
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
        boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
        boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

        if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
            if (isIndexSparkOwnLog && !isCluster) {
                int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
                applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
            } else if (isIndexSparkOwn2Log && isCluster) {
                int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
                applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
                if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                    applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
                }
            }
        }

        if (applicationId != null && applicationId.startsWith("application_")) {
            System.out.println("====================================Index of applicationId:" + applicationId);
            System.out.println("====================================Index of applicationId:Complete ...");
        }

        return applicationId;
    }

 

If filtering succeeds, the applicationId is returned; if nothing matches, the function returns null.
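
As an aside, a more compact way to pull the id out of a line is to match the applicationId pattern with a regular expression instead of substring arithmetic. This is my own sketch, not the original code:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ApplicationIdExtractor {
    // YARN application ids look like application_<clusterTimestamp>_<sequence>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Returns the first applicationId found in the line, or null. */
    static String extract(String line, boolean isCluster) {
        boolean clientHit = !isCluster && line.contains("Submitted application ");
        boolean clusterHit = isCluster && line.contains("Application report for ")
                && line.contains("(state: RUNNING)");
        if (clientHit || clusterHit) {
            Matcher m = APP_ID.matcher(line);
            if (m.find()) {
                return m.group();
            }
        }
        return null;
    }
}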

The task that consumes the stdout and stderr streams is defined as follows:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

class StreamFilterTask implements Callable<String> {
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> queue;
    private String streamType = null;
    private boolean isCluster;

    private StreamFilterTask() {
    }

    public StreamFilterTask(InputStream inputStream, ConcurrentLinkedQueue<String> queue, String streamType,
            boolean isCluster) {
        this.inputStream = inputStream;
        this.queue = queue;
        this.streamType = streamType;
        this.isCluster = isCluster;
    }

    @Override
    public String call() throws Exception {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(line);

                // Keep only the most recent 1000 lines of stderr in the queue.
                // Note: ConcurrentLinkedQueue.size() traverses the whole queue,
                // so prefer isEmpty() over size() where possible.
                if (this.streamType.equalsIgnoreCase("error")) {
                    if (queue.size() > 1000) {
                        // Retrieves and removes the head of the queue (null if empty).
                        queue.poll();
                    }
                    // Inserts the line at the tail; the queue is unbounded, so offer never returns false.
                    queue.offer(line);
                }

                String applicationId = filterApplicationId(line, isCluster);

                if (applicationId != null && applicationId.startsWith("application_")) {
                    return applicationId;
                }
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        return null;
    }

    /**
     * @param line a single line read from stdout or stderr.
     */
    private String filterApplicationId(String line, boolean isCluster) {
        String applicationId = null;
        line = line.toLowerCase();

        // --deploy-mode client
        // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
        // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
        boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
        // --deploy-mode cluster
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
        boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
        boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

        if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
            if (isIndexSparkOwnLog && !isCluster) {
                int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
                applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
            } else if (isIndexSparkOwn2Log && isCluster) {
                int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
                applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
                if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                    applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
                }
            }
        }

        if (applicationId != null && applicationId.startsWith("application_")) {
            System.out.println("====================================Index of applicationId:" + applicationId);
            System.out.println("====================================Index of applicationId:Complete ...");
        }

        return applicationId;
    }
}

The SubmitSpark class definition:

This class uses a Process to run the script, filters the applicationId out of the Process's stdout and stderr, and caps the maximum wait time with Process.waitFor(timeout, TimeUnit).

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SubmitSpark {
    public String submit(String filePath, long timeoutMinutes, String charsetName) {
        String applicatioId = null;

        String command = filePath;
        boolean isCluster = false;
        BufferedReader bufferedReader = null;
        // Scan the submit script to find out whether it uses --deploy-mode cluster.
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), charsetName));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                if (line.replace("  ", " ").toLowerCase().indexOf("--deploy-mode cluster") != -1) {
                    isCluster = true;
                    break;
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        StringTokenizer st = new StringTokenizer(command, " ");
        String[] cmd = new String[st.countTokens()];
        int token = 0;
        while (st.hasMoreTokens()) {
            String tokenString = st.nextToken();
            cmd[token++] = tokenString;
        }

        Runtime rt = Runtime.getRuntime();
        System.out.println("Execing " + command);
        Process proc = null;

        try {
            proc = rt.exec(cmd);

            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
            ExecutorService executor = Executors.newFixedThreadPool(2);

            // Use Futures to collect the worker threads' results. get() must only be
            // called after the workers are done; calling get() here would block and
            // serialize the two readers.
            // any output?
            Future<String> futureInput = executor.submit(new StreamFilterTask(proc.getInputStream(), queue, "input",
                    isCluster));
            // any error message?
            Future<String> futureError = executor.submit(new StreamFilterTask(proc.getErrorStream(), queue, "error",
                    isCluster));

            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",begin proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");
            // Wait for the process to exit, for at most timeoutMinutes.
            boolean exitVal = proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
            System.out.println("exitVal:" + exitVal);
            proc.destroyForcibly();
            System.out.println("proc.isAlive():" + proc.isAlive());
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");

            // Whether --deploy-mode is cluster or client, the applicationId is read
            // from getErrorStream(), so as long as the submission succeeds it is
            // returned; otherwise (e.g. insufficient resources) we keep waiting until
            // the timeout expires.
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",begin applicatioId = futureError.get();:");
            if (futureError.get() != null) {
                applicatioId = futureError.get();
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",end applicatioId = futureError.get();:"
                    + applicatioId);

            // In cluster mode futureInput.get() would block, so keep this code disabled.
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",begin applicatioId = futureInput.get();:");
            // if (futureInput.get() != null) {
            //      applicatioId = futureInput.get();
            // }

            // kill the process
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",end applicatioId = futureInput.get();:"
            //             + applicatioId);
            // 
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",begin getting process id");
            // long pid = -1;
            // try {
            //         Class<?> clazz = Class.forName("java.lang.UNIXProcess");
            //         Field field = clazz.getDeclaredField("pid");
            //         field.setAccessible(true);
            //         pid = (Integer) field.get(proc);
            // } catch (Throwable e) {
            //         e.printStackTrace();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ",end getting process id:"
            //             + pid);
            // 
            // System.out.println("proc.isAlive():" + proc.isAlive());
            // String[] killCmd = { "sh", "-c", "kill -9 " + pid };
            // Runtime.getRuntime().exec(killCmd).waitFor();

            System.out.println("Complete:" + applicatioId);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",begin if (proc != null && proc.isAlive())");
            if (proc != null && proc.isAlive()) {
                proc.destroyForcibly();
                System.out.println("proc.isAlive():" + proc.isAlive());
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ",end if (proc != null && proc.isAlive())");
        }

        return applicatioId;
    }
}
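
One detail worth flagging before the test output below: Process.waitFor(long, TimeUnit) returns a boolean meaning "the process exited before the timeout", not an exit code, which is why the logs print exitVal:false even when the submission succeeds. A minimal sketch (my own illustration; the sleep command just stands in for a long-running spark-submit):

import java.util.concurrent.TimeUnit;

public class WaitForDemo {
    public static void main(String[] args) throws Exception {
        // "sleep 10" stands in for a long-running spark-submit process.
        Process p = Runtime.getRuntime().exec(new String[] { "sleep", "10" });
        // true only if the process terminated within the timeout window.
        boolean exited = p.waitFor(2, TimeUnit.SECONDS);
        System.out.println("exited before timeout: " + exited); // prints false here
        p.destroyForcibly();
    }
}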

Notes:

1) Do not set the timeout too short. If it is too short, the run is cut off before the job has even been submitted to YARN, so no applicationId can be returned.

2) Even after SubmitSpark returns the applicatioId, the launcher program (java -cp test.jar com.dx.test.Submit) that ran the spark-submit.sh shell does not exit; the Process's stdout and stderr streams are in fact still open.

3) Even with the kill-process code above enabled, so that stdin/stderr are closed, the launcher (java -cp test.jar com.dx.test.Submit) still does not exit. With that code enabled, once the Spark job has been submitted to YARN, the program catches errors on stdout/stderr, but the Spark application on YARN is not killed (before the launcher is ended manually, a yarn-client application keeps running as well; and once a yarn-cluster application reaches YARN, ending the launcher cannot kill it at all). The kill-process code therefore makes no real difference either way. One possible mitigation for the non-exiting launcher is sketched right after this list.
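
A plausible mitigation for note 2, assuming the cause is that the fixed thread pool's non-daemon reader threads keep the JVM alive (my own sketch, not part of the original code): create the pool with a daemon ThreadFactory and shut it down once the applicationId is in hand.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class DaemonPoolDemo {
    public static void main(String[] args) throws Exception {
        // Worker threads are daemons: they cannot keep the JVM alive after
        // main() returns, even while blocked on a stream read.
        ExecutorService executor = Executors.newFixedThreadPool(2, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            }
        });

        executor.submit(new Runnable() {
            @Override
            public void run() {
                // Stands in for a StreamFilterTask blocked on readLine().
                try {
                    while (true) {
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException ignored) {
                    // shutdownNow() interrupts us; just exit.
                }
            }
        });

        TimeUnit.SECONDS.sleep(2);
        executor.shutdownNow(); // interrupt the readers explicitly as well
        System.out.println("main() returns; daemon workers do not block JVM exit");
    }
}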

Test:

package com.dx.test;

public class Submit {
    public static void main(String[] args) {
        String filePath = "./submit_test.sh";
        String charsetName = "utf-8";
        long timeoutMinutes = 5;

        SubmitSpark submitSpark = new SubmitSpark();
        String applicationId = submitSpark.submit(filePath, timeoutMinutes, charsetName);

        System.out.println("return the applicationId:" + applicationId);
    }
}

With the timeout set to 2 minutes

  • In YARN client mode (--deploy-mode client), execution shows the following problem (timeout set to 2 minutes):
19/04/02 10:54:49 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
exitVal:false
proc.isAlive():false
2019-04-02 10:56:38,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:56:38,begin getting applicationId:
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:148)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:148)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:56:52,end getting applicationId:null
2019-04-02 10:56:52,begin getting process id
2019-04-02 10:56:52,end getting process id:13994
proc.isAlive():false
Complete:null
2019-04-02 10:56:52,begin if (proc != null && proc.isAlive())
2019-04-02 10:56:52,end if (proc != null && proc.isAlive())
return the applicationId:null

Note: this test was run with the kill-process code enabled; the result is the same with it disabled.

1) No applicationId is obtained, although the job may already have been submitted to YARN [in this run, the output shows it ended before the job was ever submitted].
2) The console then sits blocked; pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit also closes the Spark application on YARN.

  • In YARN cluster mode (--deploy-mode cluster), execution shows the following problem (timeout set to 2 minutes):
19/04/02 10:57:00 INFO yarn.Client: Uploading resource file:/home1/boco/duanjiajun/sparkjars/bcprov-jdk15on-1.52.jar -> hdfs://vm10.60.0.11.com.cn:8020/user/boco/.sparkStaging/application_1548381669007_0816/bcprov-jdk15on-1.52.jar
exitVal:false
proc.isAlive():false
2019-04-02 10:57:01,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:57:01,begin getting applicationId:
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:148)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:57:01,end getting applicationId:null
2019-04-02 10:57:01,begin getting process id
2019-04-02 10:57:01,end getting process id:14359
proc.isAlive():false
Complete:null
2019-04-02 10:57:01,begin if (proc != null && proc.isAlive())
2019-04-02 10:57:01,end if (proc != null && proc.isAlive())
return the applicationId:null

Note: this test was run with the kill-process code enabled; the result is the same with it disabled.

1) No applicationId is obtained, and the job may already have been submitted to YARN [in this run, the output shows it ended before the job was ever submitted].
2) The console then sits blocked; pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit does not close the Spark application on YARN if the job has already been submitted.

With the timeout set to 5 minutes

  • --deploy-mode cluster, timeout set to 5 minutes:
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
====================================Index of applicationId:application_1548381669007_0828
====================================Index of applicationId:Complete ...
exitVal:false
proc.isAlive():true
2019-04-02 11:42:59,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:42:59,begin applicatioId = futureError.get();:
2019-04-02 11:42:59,end applicatioId = futureError.get();:application_1548381669007_0828
Complete:application_1548381669007_0828
2019-04-02 11:42:59,begin if (proc != null && proc.isAlive())
2019-04-02 11:42:59,end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0828
^Cbash-4.1$ 

Manually ending the launcher process at this point does not terminate the Spark application on YARN.

  • --deploy-mode client, timeout set to 5 minutes:
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
====================================Index of applicationId:application_1548381669007_0829
====================================Index of applicationId:Complete ...
the value is :86
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- int_id: long (nullable = true)

root
 |-- int_id: string (nullable = false)
 |-- job_result: string (nullable = true)

Query started: a82ad759-8b14-4d58-93a3-8bed7617dd9c
-------------------------------------------
Batch: 0
-------------------------------------------
listener...application_1548381669007_0829
+------+----------+
|int_id|job_result|
+------+----------+
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
|     5|      5,86|
|     6|      6,86|
|     7|      7,86|
|     8|      8,86|
|     9|      9,86|
|    10|     10,86|
|    11|      null|
|    12|      null|
|    13|      null|
|    14|      null|
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
+------+----------+
only showing top 20 rows
...
listener...application_1548381669007_0829
Query made progress: {
  "id" : "a82ad759-8b14-4d58-93a3-8bed7617dd9c",
  "runId" : "a53447f1-056e-4d84-b27e-7007829bc1e2",
  "name" : null,
  "timestamp" : "2019-04-02T03:43:10.001Z",
  "batchId" : 9,
  "numInputRows" : 1000,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 1584.7860538827258,
  "durationMs" : {
    "addBatch" : 417,
    "getBatch" : 21,
    "getOffset" : 0,
    "queryPlanning" : 38,
    "triggerExecution" : 631,
    "walCommit" : 154
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=64]",
    "startOffset" : 107,
    "endOffset" : 117,
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 1584.7860538827258
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@58975f19"
  }
}
the value is :83
Trigger    accumulator value: 10
Load count accumulator value: 11
exitVal:false
proc.isAlive():false
2019-04-02 11:43:19,end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:43:19,begin applicatioId = futureError.get();:
2019-04-02 11:43:19,end applicatioId = futureError.get();:application_1548381669007_0829
Complete:application_1548381669007_0829
2019-04-02 11:43:19,begin if (proc != null && proc.isAlive())
2019-04-02 11:43:19,end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0829
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at com.dx.test.StreamFilterTask.call(Submit.java:162)
        at com.dx.test.StreamFilterTask.call(Submit.java:1)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
# This exception was caught and printed by the program; it does not terminate it.
^Cbash-4.1$ 

Manually ending the launcher process at this point will terminate the Spark application on YARN.

 

Reference: 《JAVA調用Shell腳本--及阻塞的解決辦法》
