Spark on YARN: an experiment with multiple contexts

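Preface

Our business currently needs several Spark jobs to run one after another as a chain, and each job needs its own configuration parameters. We keep them as separate Spark jobs because we want each one to remain runnable and servable on its own, while still being able to string several of them together into a serial execution chain when needed. This post describes my attempt to give each job in the chain its own configuration by creating multiple Spark contexts, with YARN as the resource manager.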

The experiment

Test code

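// Build the first session; its executor settings are fixed once the SparkContext starts.
SparkSession sparkSession = SparkSession.builder() // .master("local[1]")
        .config("spark.driver.allowMultipleContexts", true)
        .config("spark.executor.cores", 4)
        .appName("Java Spark SQL basic example")
        .getOrCreate();

// Stop the underlying context, hoping to start over with a different configuration.
sparkSession.sparkContext().stop();

// newSession() shares the same (now stopped) SparkContext as the original session.
SparkSession newSession = sparkSession.newSession();
newSession.conf().set("spark.executor.cores", "2");
SparkSession.setDefaultSession(newSession);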

Result:

[2018-07-11 09:39:29.879] [ERROR] [main] [org.apache.spark.deploy.yarn.ApplicationMaster] >>> Uncaught exception: 
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:401)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:254)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:766)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:764)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
	at com.vip.spark.api.SparkSimpleTest01.main(SparkSimpleTest01.java:41)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

The SparkContext has already been stopped. Ouch...

Looking at the source, it says that to create a new context you must first stop the old one...

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

... (n lines of code omitted)

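No way out: if the old context cannot be removed, a context with a new configuration cannot be created. That means the resources the first context requested from YARN become the shared resources for every job in the chain, and they cannot be changed...

Looking again at how Spark on YARN requests resources confirms this. The first container is set up from the SparkContext's configuration, and all later containers are requested from the ResourceManager on that basis. Unless the ApplicationMaster's container is killed, new containers cannot be requested with different settings, and once the context is killed with sparkContext.stop(), the whole application dies (this is Spark on YARN in cluster mode). Ouch...

At this point I still wasn't ready to give up, so I looked through the SparkContext source for an entry point that would let me switch between contexts, or let several contexts coexist.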

/**
   * Called to ensure that no other SparkContext is running in this JVM.
   *
   * Throws an exception if a running context is detected and logs a warning if another thread is
   * constructing a SparkContext.  This warning is necessary because the current locking scheme
   * prevents us from reliably distinguishing between cases where another context is being
   * constructed and cases where another constructor threw an exception.
   */
  private def assertNoOtherContextIsRunning(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
          val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
            " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
            s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
          val exception = new SparkException(errMsg)
          if (allowMultipleContexts) {
            logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
          } else {
            throw exception
          }
        }

      contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
        // Since otherContext might point to a partially-constructed context, guard against
        // its creationSite field being null:
        val otherContextCreationSite =
          Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
        val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
          " constructor).  This may indicate an error, since only one SparkContext may be" +
          " running in this JVM (see SPARK-2243)." +
          s" The other SparkContext was created at:\n$otherContextCreationSite"
        logWarning(warnMsg)
      }
    }
  }

So SparkContext uses a synchronized lock to make sure only one context can be created per JVM on the driver side, and every method that switches the current context is private. I....
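To make the guard easier to reason about, here is a minimal, self-contained Java model of the same idea: a constructor lock plus a single "active context" reference. This is only a sketch under stated assumptions. The class SingleContextGuard, its nested Context type, and the create/stop methods are hypothetical names made up for illustration; none of this is Spark's real code, and it does not model YARN at all (in cluster mode, stopping the real context ends the application, as the experiment above showed).

```java
import java.util.concurrent.atomic.AtomicReference;

public class SingleContextGuard {

    /** Hypothetical stand-in for SparkContext; not Spark's real class. */
    public static final class Context {
        public final String name;
        Context(String name) { this.name = name; }
    }

    // One lock guards construction, one reference tracks the active context.
    private static final Object CONSTRUCTOR_LOCK = new Object();
    private static final AtomicReference<Context> ACTIVE = new AtomicReference<>(null);

    /** Creates a context; rejects (or only warns about) a second one in the same JVM. */
    public static Context create(String name, boolean allowMultiple) {
        synchronized (CONSTRUCTOR_LOCK) {
            Context running = ACTIVE.get();
            if (running != null) {
                String msg = "Only one context may be running in this JVM; active: " + running.name;
                if (!allowMultiple) {
                    throw new IllegalStateException(msg);
                }
                // With the flag set, the check is downgraded to a warning.
                System.err.println("WARN " + msg);
            }
            Context ctx = new Context(name);
            ACTIVE.set(ctx);
            return ctx;
        }
    }

    /** Clears the active reference, but only if ctx is the one currently active. */
    public static void stop(Context ctx) {
        synchronized (CONSTRUCTOR_LOCK) {
            ACTIVE.compareAndSet(ctx, null);
        }
    }

    public static void main(String[] args) {
        Context first = create("job-1", false);
        try {
            create("job-2", false); // rejected while job-1 is still active
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        stop(first);
        Context second = create("job-2", false); // accepted once job-1 is stopped
        System.out.println("created " + second.name + " after stopping " + first.name);
    }
}
```

In this toy model, stopping the first context is enough to create a second one. On YARN in cluster mode that step is exactly what fails, because the driver's context is tied to the application's lifetime.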

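Next, the SPARK-2243 issue mentioned in the source: https://issues.apache.org/jira/browse/SPARK-2243

Won't Fix. Heh. No point reading further; that road is blocked, so I had to find another way.

Solution

Control the Spark jobs at the process level and run them serially from a script, that is, a script that simply lists the spark-submit commands for the jobs one after another. I don't need to write that out for you, do I?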
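For readers who would rather drive the chain from Java than from a shell script, the same process-level idea can be sketched with ProcessBuilder. Treat this as an illustration under assumptions, not the setup actually used here: the class name SparkJobChain, the main classes com.example.JobOne and com.example.JobTwo, and the jar name jobs.jar are all hypothetical placeholders. The sketch also assumes spark-submit is on the PATH and that it blocks until the YARN application finishes and returns a non-zero exit code on failure, which I believe is the default behaviour in cluster mode but is worth checking for your Spark version.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SparkJobChain {

    /** Builds one spark-submit command line for YARN cluster mode. */
    public static List<String> buildCommand(String mainClass, String jarPath,
                                            Map<String, String> conf) {
        List<String> cmd = new ArrayList<>();
        cmd.add("spark-submit");
        cmd.add("--master");
        cmd.add("yarn");
        cmd.add("--deploy-mode");
        cmd.add("cluster");
        cmd.add("--class");
        cmd.add(mainClass);
        // Each job gets its own settings, passed as repeated --conf key=value pairs.
        for (Map.Entry<String, String> e : conf.entrySet()) {
            cmd.add("--conf");
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        cmd.add(jarPath);
        return cmd;
    }

    /** Runs the commands in order; returns the first non-zero exit code, or 0 if all succeed. */
    public static int runChain(List<List<String>> commands)
            throws IOException, InterruptedException {
        for (List<String> cmd : commands) {
            Process process = new ProcessBuilder(cmd).inheritIO().start();
            int exit = process.waitFor();
            if (exit != 0) {
                return exit; // stop the chain at the first failing job
            }
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> firstConf = new LinkedHashMap<>();
        firstConf.put("spark.executor.cores", "4");
        Map<String, String> secondConf = new LinkedHashMap<>();
        secondConf.put("spark.executor.cores", "2");

        List<List<String>> chain = new ArrayList<>();
        // Hypothetical job classes and jar; replace with real ones.
        chain.add(buildCommand("com.example.JobOne", "jobs.jar", firstConf));
        chain.add(buildCommand("com.example.JobTwo", "jobs.jar", secondConf));

        // Print the commands by default; only launch them when "--run" is passed.
        boolean run = args.length > 0 && "--run".equals(args[0]);
        if (run) {
            System.exit(runChain(chain));
        }
        for (List<String> cmd : chain) {
            System.out.println(String.join(" ", cmd));
        }
    }
}
```

Because every job is its own spark-submit process, each one gets a fresh JVM, a fresh SparkContext, and its own YARN application, which is what sidesteps the one-context-per-JVM limit.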
