本博客內容基於Spark2.2版本,在閱讀文章並想實際操做前,請確保你有:java
- 一臺配置好Spark和yarn的服務器
- 支持正常
spark-submit --master yarn xxxx
的任務提交
老版本
老版本任務提交是基於啓動本地進程,執行腳本spark-submit xxx
** 的方式作的。其中一個關鍵的問題就是得到提交Spark任務的Application-id,由於這個id是跟任務狀態的跟蹤有關係的。若是你的資源管理框架用的是yarn,應該知道每一個運行的任務都有一個applicaiton_id,這個id的生成規則是:node
appplication_時間戳_數字
老版本的spark經過修改SparkConf參數spark.app.id
就能夠手動指定id,新版本的代碼是直接讀取的taskBackend中的applicationId()方法,這個方法具體的實現是根據實現類來定的。在yarn中,是經過Yarn的YarnClusterSchedulerBackend實現的。sql
感興趣的同窗能夠看一下,生成applicaiton_id的邏輯在hadoop-yarn工程的ContainerId中定義。數據庫
總結一句話就是,想要自定義id,甭想了!!!!apache
因而當時腦殼瓜不靈光的我,就想到那就等應用建立好了以後,直接寫到數據庫裏面唄。怎麼寫呢?服務器
- 我事先生成一個自定義的id,當作參數傳遞到spark應用裏面;
- 等spark初始化後,就能夠經過sparkContext取得對應的application_id以及url
- 而後再driver鏈接數據庫,插入一條關聯關係
新版本
仍是歸結於互聯網時代的信息大爆炸,我看到羣友的聊天,知道了SparkLauncer這個東西,調查後發現他能夠基於Java代碼自動提交Spark任務。SparkLauncher支持兩種模式:app
- new SparkLauncher().launch() 直接啓動一個Process,效果跟之前同樣
- new SparkLauncher().startApplicaiton(監聽器) 返回一個SparkAppHandler,並(可選)傳入一個監聽器
固然是更傾向於第二種啦,由於好處不少:框架
- 自帶輸出重定向(Output,Error都有,支持寫到文件裏面),超級爽的功能
- 能夠自定義監聽器,當信息或者狀態變動時,都能進行操做(對我沒啥用)
- 返回的SparkAppHandler支持 暫停、中止、斷連、得到AppId、得到State等多種功能,我就想要這個!!!!
一步一步,代碼展現
首先建立一個最基本的Spark程序:dom
import org.apache.spark.sql.SparkSession; import java.util.ArrayList; import java.util.List; public class HelloWorld { public static void main(String[] args) throws InterruptedException { SparkSession spark = SparkSession .builder() //.master("yarn") //.appName("hello-wrold") //.config("spark.some.config.option", "some-value") .getOrCreate(); List<Person> persons = new ArrayList<>(); persons.add(new Person("zhangsan", 22, "male")); persons.add(new Person("lisi", 25, "male")); persons.add(new Person("wangwu", 23, "female")); spark.createDataFrame(persons, Person.class).show(false); spark.close(); } }
而後建立SparkLauncher類:ide
import org.apache.spark.launcher.SparkAppHandle; import org.apache.spark.launcher.SparkLauncher; import java.io.IOException; public class Launcher { public static void main(String[] args) throws IOException { SparkAppHandle handler = new SparkLauncher() .setAppName("hello-world") .setSparkHome(args[0]) .setMaster(args[1]) .setConf("spark.driver.memory", "2g") .setConf("spark.executor.memory", "1g") .setConf("spark.executor.cores", "3") .setAppResource("/home/xinghailong/launcher/launcher_test.jar")
//此處應寫類的全限定名 .setMainClass("HelloWorld") .addAppArgs("I come from Launcher") .setDeployMode("cluster") .startApplication(new SparkAppHandle.Listener(){ @Override public void stateChanged(SparkAppHandle handle) { System.out.println("********** state changed **********"); } @Override public void infoChanged(SparkAppHandle handle) { System.out.println("********** info changed **********"); } }); while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){ System.out.println("id "+handler.getAppId()); System.out.println("state "+handler.getState()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
打包完成後上傳到部署Spark的服務器上。因爲SparkLauncher所在的類引用了SparkLauncher,因此還須要把這個jar也上傳到服務器上。
[xiangcong@hnode10 launcher]$ ls launcher_test.jar spark-launcher_2.11-2.2.0.jar [xiangcong@hnode10 launcher]$ pwd /home/xiangcong/launcher
因爲SparkLauncher須要指定SPARK_HOME,所以若是你的機器能夠執行spark-submit,那麼就看一下spark-submit裏面,SPARK_HOME是在哪
[xiangcong@hnode10 launcher]$ which spark2-submit /var/lib/hadoop-hdfs/bin/spark2-submit
最後幾行就能看到:
export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
綜上,咱們須要的是:
- 一個自定義的Jar,裏面包含Spark應用和SparkLauncher類
- 一個SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根據你本身的來就行
- 一個當前目錄的路徑
- 一個SARK_HOME環境變量指定的目錄
而後執行命令啓動測試:
java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn
說明:
-Djava.ext.dirs
設置當前目錄爲java類加載的目錄- 傳入兩個參數,一個是SPARK_HOME;一個是啓動模式
觀察發現成功啓動運行了:
id null state UNKNOWN Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable ********** state changed ********** ...省略一大堆拷貝jar的日誌 ********** info changed ********** ********** state changed ********** Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED) ... 省略一堆重定向的日誌 application_1518263195995_37615 (state: ACCEPTED) id application_1518263195995_37615 state SUBMITTED Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING) ********** state changed ********** ... 省略一堆重定向的日誌 INFO: user: hdfs ********** state changed ********** Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701
總結
這樣就實現了基於Java應用提交Spark任務,並得到其Appliation_id和狀態進行定位跟蹤的需求了。