Our business currently needs several Spark jobs to run serially as one chain, and each job requires a different set of configuration parameters. We keep them as separate Spark jobs because we want each task to retain the ability to run on its own and be offered as an independent service, while still being able to string multiple tasks together into one serial execution chain when needed. This post describes my attempt to support different configurations within such a chain by creating multiple Spark contexts, with YARN as the resource manager.
Test code
// Build the first session (and its SparkContext) with one set of resources
SparkSession sparkSession = SparkSession.builder()
        //.master("local[1]")
        .config("spark.driver.allowMultipleContexts", true)
        .config("spark.executor.cores", 4)
        .appName("Java Spark SQL basic example")
        .getOrCreate();

// Stop the old context, then try to create a new session with different settings
sparkSession.sparkContext().stop();
SparkSession newSession = sparkSession.newSession();
newSession.conf().set("spark.executor.cores", "2");
sparkSession.setDefaultSession(newSession);
Result:
[2018-07-11 09:39:29.879] [ERROR] [main] [org.apache.spark.deploy.yarn.ApplicationMaster] >>> Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:401)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:254)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:766)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
    at com.vip.spark.api.SparkSimpleTest01.main(SparkSimpleTest01.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
The SparkContext has already been stopped. Ouch...
Looking at the source code, it says that to create a new context you must stop the old one first...
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
 * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

  ... // remaining lines omitted
Dead end: if the old context cannot be removed, a context with a new configuration cannot be created. This means the resources the first context requests from YARN become the shared resources for every task in the chain, and they cannot be changed...
Looking at how Spark on YARN requests resources confirms this: the first container is created from the SparkContext's configuration, and every subsequent container is requested from the ResourceManager on top of it. Unless the ApplicationMaster's container is killed, new containers cannot be requested with a fresh configuration; but once sparkContext.stop() kills the context, the whole application dies with it. Ouch...
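In other words, on YARN the executor resources have to be decided once, before the one and only SparkContext is created; changing them on a later session has no effect on containers already requested from that first configuration. A minimal sketch of the "configure once up front" pattern (the app name and resource values are illustrative, not from the original post):

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Executor-level settings must be in place before the context exists;
// after getOrCreate(), the containers requested from YARN are fixed.
SparkConf conf = new SparkConf()
        .setAppName("shared-context-chain")
        .set("spark.executor.cores", "4")
        .set("spark.executor.memory", "4g");

SparkSession spark = SparkSession.builder().config(conf).getOrCreate();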
Still not ready to give up, I dug into the SparkContext source to see whether there is any entry point for switching between contexts, or for letting multiple contexts coexist.
/**
 * Called to ensure that no other SparkContext is running in this JVM.
 *
 * Throws an exception if a running context is detected and logs a warning if another thread is
 * constructing a SparkContext. This warning is necessary because the current locking scheme
 * prevents us from reliably distinguishing between cases where another context is being
 * constructed and cases where another constructor threw an exception.
 */
private def assertNoOtherContextIsRunning(
    sc: SparkContext,
    allowMultipleContexts: Boolean): Unit = {
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
      val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
        " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
        s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
      val exception = new SparkException(errMsg)
      if (allowMultipleContexts) {
        logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
      } else {
        throw exception
      }
    }

    contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
      // Since otherContext might point to a partially-constructed context, guard against
      // its creationSite field being null:
      val otherContextCreationSite =
        Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
      val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
        " constructor). This may indicate an error, since only one SparkContext may be" +
        " running in this JVM (see SPARK-2243)." +
        s" The other SparkContext was created at:\n$otherContextCreationSite"
      logWarning(warnMsg)
    }
  }
}
So SparkContext uses a synchronized lock to guarantee that only one context can be created per JVM on the driver side, and every method for switching the current context is private. I...
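For completeness: SparkSession, unlike SparkContext, does expose public session-switching methods such as SparkSession.setActiveSession, but that does not help here, because every session created with newSession() still wraps the same underlying SparkContext and therefore shares whatever resources that single context requested from YARN. A minimal sketch (same imports as the test code above):

// Switching sessions is public, but the context underneath is shared.
SparkSession first = SparkSession.builder().appName("job-chain").getOrCreate();
SparkSession second = first.newSession();      // same SparkContext underneath
SparkSession.setActiveSession(second);         // public API since Spark 2.0
// Both sessions point at the one context, hence the same YARN containers:
boolean shared = first.sparkContext() == second.sparkContext();  // true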
Next I checked the SPARK-2243 issue mentioned in the source: https://issues.apache.org/jira/browse/SPARK-2243
Won't Fix. Well then. No point reading further; this road is blocked, so I had to find another way out.
The workaround: control the Spark jobs at the process level and run them serially from a script, i.e. a script that chains several spark-submit commands, each with its own configuration. I probably don't need to spell that out, but a rough sketch follows anyway.
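A minimal sketch of that idea in Java, driving spark-submit as child processes so that each job gets its own SparkContext (its own ApplicationMaster) and its own resource configuration; the class names, jar paths and resource values are placeholders, not part of the original post:

import java.util.Arrays;
import java.util.List;

public class SerialJobChain {
    public static void main(String[] args) throws Exception {
        // Each job is an independent spark-submit, so each one can request
        // different executor resources from YARN.
        List<List<String>> jobs = Arrays.asList(
                Arrays.asList("spark-submit", "--master", "yarn",
                        "--conf", "spark.executor.cores=4",
                        "--class", "com.example.JobOne", "/path/to/job-one.jar"),
                Arrays.asList("spark-submit", "--master", "yarn",
                        "--conf", "spark.executor.cores=2",
                        "--class", "com.example.JobTwo", "/path/to/job-two.jar"));

        for (List<String> cmd : jobs) {
            Process p = new ProcessBuilder(cmd).inheritIO().start();
            int exit = p.waitFor();   // run serially: wait before starting the next job
            if (exit != 0) {
                throw new IllegalStateException("Job failed, stopping the chain: " + cmd);
            }
        }
    }
}

Each task keeps its ability to run on its own (just invoke its spark-submit directly), and the chain is simply the same commands executed one after another.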