I wasn't planning to write this one, but I honestly have nothing to do lately, and binge-watching American TV shows all day gets boring. This chapter covers the Spark on yarn implementation. It is already stable in 1.0.0, but 1.0.1 is out too, only a month after the 1.0.0 release; the updates come too fast to keep pace with. This article still walks through the 1.0.0 code, so please stop asking me which version I'm covering: every article published so far is based on the 1.0.0 code.
Back in the first chapter, "The spark-submit job submission process", we saw that for Spark on yarn in cluster mode the main class is org.apache.spark.deploy.yarn.Client. Okay, that is our first target.
Find the main function: it calls the run method, so let's look at run directly.
val appId = runApp()
monitorApplication(appId)
System.exit(0)
Run the app, monitor the app, then exit. Let's start with runApp.
def runApp(): ApplicationId = {
  // Validate the arguments: memory must not be less than 384MB, and there must be at least one Executor.
  validateArgs()
  // These two are methods of the parent class: initialize and start the Client.
  init(yarnConf)
  start()
  // Log cluster details (e.g. the number of NodeManagers, queue information).
  logClusterResourceDetails()

  // Prepare the submission request to the ResourceManager (specifically its ApplicationsManager (ASM)).
  // Get a new client application.
  val newApp = super.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  val appId = newAppResponse.getApplicationId()
  // Check whether the cluster has enough memory for this job.
  verifyClusterResources(newAppResponse)

  // Prepare resources and environment variables.
  // 1. Get the concrete path of the staging directory: /.sparkStaging/appId/
  val appStagingDir = getAppStagingDir(appId)
  // 2. Create the staging directory, set its permissions, and upload the jars needed at runtime.
  val localResources = prepareLocalResources(appStagingDir)
  // 3. Set up the environment variables needed at runtime.
  val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  // 4. Set the runtime JVM options; if SPARK_USE_CONC_INCR_GC is set to true, the CMS garbage collector is used.
  val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)

  // Set up the application submission context.
  val appContext = newApp.getApplicationSubmissionContext()
  appContext.setApplicationName(args.appName)
  appContext.setQueue(args.amQueue)
  appContext.setAMContainerSpec(amContainer)
  appContext.setApplicationType("SPARK")

  // Set the ApplicationMaster memory. Resource is the class representing resources; currently it covers CPU and memory.
  val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
  memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  appContext.setResource(memoryResource)

  // Submit the application.
  submitApp(appContext)
  appId
}
I won't dwell on monitorApplication: it keeps calling getApplicationReport to fetch the latest report, then calls getYarnApplicationState to read the current state, and exits once the state is FINISHED, FAILED, or KILLED.
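The loop is roughly the following. This is a minimal sketch based on the description above, assuming the Client inherits getApplicationReport from YarnClientImpl; the 1-second polling interval is my assumption, not necessarily the value in the source.

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}

def monitorApplication(appId: ApplicationId): Boolean = {
  while (true) {
    Thread.sleep(1000)
    // Ask the ResourceManager for the latest report of this application.
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState()
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return true
    }
  }
  true
}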
While we're here, let me also paste the yarn-related parameters, so everything is clear at a glance.
while (!args.isEmpty) {
  args match {
    case ("--jar") :: value :: tail =>
      userJar = value
      args = tail

    case ("--class") :: value :: tail =>
      userClass = value
      args = tail

    case ("--args" | "--arg") :: value :: tail =>
      if (args(0) == "--args") {
        println("--args is deprecated. Use --arg instead.")
      }
      userArgsBuffer += value
      args = tail

    case ("--master-class" | "--am-class") :: value :: tail =>
      if (args(0) == "--master-class") {
        println("--master-class is deprecated. Use --am-class instead.")
      }
      amClass = value
      args = tail

    case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
      if (args(0) == "--master-memory") {
        println("--master-memory is deprecated. Use --driver-memory instead.")
      }
      amMemory = value
      args = tail

    case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
      if (args(0) == "--num-workers") {
        println("--num-workers is deprecated. Use --num-executors instead.")
      }
      numExecutors = value
      args = tail

    case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
      if (args(0) == "--worker-memory") {
        println("--worker-memory is deprecated. Use --executor-memory instead.")
      }
      executorMemory = value
      args = tail

    case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
      if (args(0) == "--worker-cores") {
        println("--worker-cores is deprecated. Use --executor-cores instead.")
      }
      executorCores = value
      args = tail

    case ("--queue") :: value :: tail =>
      amQueue = value
      args = tail

    case ("--name") :: value :: tail =>
      appName = value
      args = tail

    case ("--addJars") :: value :: tail =>
      addJars = value
      args = tail

    case ("--files") :: value :: tail =>
      files = value
      args = tail

    case ("--archives") :: value :: tail =>
      archives = value
      args = tail

    case Nil =>
      if (userClass == null) {
        printUsageAndExit(1)
      }

    case _ =>
      printUsageAndExit(1, args)
  }
}
After the job is submitted, yarn launches the ApplicationMaster. We can go straight to its run method, since that is the only thing its main function does...
def run() {
  // Set the local dirs: use yarn's YARN_LOCAL_DIRS first, then fall back to LOCAL_DIRS.
  System.setProperty("spark.local.dir", getLocalDirs())

  // set the web ui port to be ephemeral for yarn so we don't conflict with
  // other spark processes running on the same box
  System.setProperty("spark.ui.port", "0")

  // when running the AM, the Spark master is always "yarn-cluster"
  System.setProperty("spark.master", "yarn-cluster")

  // Set the priority to 30, the same as MapReduce's. It is higher than HDFS's priority,
  // because this hook cleans up the job's staging directory on HDFS.
  ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

  appAttemptId = getApplicationAttemptId()
  // Controlled by yarn.resourcemanager.am.max-attempts, default 2.
  // So far it only seems to be used when cleaning up the staging directory.
  isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
  amClient = AMRMClient.createAMRMClient()
  amClient.init(yarnConf)
  amClient.start()

  // setup AmIpFilter for the SparkUI - do this before we start the UI
  // The method's documentation says yarn uses it to protect the UI; to me it looks like it sets up an IP proxy.
  addAmIpFilter()

  // Register the ApplicationMaster in an internal list.
  ApplicationMaster.register(this)

  // Security/authentication related setup; it is disabled by default, which saves a lot of trouble.
  val securityMgr = new SecurityManager(sparkConf)

  // Start the driver program.
  userThread = startUserClass()

  // Wait for the SparkContext to be instantiated, mainly by waiting for the
  // spark.driver.port property to be set. Once the wait ends, instantiate a YarnAllocationHandler.
  waitForSparkContextInitialized()

  // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
  // Register the current ApplicationMaster with yarn. isFinished must not be true here;
  // if it is, the program has already failed.
  synchronized {
    if (!isFinished) {
      registerApplicationMaster()
      registered = true
    }
  }

  // Request Containers to launch the Executors.
  allocateExecutors()

  // Wait for the program to finish.
  userThread.join()

  System.exit(0)
}
The run method does five main things:
1. Initialization work
2. Start the driver program (a minimal sketch of startUserClass follows this list)
3. Register the ApplicationMaster
4. Allocate the Executors
5. Wait for the program to finish
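The startUserClass call from step 2 is not shown above. Here is a minimal sketch of the idea, assuming the user class name and its arguments are what the --class and --arg options were parsed into; the method name and parameters below are illustrative, not the exact signature in the source.

// Sketch only: launch the user's driver program (its main method) in a separate thread via reflection.
private def startUserClassSketch(userClass: String, userArgs: Array[String]): Thread = {
  val mainMethod = Class.forName(userClass, false, Thread.currentThread.getContextClassLoader)
    .getMethod("main", classOf[Array[String]])
  val t = new Thread {
    override def run() {
      // Any exception thrown by the user's main would end up failing the application.
      mainMethod.invoke(null, userArgs)
    }
  }
  t.start()
  t
}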
Let's focus on the Executor allocation method.
private def allocateExecutors() {
  try {
    logInfo("Allocating " + args.numExecutors + " executors.")
    // Submit ContainerRequests to the ResourceManager in three flavours: host, rack, and any machine.
    // The number of requested Containers may be larger than the number actually needed.
    yarnAllocator.addResourceRequests(args.numExecutors)

    // Exits the loop if the user thread exits.
    while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
      if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
        finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached")
      }
      // Distribute the resources that came back, and release the ones that are not needed.
      yarnAllocator.allocateResources()
      ApplicationMaster.incrementAllocatorLoop(1)
      Thread.sleep(100)
    }
  } finally {
    // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
    // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
    ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
  }
  logInfo("All executors have launched.")

  // Start a thread that reports the status.
  if (userThread.isAlive) {
    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    // we want to be reasonably responsive without causing too many requests to RM.
    val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
    // must be <= timeoutInterval / 2.
    val interval = math.min(timeoutInterval / 2, schedulerInterval)
    launchReporterThread(interval)
  }
}
Here we only need to look at the addResourceRequests and allocateResources methods.
Let's start with addResourceRequests; I won't paste its code.
The Client submits Container requests to the ResourceManager in three flavours: preferred hosts, machines in the same rack, and any machine.
The preferred hosts come from the locations returned by getPreferredLocations on the RDD; if there are no preferred hosts, the same-rack category does not apply either, and any machine will do.
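A rough sketch of how such requests can be built with the AMRMClient API is shown below; the method signature and variable names are illustrative, not the exact code in YarnAllocationHandler.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.util.Records

def addResourceRequestsSketch(
    amClient: AMRMClient[AMRMClient.ContainerRequest],
    numExecutors: Int,
    executorMemoryWithOverhead: Int,
    preferredHosts: Seq[String]): Unit = {
  val capability = Records.newRecord(classOf[Resource])
  capability.setMemory(executorMemoryWithOverhead)   // executor memory + 384MB overhead
  val priority = Records.newRecord(classOf[Priority])
  priority.setPriority(1)

  // Host-local requests for the hosts reported by getPreferredLocations.
  // Rack-level requests would be built the same way, passing the racks array instead of nodes.
  for (host <- preferredHosts) {
    amClient.addContainerRequest(
      new AMRMClient.ContainerRequest(capability, Array(host), null, priority))
  }
  // Requests with no locality preference ("any machine") for the remaining executors.
  for (_ <- preferredHosts.size until numExecutors) {
    amClient.addContainerRequest(
      new AMRMClient.ContainerRequest(capability, null, null, priority))
  }
}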
Now let's look at the allocateResources method.
def allocateResources() {
  // We have already set the container request. Poll the ResourceManager for a response.
  // This doubles as a heartbeat if there are no pending container requests.
  // The Container requests were submitted earlier; now we only need to fetch the response.
  val progressIndicator = 0.1f
  val allocateResponse = amClient.allocate(progressIndicator)
  val allocatedContainers = allocateResponse.getAllocatedContainers()

  if (allocatedContainers.size > 0) {
    var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
    if (numPendingAllocateNow < 0) {
      numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
    }

    val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()

    for (container <- allocatedContainers) {
      // Memory > memory needed by the Executor + 384
      if (isResourceConstraintSatisfied(container)) {
        // Put the container on the roster; its fate is decided later.
        val host = container.getNodeId.getHost
        val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
        containersForHost += container
      } else {
        // Not enough memory: release it.
        releaseContainer(container)
      }
    }

    // Find suitable containers to use.
    val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
    val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()

    // Iterate over all hosts.
    for (candidateHost <- hostToContainers.keySet) {
      val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
      val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)

      val remainingContainersOpt = hostToContainers.get(candidateHost)
      var remainingContainers = remainingContainersOpt.get

      if (requiredHostCount >= remainingContainers.size) {
        // We need more than we have: record all of them as data-local in dataLocalContainers.
        dataLocalContainers.put(candidateHost, remainingContainers)
        // No containers are left over.
        remainingContainers = null
      } else if (requiredHostCount > 0) {
        // We got more containers than we need: release the extra ones.
        val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
        dataLocalContainers.put(candidateHost, dataLocal)
        for (container <- remaining) releaseContainer(container)
        remainingContainers = null
      }

      // The hosts holding the data are already fully loaded; we can only pick machines in the same rack.
      if (remainingContainers != null) {
        val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
        if (rack != null) {
          val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
          val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
            rackLocalContainers.getOrElse(rack, List()).size

          if (requiredRackCount >= remainingContainers.size) {
            // Add all remaining containers to `dataLocalContainers`.
            dataLocalContainers.put(rack, remainingContainers)
            remainingContainers = null
          } else if (requiredRackCount > 0) {
            // Container list has more containers than we need for data locality.
            val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
            val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())
            existingRackLocal ++= rackLocal
            remainingContainers = remaining
          }
        }
      }

      if (remainingContainers != null) {
        // Still not placed: they will have to run on machines in other racks.
        offRackContainers.put(candidateHost, remainingContainers)
      }
    }

    // Order the containers: data-local host first, then same rack, then any machine.
    val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)

    // Walk through the selected Containers; for each one, start an ExecutorRunnable thread
    // dedicated to sending it the launch command.
    for (container <- allocatedContainersToProcess) {
      val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      // The memory must be larger than the Executor memory + 384.
      val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)

      if (numExecutorsRunningNow > maxExecutors) {
        // More executors are running than we need: release the extra Container.
        releaseContainer(container)
        numExecutorsRunning.decrementAndGet()
      } else {
        val executorId = executorIdCounter.incrementAndGet().toString
        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
          sparkConf.get("spark.driver.host"),
          sparkConf.get("spark.driver.port"),
          CoarseGrainedSchedulerBackend.ACTOR_NAME)

        // To be safe, remove the container from `pendingReleaseContainers`.
        pendingReleaseContainers.remove(containerId)

        // Record the container in the allocated-rack bookkeeping.
        val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
        allocatedHostToContainersMap.synchronized {
          val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
            new HashSet[ContainerId]())
          containerSet += containerId
          allocatedContainerToHostMap.put(containerId, executorHostname)
          if (rack != null) {
            allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
          }
        }

        // Start a thread that looks after this container and sends it the command to run the Executor.
        val executorRunnable = new ExecutorRunnable(
          container,
          conf,
          sparkConf,
          driverUrl,
          executorId,
          executorHostname,
          executorMemory,
          executorCores)
        new Thread(executorRunnable).start()
      }
    }
  }
  // (the rest of the method is omitted)
}
1. The Containers obtained from the ResourceManager are filtered and ordered by the three categories introduced earlier: preferred hosts > machines in the same rack > any machine.
2. Once the Containers are chosen, each one gets a dedicated ExecutorRunnable for one-on-one service, which sends it the command to run CoarseGrainedExecutorBackend.
3. ExecutorRunnable sends the request to the NodeManager through NMClient.
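Here is a minimal sketch of that NMClient interaction, assuming the launch command string has already been assembled elsewhere; the method name and parameters are illustrative, not the exact ExecutorRunnable code.

import java.util.Collections
import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

def startExecutorContainer(container: Container, launchCommand: String): Unit = {
  // One NMClient per ExecutorRunnable: it talks to the NodeManager hosting the container.
  val nmClient = NMClient.createNMClient()
  nmClient.init(new YarnConfiguration())
  nmClient.start()

  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  // launchCommand would be along the lines of
  // "java ... org.apache.spark.executor.CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores>"
  ctx.setCommands(Collections.singletonList(launchCommand))
  nmClient.startContainer(container, ctx)
}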
Summary:
Publishing a job onto yarn for execution does not involve many classes; it mainly comes down to four: Client, ApplicationMaster, YarnAllocationHandler, and ExecutorRunnable.
1. Client is the Yarn client, responsible for asking Yarn to start the ApplicationMaster.
2. The ApplicationMaster acts like a project manager, taking care of everything the job needs: requesting resources, allocating resources, starting the Driver and the Executors, and handling Executor launch failures.
3. The ApplicationMaster requests and allocates resources through the YarnAllocationHandler.
4. Containers are chosen in the order: preferred hosts > machines in the same rack > any machine.
5. ExecutorRunnable is only responsible for sending the Container the command to start CoarseGrainedExecutorBackend.
6. Executor failure handling lives in the ApplicationMaster's launchReporterThread method: the thread it starts not only reports the running status but also monitors the Executors, and re-requests any Executor that goes missing (a sketch of this loop follows the list).
7. In the yarn directory, the classes whose names contain YarnClient belong to yarn-client mode; their implementation is similar to what was covered above.
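For point 6, the idea of the reporter thread can be sketched like this. The loop below is a generic illustration, not the actual launchReporterThread code; the three callbacks stand in for the corresponding YarnAllocationHandler calls.

// Sketch only: on each tick, re-request any executors that have gone missing and call allocate,
// which also doubles as the ApplicationMaster's heartbeat to the ResourceManager.
def launchReporterThreadSketch(
    intervalMs: Long,
    missingExecutors: () => Int,
    addResourceRequests: Int => Unit,
    allocateResources: () => Unit): Thread = {
  val t = new Thread {
    override def run() {
      while (!Thread.currentThread().isInterrupted) {
        val missing = missingExecutors()
        if (missing > 0) {
          // Some executors were lost: ask the ResourceManager for replacements.
          addResourceRequests(missing)
        }
        allocateResources()
        Thread.sleep(intervalMs)
      }
    }
  }
  t.setDaemon(true)
  t.start()
  t
}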
The rest is mostly about how to use the Yarn client API, which I'm not that good at either; I only read it well enough to get the general idea, haha.
岑玉海
Please credit the source when reposting, thank you!