CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend

CoarseGrainedSchedulerBackend is used on the Driver side; CoarseGrainedExecutorBackend is used on the Executor side. Both are Backends. So what is a Backend? A Backend is the component responsible for communication between two endpoints, and these two CoarseGrained backends handle the communication between the Driver and the Executors.

What is the Driver? The Driver is the Spark code we write; its main function is what the Driver runs.

What is an Executor? An Executor is where Spark tasks actually run. When the backend receives a LaunchTask message from the Driver, it calls the Executor's launchTask method to execute the task.
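The message flow just described can be sketched as a toy model. The class and message names below are illustrative placeholders, not Spark's real API: the executor-side backend receives a LaunchTask message and hands the task to its Executor's launchTask method.

```java
import java.util.ArrayList;
import java.util.List;

public class LaunchTaskSketch {
    static class Executor {
        final List<String> launched = new ArrayList<>();
        void launchTask(String taskId) {
            // Real Spark deserializes the task and runs it in a thread pool;
            // here we just record that it was launched.
            launched.add(taskId);
        }
    }

    static class ExecutorBackend {
        final Executor executor = new Executor();
        // Simplified message handler: dispatch on the message type.
        void receive(String message) {
            if (message.startsWith("LaunchTask:")) {
                executor.launchTask(message.substring("LaunchTask:".length()));
            }
        }
    }

    public static void main(String[] args) {
        ExecutorBackend backend = new ExecutorBackend();
        backend.receive("LaunchTask:task-0"); // as if sent by the Driver
        System.out.println(backend.executor.launched); // [task-0]
    }
}
```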

The Driver starts a CoarseGrainedSchedulerBackend and uses it to request machines from the cluster for launching Executors. Once a machine is found, a command is sent to it to start an ExecutorRunner; the ExecutorRunner starts a CoarseGrainedExecutorBackend, which registers with the Driver and creates an Executor to handle the requests the CoarseGrainedExecutorBackend receives. That is the flow under Standalone deployment. Under YARN it is mostly similar; only the step of requesting machines from the cluster to launch Executors differs, so let's briefly go over that.
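The registration handshake above can be modeled in miniature. Again, the names are illustrative rather than Spark's actual classes: the executor-side backend registers with the driver first, and only creates its Executor once the driver acknowledges the registration.

```java
import java.util.HashSet;
import java.util.Set;

public class RegistrationSketch {
    static class DriverBackend {
        final Set<String> registeredExecutors = new HashSet<>();
        // Acknowledge the registration unless this id is already registered.
        boolean registerExecutor(String executorId) {
            return registeredExecutors.add(executorId);
        }
    }

    static class ExecutorBackend {
        boolean executorCreated = false;
        void start(DriverBackend driver, String executorId) {
            // Register with the driver; create the Executor only on success.
            if (driver.registerExecutor(executorId)) {
                executorCreated = true; // stands in for `new Executor(...)`
            }
        }
    }

    public static void main(String[] args) {
        DriverBackend driver = new DriverBackend();
        ExecutorBackend backend = new ExecutorBackend();
        backend.start(driver, "exec-1");
        System.out.println(backend.executorCreated); // true
        System.out.println(driver.registeredExecutors.contains("exec-1")); // true
    }
}
```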

In a YARN environment, a few classes in the spark-yarn module, together with YARN's own facilities, handle deploying machines and distributing partition tasks.

spark-yarn contains two files: client.java and ApplicationMaster.java.

client.java's job is to request resources from YARN to run the code in ApplicationMaster.java, so the main thing to look at is what ApplicationMaster.java does.

ApplicationMaster first does two things: it starts one "/bin/mesos-master" and several "/bin/mesos-slave" processes. Both are deployed and run on resources requested from YARN. "/bin/mesos-master" and "/bin/mesos-slave" are the Mesos master and slave binaries; you can think of them as playing roles similar to the Master and Worker in a Standalone deployment.

The launchContainer method starts a YARN container, which is where the "/bin/mesos-slave" mentioned above gets launched; each mesos-slave then registers with the mesos-master. Once all the required slave resources have been requested and started, the startApplication() method is called to launch the Driver.

The startApplication() method:

// Start the user's application
  private void startApplication() throws IOException {
    try {
      String sparkClasspath = getSparkClasspath();
      String jobJar = new File("job.jar").getAbsolutePath();
      String javaArgs = "-Xms" + (masterMem - 128) + "m -Xmx" + (masterMem - 128) + "m";
      javaArgs += " -Djava.library.path=" + mesosHome + "/lib/java";
      String substitutedArgs = programArgs.replaceAll("\\[MASTER\\]", masterUrl);
      if (mainClass.equals("")) {
        javaArgs += " -cp " + sparkClasspath + " -jar " + jobJar + " " + substitutedArgs; 
      } else {
        javaArgs += " -cp " + sparkClasspath + ":" + jobJar + " " + mainClass + " " + substitutedArgs;
      }
      String java = "java";
      if (System.getenv("JAVA_HOME") != null) {
        java = System.getenv("JAVA_HOME") + "/bin/java";
      }
      String bashCommand = java + " " + javaArgs +
          " 1>" + logDirectory + "/application.stdout" +
          " 2>" + logDirectory + "/application.stderr";
      LOG.info("Command: " + bashCommand);
      String[] command = new String[] {"bash", "-c", bashCommand};
      String[] env = new String[] {"SPARK_HOME=" + sparkHome, "MASTER=" + masterUrl, 
          "SPARK_MEM=" + (slaveMem - 128) + "m"};
      application = Runtime.getRuntime().exec(command, env);      
      new Thread("wait for user application") {
        public void run() {
          try {
            appExitCode = application.waitFor();
            appExited = true;
            LOG.info("User application exited with code " + appExitCode);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }.start();
    } catch (SparkClasspathException e) {      
      unregister(false);
      System.exit(1);
      return;
    }
  }

This is what launches the Driver. masterUrl is the address of "bin/mesos-master", passed in through the environment variable "MASTER". Under YARN the master address has the form "mesos://host:port"; under Standalone it is "spark://host:port".
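The two address formats differ only in their URL scheme, which can be pulled out with the standard library:

```java
import java.net.URI;

public class MasterUrlScheme {
    // Extract the scheme ("mesos" or "spark") from a master URL.
    static String scheme(String masterUrl) {
        return URI.create(masterUrl).getScheme();
    }

    public static void main(String[] args) {
        System.out.println(scheme("mesos://host:5050")); // mesos
        System.out.println(scheme("spark://host:7077")); // spark
    }
}
```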

SparkContext handles each master address format differently. That code looks like this:

master match {
      case "local" =>
        checkResourcesPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        checkResourcesPerTask(clusterMode = true, None)
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }
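A simplified Java analogue of the Scala `master match` above may make the dispatch clearer. Only the pattern matching is modeled here; the real code constructs a TaskSchedulerImpl and the corresponding SchedulerBackend in each branch, and the returned strings below are just labels for which branch fired.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MasterDispatch {
    // Mirrors Spark's LOCAL_N_REGEX and SPARK_REGEX patterns.
    static final Pattern LOCAL_N = Pattern.compile("local\\[([0-9]+|\\*)\\]");
    static final Pattern SPARK = Pattern.compile("spark://(.*)");

    static String dispatch(String master) {
        if (master.equals("local")) return "local:1";
        Matcher m = LOCAL_N.matcher(master);
        if (m.matches()) {
            // local[*] uses all cores; local[N] uses exactly N threads.
            String threads = m.group(1);
            int n = threads.equals("*")
                ? Runtime.getRuntime().availableProcessors()
                : Integer.parseInt(threads);
            return "local:" + n;
        }
        Matcher s = SPARK.matcher(master);
        if (s.matches()) return "standalone:" + s.group(1);
        // Anything else falls through to the external-cluster-manager lookup.
        return "external:" + master;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("local"));          // local:1
        System.out.println(dispatch("local[4]"));       // local:4
        System.out.println(dispatch("spark://h:7077")); // standalone:h:7077
        System.out.println(dispatch("yarn"));           // external:yarn
    }
}
```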

For YARN, execution falls through to the last case:

case masterUrl =>
        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }

A ClusterManager class shows up here. What is that now? This is exactly what makes Spark hard: there are so many concepts involved.

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoaders =
      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
    if (serviceLoaders.size > 1) {
      throw new SparkException(
        s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
    }
    serviceLoaders.headOption
  }

This finds all ExternalClusterManager implementations and picks the one whose canCreate method returns true for the URL; here, that means finding the class that accepts "mesos://host:port".
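The selection logic in getClusterManager can be sketched as follows. The real code discovers implementations through java.util.ServiceLoader (which reads provider files from META-INF/services); to keep this example self-contained, the managers are registered by hand, and the manager names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class ClusterManagerLookup {
    // Stand-in for Spark's ExternalClusterManager SPI.
    interface ExternalClusterManager {
        boolean canCreate(String url);
    }

    // Hand-registered managers, keyed by name for illustration only.
    static final Map<String, ExternalClusterManager> REGISTERED = new LinkedHashMap<>();
    static {
        REGISTERED.put("yarn", url -> url.equals("yarn"));
        REGISTERED.put("mesos", url -> url.startsWith("mesos://"));
    }

    // Pick the single manager whose canCreate accepts the url;
    // fail loudly if more than one matches, as Spark does.
    static Optional<String> getClusterManager(String url) {
        List<String> matches = REGISTERED.entrySet().stream()
            .filter(e -> e.getValue().canCreate(url))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        if (matches.size() > 1) {
            throw new IllegalStateException(
                "Multiple external cluster managers registered for the url " + url);
        }
        return matches.stream().findFirst();
    }

    public static void main(String[] args) {
        System.out.println(getClusterManager("mesos://h:5050")); // Optional[mesos]
        System.out.println(getClusterManager("yarn"));           // Optional[yarn]
        System.out.println(getClusterManager("local"));          // Optional.empty
    }
}
```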
