I previously covered submitting Spark jobs through the YARN API, where the submit call returns the applicationId; see 《Spark2.3(四十):如何使用java經過yarn api調度spark app,並根據appId監控任務,關閉任務,獲取任務日誌》.
But I prefer the approach described in this article: calling the spark-submit.sh shell script from Java and extracting the applicationId from spark-submit's console output. The Hadoop API approach requires a properly configured environment, and different Hadoop versions require pulling in different dependencies.
To invoke a shell command from Java, use:

Process p = Runtime.getRuntime().exec(String[] cmd);
Runtime.exec() spawns a native process and returns an instance of a Process subclass, which can be used to control the process and query its state.
Because the child process created by Runtime.exec has no terminal or console of its own, its standard I/O (stdin, stdout, stderr) is redirected to the parent process through

p.getOutputStream()
p.getInputStream()
p.getErrorStream()

The caller uses these streams to feed input to the child and to read its output.
For example: Runtime.getRuntime().exec("ls")
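As a minimal, self-contained sketch (mine, not from the original post; the class name is made up), the following runs ls and drains only its standard output. It works for this simple case, but as explained next it is fragile:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ExecLs {
    public static void main(String[] args) throws IOException, InterruptedException {
        Process p = Runtime.getRuntime().exec("ls");
        // Read the child's stdout via getInputStream(); stderr is ignored here,
        // which is exactly the pitfall discussed below.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println("OUTPUT>" + line);
            }
        }
        System.out.println("ExitValue: " + p.waitFor());
    }
}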
Because Runtime.getRuntime().exec() leaves stdout and stderr handling entirely to the caller, you cannot know which stream the child will write to first, so reading them in a fixed order may block forever.

For example: if you read stdout first but the child happens to write stderr first, the child blocks once its stderr pipe buffer fills, waiting for it to be drained before it ever writes to stdout, and both sides hang.

The fix is to drain stdout and stderr on two separate threads.
Reference code:
import java.util.*;
import java.io.*;

class StreamGobbler extends Thread {
    InputStream is;
    String type;

    StreamGobbler(InputStream is, String type) {
        this.is = is;
        this.type = type;
    }

    public void run() {
        try {
            InputStreamReader isr = new InputStreamReader(is);
            BufferedReader br = new BufferedReader(isr);
            String line = null;
            while ((line = br.readLine()) != null)
                System.out.println(type + ">" + line);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

public class ExecRunner {
    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("USAGE: java ExecRunner <cmd>");
            System.exit(1);
        }

        try {
            String osName = System.getProperty("os.name");
            String[] cmd = new String[3];
            if (osName.equals("Windows NT")) {
                cmd[0] = "cmd.exe";
                cmd[1] = "/C";
                cmd[2] = args[0];
            } else if (osName.equals("Windows 95")) {
                cmd[0] = "command.com";
                cmd[1] = "/C";
                cmd[2] = args[0];
            } else {
                // On Unix-like systems, split the command string on spaces.
                StringTokenizer st = new StringTokenizer(args[0], " ");
                cmd = new String[st.countTokens()];
                int token = 0;
                while (st.hasMoreTokens()) {
                    cmd[token++] = st.nextToken();
                }
            }

            Runtime rt = Runtime.getRuntime();
            System.out.println("Execing " + String.join(" ", cmd));
            Process proc = rt.exec(cmd);

            // any error message?
            StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), "ERROR");

            // any output?
            StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), "OUTPUT");

            // kick them off
            errorGobbler.start();
            outputGobbler.start();

            // any error???
            int exitVal = proc.waitFor();
            System.out.println("ExitValue: " + exitVal);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
The shell script that performs the actual submission (referenced later as ./submit_test.sh); it builds the --jars list from a directory of jars:

#!/bin/sh
jarspath=''
for file in `ls /home/dx/djj/sparkjars/*.jar`
do
  jarspath=${file},$jarspath
done
# strip the trailing comma
jarspath=${jarspath%?}
echo $jarspath

/home1/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.dx.test.BroadcastTest \
--properties-file ./conf/spark-properties-mrs.conf \
--jars $jarspath \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 1 \
--driver-memory 2G \
--driver-java-options "-XX:+TraceClassPaths" \
./test.jar $1 $2 $3 $4
Note: when testing the YARN submission, switch the --deploy-mode flag:

cluster mode: --deploy-mode cluster \
client mode: --deploy-mode client \
To obtain the applicationId from spark-submit, we have to filter it out of spark-submit's printed output (that is, the Process's stdout/stderr). If you have ever submitted a Spark job with spark-submit.sh, you will have noticed that the applicationId is printed on the console during execution, as in the following excerpts (first a client-mode "Submitted application" marker, then cluster-mode "Application report" lines):
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@215a34b4{/static,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2e380628{/,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1eaf1e62{/api,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@652ab8d9{/jobs/job/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:29 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51e0301d{/stages/stage/kill,null,AVAILABLE,@Spark}
19/04/02 11:38:31 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
[Opened /usr/java/jdk1.8.0_152/jre/lib/charsets.jar]
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
19/04/02 11:40:22 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:23 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:24 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:25 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:26 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:27 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:28 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:29 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
19/04/02 11:40:33 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
The filter method that extracts the applicationId from a single output line:

/**
 * @param line one line of the child's stdout/stderr.
 */
private String filterApplicationId(String line, boolean isCluster) {
    String applicationId = null;
    line = line.toLowerCase();
    // --deploy-mode client
    // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
    // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
    boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
    // --deploy-mode cluster
    // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
    // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
    boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
    boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

    if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
        if (isIndexSparkOwnLog && false == isCluster) {
            int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
            applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
        } else if (isIndexSparkOwn2Log && true == isCluster) {
            int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
            applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
            if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
            }
        }
    }
    if (applicationId != null && applicationId.startsWith("application_")) {
        System.out.println("====================================Index of applicationId:" + applicationId);
        System.out.println("====================================Index of applicationId:Complete ...");
    }
    return applicationId;
}
If a line matches, the applicationId is returned; otherwise the method returns null.
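As an aside, the same extraction can be written more compactly with a regular expression over the applicationId format (a sketch of mine, not the code this post actually uses; the class name is made up):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ApplicationIdRegexFilter {
    // YARN application ids look like application_<clusterTimestamp>_<sequenceNumber>
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /** Returns the first applicationId in the line, or null if the line is not a marker line. */
    static String filter(String line, boolean isCluster) {
        boolean clientMarker = line.contains("Submitted application");
        boolean clusterMarker = line.contains("Application report for") && line.contains("(state: RUNNING)");
        if ((!isCluster && clientMarker) || (isCluster && clusterMarker)) {
            Matcher m = APP_ID.matcher(line);
            if (m.find()) {
                return m.group();
            }
        }
        return null;
    }
}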
StreamFilterTask wraps one of the two streams: it echoes every line, keeps the most recent 1000 error lines in a queue, and returns as soon as an applicationId is spotted:

import java.io.*;
import java.util.concurrent.*;

class StreamFilterTask implements Callable<String> {
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> queue;
    private String streamType = null;
    private boolean isCluster;

    private StreamFilterTask() {
    }

    public StreamFilterTask(InputStream inputStream, ConcurrentLinkedQueue<String> queue, String streamType,
            boolean isCluster) {
        this.inputStream = inputStream;
        this.queue = queue;
        this.streamType = streamType;
        this.isCluster = isCluster;
    }

    @Override
    public String call() throws Exception {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
                // Cap the queue at the most recent 1000 lines.
                // size() traverses the whole collection, so prefer isEmpty() where possible.
                if (this.streamType.equalsIgnoreCase("error")) {
                    if (queue.size() > 1000) {
                        // Retrieves and removes the head of the queue, or returns null if it is empty.
                        queue.poll();
                    }
                    // Inserts at the tail; the queue is unbounded, so offer never returns false.
                    queue.offer(line);
                }

                String applicationId = filterApplicationId(line, isCluster);
                if (applicationId != null && applicationId.startsWith("application_")) {
                    return applicationId;
                }
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }

    /**
     * @param line one line of the child's stdout/stderr.
     */
    private String filterApplicationId(String line, boolean isCluster) {
        String applicationId = null;
        line = line.toLowerCase();
        // --deploy-mode client
        // 19/02/15 17:43:35 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0051
        // 19/04/01 14:13:57 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0781
        boolean isIndexSparkOwnLog = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase()) != -1;
        // --deploy-mode cluster
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: ACCEPTED)
        // 19/04/01 14:13:59 INFO yarn.Client: Application report for application_1548381669007_0781 (state: RUNNING)
        boolean isIndexSparkOwn2Log = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase()) != -1;
        boolean isIndexSparkRunningLog = line.indexOf("(state: RUNNING)".toLowerCase()) != -1;

        if (isIndexSparkOwnLog || (isIndexSparkOwn2Log && isIndexSparkRunningLog)) {
            if (isIndexSparkOwnLog && false == isCluster) {
                int idx = line.indexOf("INFO impl.YarnClientImpl: Submitted application ".toLowerCase());
                applicationId = line.substring(idx + "INFO impl.YarnClientImpl: Submitted application ".length());
            } else if (isIndexSparkOwn2Log && true == isCluster) {
                int idx = line.indexOf("INFO yarn.Client: Application report for ".toLowerCase());
                applicationId = line.substring(idx + "INFO yarn.Client: Application report for ".length());
                if (line.indexOf("(state: RUNNING)".toLowerCase()) != -1) {
                    applicationId = applicationId.replace(" (state: RUNNING)".toLowerCase(), "");
                }
            }
        }
        if (applicationId != null && applicationId.startsWith("application_")) {
            System.out.println("====================================Index of applicationId:" + applicationId);
            System.out.println("====================================Index of applicationId:Complete ...");
        }
        return applicationId;
    }
}
The following class runs the script through a Process, filters the applicationId out of the Process's stdout/stderr, and caps the wait with Process.waitFor(timeout, TimeUnit).
import java.io.*;
import java.lang.reflect.Field;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;

class SubmitSpark {
    public String submit(String filePath, long timeoutMinutes, String charsetName) {
        String applicationId = null;
        String command = filePath;
        boolean isCluster = false;

        // Read the script first to find out whether it submits with --deploy-mode cluster.
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), charsetName));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                // Normalize whitespace so the flag is found regardless of spacing.
                if (line.replaceAll("\\s+", " ").toLowerCase().indexOf("--deploy-mode cluster") != -1) {
                    isCluster = true;
                    break;
                }
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        StringTokenizer st = new StringTokenizer(command, " ");
        String[] cmd = new String[st.countTokens()];
        int token = 0;
        while (st.hasMoreTokens()) {
            cmd[token++] = st.nextToken();
        }

        Runtime rt = Runtime.getRuntime();
        System.out.println("Execing " + command);
        Process proc = null;
        try {
            proc = rt.exec(cmd);
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
            ExecutorService executor = Executors.newFixedThreadPool(2);
            // The Futures hold each reader thread's result. Do not call get() here:
            // it blocks until the task finishes and would serialize everything.
            // any output?
            Future<String> futureInput = executor
                    .submit(new StreamFilterTask(proc.getInputStream(), queue, "input", isCluster));
            // any error message?
            Future<String> futureError = executor
                    .submit(new StreamFilterTask(proc.getErrorStream(), queue, "error", isCluster));

            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", begin proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");
            // any error???
            boolean exitVal = proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
            System.out.println("exitVal:" + exitVal);
            proc.destroyForcibly();
            System.out.println("proc.isAlive():" + proc.isAlive());
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);");

            // Whether --deploy-mode is cluster or client, the applicationId lines are read
            // from getErrorStream, so as soon as the submission succeeds the id is returned;
            // otherwise (e.g. the cluster lacks resources) we wait until the timeout expires.
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", begin applicationId = futureError.get();");
            if (futureError.get() != null) {
                applicationId = futureError.get();
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", end applicationId = futureError.get(); " + applicationId);

            // In cluster mode futureInput.get() would block, so keep this disabled:
            // if (futureInput.get() != null) {
            //     applicationId = futureInput.get();
            // }

            // Optional kill-process code: look up the child's pid via reflection and kill it.
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", begin fetching process id");
            // long pid = -1;
            // try {
            //     Class<?> clazz = Class.forName("java.lang.UNIXProcess");
            //     Field field = clazz.getDeclaredField("pid");
            //     field.setAccessible(true);
            //     pid = (Integer) field.get(proc);
            // } catch (Throwable e) {
            //     e.printStackTrace();
            // }
            // System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", end fetching process id:" + pid);
            //
            // System.out.println("proc.isAlive():" + proc.isAlive());
            // String[] killCmd = { "sh", "-c", "kill -9 " + pid };
            // Runtime.getRuntime().exec(killCmd).waitFor();

            System.out.println("Complete:" + applicationId);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", begin if (proc != null && proc.isAlive())");
            if (proc != null && proc.isAlive()) {
                proc.destroyForcibly();
                System.out.println("proc.isAlive():" + proc.isAlive());
            }
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + ", end if (proc != null && proc.isAlive())");
        }
        return applicationId;
    }
}
Notes:

1) The timeout must not be too short. If it is, the process is killed before the job has even been submitted to YARN, so no applicationId can be returned.

2) Even after SubmitSpark.submit returns an applicationId, the JVM that ran the spark-submit.sh script (java -cp test.jar com.dx.test.Submit) does not exit; the Process's stdout/stderr pipes are in fact still open.

3) Even with the kill-process code above enabled, closing stdin/stderr still does not let java -cp test.jar com.dx.test.Submit exit. With it enabled, once the Process object is torn down the program catches "Stream closed" errors on stdout/stderr (provided the Spark job has already reached YARN), but the Spark application on YARN is not killed: in yarn-client mode the application keeps running until you kill the submitting JVM by hand, and in yarn-cluster mode even killing the submitting JVM cannot stop it. So the kill-process code makes little difference either way (a possible cleanup is sketched after this list).
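One plausible reason the submitting JVM never exits on its own is that Executors.newFixedThreadPool creates non-daemon worker threads and the ExecutorService above is never shut down, so idle pool threads keep the JVM alive even after both reader tasks finish. A cleanup along these lines (my sketch, based on that assumption and on standard java.lang.Process / ExecutorService APIs; not code from this post) would be:

import java.io.IOException;
import java.util.concurrent.ExecutorService;

class SubmitCleanup {
    /** Releases everything that could keep the submitting JVM alive. */
    static void cleanup(Process proc, ExecutorService executor) {
        // Interrupt the two StreamFilterTask reader threads and retire the pool.
        executor.shutdownNow();
        // Close the child's pipes so blocked readLine() calls fail fast.
        try { proc.getOutputStream().close(); } catch (IOException ignored) { }
        try { proc.getInputStream().close(); } catch (IOException ignored) { }
        try { proc.getErrorStream().close(); } catch (IOException ignored) { }
        // Kill spark-submit itself; per the notes above, in cluster mode the
        // application already running on YARN is unaffected.
        proc.destroyForcibly();
    }
}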
The test driver used in the runs below:

package com.dx.test;

public class Submit {
    public static void main(String[] args) {
        String filePath = "./submit_test.sh";
        String charsetName = "utf-8";
        long timeoutMinutes = 5;

        SubmitSpark submitSpark = new SubmitSpark();
        String applicationId = submitSpark.submit(filePath, timeoutMinutes, charsetName);
        System.out.println("return the applicationId:" + applicationId);
    }
}
19/04/02 10:54:49 INFO client.RMProxy: Connecting to ResourceManager at vm10.60.0.11.com.cn/10.60.0.11:8032
[Opened /usr/java/jdk1.8.0_152/jre/lib/jce.jar]
exitVal:false
proc.isAlive():false
2019-04-02 10:56:38, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:56:38, begin fetching applicationId:
java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.dx.test.StreamFilterTask.call(Submit.java:148)
	at com.dx.test.StreamFilterTask.call(Submit.java:1)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.dx.test.StreamFilterTask.call(Submit.java:148)
	at com.dx.test.StreamFilterTask.call(Submit.java:1)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:56:52, end fetching applicationId:null
2019-04-02 10:56:52, begin fetching process id
2019-04-02 10:56:52, end fetching process id:13994
proc.isAlive():false
Complete:null
2019-04-02 10:56:52, begin if (proc != null && proc.isAlive())
2019-04-02 10:56:52, end if (proc != null && proc.isAlive())
return the applicationId:null
Note: this test was run with the kill-process code enabled; with it disabled the results are the same.

1) No applicationId is obtained, even though the job might already have been submitted to YARN (in this particular run, the printout shows it was killed before the submission ever reached YARN).

2) The console then just blocks. Pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit also shuts down the Spark application on YARN.
19/04/02 10:57:00 INFO yarn.Client: Uploading resource file:/home1/boco/duanjiajun/sparkjars/bcprov-jdk15on-1.52.jar -> hdfs://vm10.60.0.11.com.cn:8020/user/boco/.sparkStaging/application_1548381669007_0816/bcprov-jdk15on-1.52.jar
exitVal:false
proc.isAlive():false
2019-04-02 10:57:01, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 10:57:01, begin fetching applicationId:
java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.dx.test.StreamFilterTask.call(Submit.java:148)
	at com.dx.test.StreamFilterTask.call(Submit.java:1)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-04-02 10:57:01, end fetching applicationId:null
2019-04-02 10:57:01, begin fetching process id
2019-04-02 10:57:01, end fetching process id:14359
proc.isAlive():false
Complete:null
2019-04-02 10:57:01, begin if (proc != null && proc.isAlive())
2019-04-02 10:57:01, end if (proc != null && proc.isAlive())
return the applicationId:null
Note: this test too was run with the kill-process code enabled; with it disabled the results are the same.

1) No applicationId is obtained, and the job might already have been submitted to YARN (in this run the printout again shows it was killed before the submission completed).

2) The console then blocks. Pressing CTRL+C to end java -cp ./test.jar com.dx.test.Submit does not shut down the Spark application on YARN if the job has already been submitted.
19/04/02 11:40:30 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:31 INFO yarn.Client: Application report for application_1548381669007_0828 (state: ACCEPTED)
19/04/02 11:40:32 INFO yarn.Client: Application report for application_1548381669007_0828 (state: RUNNING)
====================================Index of applicationId:application_1548381669007_0828
====================================Index of applicationId:Complete ...
exitVal:false
proc.isAlive():true
2019-04-02 11:42:59, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:42:59, begin applicationId = futureError.get();
2019-04-02 11:42:59, end applicationId = futureError.get(); application_1548381669007_0828
Complete:application_1548381669007_0828
2019-04-02 11:42:59, begin if (proc != null && proc.isAlive())
2019-04-02 11:42:59, end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0828
^Cbash-4.1$
In this run (cluster mode, as the Application report lines show), manually ending the process afterwards does not terminate the Spark application on YARN.
19/04/02 11:40:24 INFO impl.YarnClientImpl: Submitted application application_1548381669007_0829
====================================Index of applicationId:application_1548381669007_0829
====================================Index of applicationId:Complete ...
the value is :86
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
 |-- int_id: long (nullable = true)

root
 |-- int_id: string (nullable = false)
 |-- job_result: string (nullable = true)

Query started: a82ad759-8b14-4d58-93a3-8bed7617dd9c
-------------------------------------------
Batch: 0
-------------------------------------------
listener...application_1548381669007_0829
+------+----------+
|int_id|job_result|
+------+----------+
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
|     5|      5,86|
|     6|      6,86|
|     7|      7,86|
|     8|      8,86|
|     9|      9,86|
|    10|     10,86|
|    11|      null|
|    12|      null|
|    13|      null|
|    14|      null|
|     0|      null|
|     1|      1,86|
|     2|      2,86|
|     3|      3,86|
|     4|      4,86|
+------+----------+
only showing top 20 rows
...
listener...application_1548381669007_0829
Query made progress: {
  "id" : "a82ad759-8b14-4d58-93a3-8bed7617dd9c",
  "runId" : "a53447f1-056e-4d84-b27e-7007829bc1e2",
  "name" : null,
  "timestamp" : "2019-04-02T03:43:10.001Z",
  "batchId" : 9,
  "numInputRows" : 1000,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 1584.7860538827258,
  "durationMs" : {
    "addBatch" : 417,
    "getBatch" : 21,
    "getOffset" : 0,
    "queryPlanning" : 38,
    "triggerExecution" : 631,
    "walCommit" : 154
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateSource[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=64]",
    "startOffset" : 107,
    "endOffset" : 117,
    "numInputRows" : 1000,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 1584.7860538827258
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@58975f19"
  }
}
the value is :83
Trigger accumulator value: 10
Load count accumulator value: 11
exitVal:false
proc.isAlive():false
2019-04-02 11:43:19, end proc.waitFor(timeoutMinutes, TimeUnit.MINUTES);
2019-04-02 11:43:19, begin applicationId = futureError.get();
2019-04-02 11:43:19, end applicationId = futureError.get(); application_1548381669007_0829
Complete:application_1548381669007_0829
2019-04-02 11:43:19, begin if (proc != null && proc.isAlive())
2019-04-02 11:43:19, end if (proc != null && proc.isAlive())
return the applicationId:application_1548381669007_0829
java.io.IOException: Stream closed
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:283)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.dx.test.StreamFilterTask.call(Submit.java:162)
	at com.dx.test.StreamFilterTask.call(Submit.java:1)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
# This exception was caught and printed by the program itself; it does not terminate the program.
^Cbash-4.1$
In this run (client mode, per the Submitted application line), manually ending the process does terminate the Spark application on YARN.