Hadoop源碼系列(一)FairScheduler申請和分配container的過程

一、如何申請資源

1.1 如何啓動AM並申請資源

1.1.1 如何啓動AM

val yarnClient = YarnClient.createYarnClient
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
 
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
 
// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)

1.1.2 FairScheduler如何處理AM的ResourceRequest

一、FairScheduler接收到SchedulerEventType.APP_ADDED以後,調用addApplication方法把把RMApp添加到隊列裏面,結束以後發送RMAppEventType.APP_ACCEPTED給RMAppjava

二、RMApp啓動RMAttempt以後,發送SchedulerEventType.APP_ATTEMPT_ADDED給FairSchedulernode

LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user);

三、FairScheduler調用addApplicationAttempt方法,發送RMAppAttemptEventType.ATTEMPT_ADDED事件給RMAppAttempt,RMAppAttempt隨後調用Scheduler的allocate方法發送AM的ResourceRequestapp

四、FairScheduler在allocate方法裏面對該請求進行處理,FairScheduler對於AM的資源請求的優先級上並無特殊的照顧,詳細請看章節2 如何分配資源異步

1.2 AM啓動以後如何申請資源

1.2.一、註冊AM

amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)

1.2.二、發送資源請求

// 1.建立資源請求
amClient.addContainerRequest(request)
// 2.發送資源請求
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
  // 3.請求返回以後處理Container
  handleAllocatedContainers(allocatedContainers.asScala)
}

1.2.三、啓動Container

def startContainer(): java.util.Map[String, ByteBuffer] = {
 val ctx = Records.newRecord(classOf[ContainerLaunchContext])
   .asInstanceOf[ContainerLaunchContext]
 val env = prepareEnvironment().asJava
 
 ctx.setLocalResources(localResources.asJava)
 ctx.setEnvironment(env)
 
 val credentials = UserGroupInformation.getCurrentUser().getCredentials()
 val dob = new DataOutputBuffer()
 credentials.writeTokenStorageToStream(dob)
 ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
 val commands = prepareCommand()
 
 ctx.setCommands(commands.asJava)
 ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
 
 // If external shuffle service is enabled, register with the Yarn shuffle service already
 // started on the NodeManager and, if authentication is enabled, provide it with our secret
 // key for fetching shuffle files later
 if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
   val secretString = securityMgr.getSecretKey()
   val secretBytes =
   if (secretString != null) {
     // This conversion must match how the YarnShuffleService decodes our secret
     JavaUtils.stringToBytes(secretString)
   } else {
     // Authentication is not enabled, so just provide dummy metadata
     ByteBuffer.allocate(0)
   }
   ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
 }
 
 // Send the start request to the ContainerManager
 try {
   nmClient.startContainer(container.get, ctx)
 } catch {
   case ex: Exception =>
     throw new SparkException(s"Exception while starting container ${container.get.getId}" +
       s" on host $hostname", ex)
 }
}
View Code

二、如何分配資源

2.1 接受資源請求步驟

在FairScheduler的allocate方法裏面僅僅是記錄ResourceRequest,並不會真正的立馬分配。ide

流程以下:oop

一、檢查該APP是否註冊過fetch

二、檢查資源的請求是否超過最大內存和最大CPU的限制ui

三、記錄資源請求的時間,最後container分配的延遲會體如今隊列metrics的appAttemptFirstContainerAllocationDelay當中spa

四、釋放AM發過來的已經不須要的資源,主要邏輯在FSAppAttempt的containerCompleted方法裏線程

五、更新資源請求,全部資源請求都是記錄在AppSchedulingInfo當中的requests(注意:只有是ANY的資源請求才會被立馬更新到QueueMetrics的PendingResources裏)

六、找出該APP被標記爲搶佔的container ID列表preemptionContainerIds

七、更新APP的黑名單列表,該信息被記錄在AppSchedulingInfo當中

八、從FSAppAttempt的newlyAllocatedContainers當中獲取最新被分配的container

九、返回preemptionContainerIds、HeadRoom、ContainerList、NMTokenList。(注:Headroom = Math.min(Math.min(queueFairShare - queueUsage, 0), maxAvailableResource)

2.2 請求和分配的關係

 請求和分配的過程是異步的,關係如上圖,每次調用allocate得到的container,實際上是以前的請求被分配的結果

2.3 如何分配

2.3.1 分配方式

分配有兩種方式:

一、接收到NodeManager的心跳的時候進行分配

NodeManager每隔一秒(yarn.resourcemanager.nodemanagers.heartbeat-interval-ms)給ResourceManager發送一個心跳事件NODE_UPDATE,接收到心跳事件以後,在FairScheduler的nodeUpdate方法裏進行處理。

NodeManager會彙報新啓動的Container列表newlyLaunchedContainers和已經結束的Container列表completedContainers。而後在attemptScheduling方法裏面進行分配。

 

二、持續調度方式

它有一個單獨的線程,線程名稱是FairSchedulerContinuousScheduling,每5毫秒對全部節點的資源進行排序,而後遍歷全部節點,調用attemptScheduling方法進行分配。

開啓持續調度模式以後,在接收到心跳事件NODE_UPDATE的時候,只有在completedContainers不爲空的狀況下,纔會進行調度

 

attemptScheduling首先會檢查是否有資源預留,若是有預留,則直接爲預留的APP分配container

沒有預留的分配過程以下:

一、最大可分配資源爲這臺機器的可用資源的一半,從root隊列開始自上而下進行分配Resource assignment = queueMgr.getRootQueue().assignContainer(node);

二、分配到一個Container以後,判斷是否要連續分配多個,最大支持連續分配多少個?

如下是涉及到的各個參數以及參數的默認值:

yarn.scheduler.fair.assignmultiple false (建議設置爲true)

yarn.scheduler.fair.dynamic.max.assign true (hadoop2.7以後就沒有這個參數了)

yarn.scheduler.fair.max.assign -1 (建議設置爲2~3,不要設置得太多,不然會有調度傾斜的問題)

 

2.3.2 如何從隊列當中選出APP進行資源分配

入口在queueMgr.getRootQueue().assignContainer(node);

一、檢查當前隊列的使用量是否小於最大資源量

二、首先對子隊列進行排序,優先順序請參照章節 2.3.4 如何肯定優先順序

三、排序完再調用子隊列的assignContainer方法分配container

四、一直遞歸到葉子隊列

葉子隊列如何進行分配?

一、先對runnableApps進行排序,排序完成以後,for循環遍歷一下

二、先檢查該Node是否在APP的黑名單當中

三、檢查該隊列是否能夠運行該APP的AM,主要是檢查是否超過了maxAMShare(根據amRunning字段判斷是否已經啓動了AM了)

檢查邏輯的僞代碼以下:

maxResource = getFairShare()
if (maxResource == 0) {
  // 最大資源是隊列的MaxShare和集羣總資源取一個小的值
  maxResource = Math.min(getRootQueue().AvailableResource(), getMaxShare());
}
maxAMResource = maxResource * maxAMShare
if (amResourceUsage + amResource) > maxAMResource) {
  // 能夠運行
  return true
} else {
  // 不能夠運行
  return false
}
View Code

四、給該APP分配container

 

下面以一個例子來講明分配的過程是如何選擇隊列的:

假設隊列的結構是這樣子的

root

---->BU_1

-------->A

-------->B

---->BU_2

-------->C

-------->D

 

2.3.3 任務分配Container的本地性

任務分配Container的時候會考慮請求的本地性,對於調度器來講,它的本地性分爲三種:NODE_LOCAL, RACK_LOCAL, OFF_SWITCH

具體方法位於FSAppAttempt的assignContainer方法

遍歷優先級

    給該優先級的調度機會+1

    獲取RackLocal和NodeLocal的任務

    計算容許分配的本地性級別allowedLocality,默認是NODE_LOCAL

        一、心跳分配方式

        計算調度機會,若是該優先級的任務的調度機會超過了(節點數 * NODE_LOCAL閾值),降級爲RACK_LOCAL,若是該優先級的任務的調度機會超過了(節點數 * RACK_LOCAL閾值),降級爲OFF_SWITCH

        二、連續分配方式

        計算等待時間waitTime -= lastScheduledContainer.get(priority);

        若是waitTime超過了NODE_LOCAL容許的delay時間,就降級爲RACK_LOCAL,再超過RACK_LOCAL容許的delay的時間,就降級爲OFF_SWITCH

  分配NODE_LOCAL的container

  容許分配的本地性級別>=RACK_LOCAL,分配RACK_LOCAL的container

  容許分配的本地性級別=OFF_SWITCH,分配OFF_SWITCH的container

  都分不到,等待下一次機會

 

相關參數:

默認值全是-1,則容許的本地性級別是OFF_SWITCH

yarn.scheduler.fair.locality-delay-node-ms -1

yarn.scheduler.fair.locality-delay-rack-ms -1

yarn.scheduler.fair.locality.threshold.node -1

yarn.scheduler.fair.locality.threshold.rack -1

 

2.3.4 Container分配

一、檢查該節點的資源是否足夠,若是資源充足

 二、若是當前的allowedLocality比實際分配的本地性低,則重置allowedLocality

三、把新分配的Container加到newlyAllocatedContainers和liveContainers列表中

四、把分配的container信息同步到appSchedulingInfo當中

五、發送RMContainerEventType.START事件

六、更新FSSchedulerNode記錄的container信息

七、若是被分配的是AM,則設置amRunning爲true

 

若是資源不夠,則檢查是否能夠預留資源

條件:

1)Container的資源請求必須小於Scheduler的增量分配內存 * 倍數(默認應該是2g)

2)若是已經存在的預留數 < 本地性對應的可用節點 * 預留比例

3)一個節點只容許同時爲一個APP預留資源

 

相關參數:

yarn.scheduler.increment-allocation-mb 1024

yarn.scheduler.increment-allocation-vcores 1

yarn.scheduler.reservation-threshold.increment-multiple 2

yarn.scheduler.fair.reservable-nodes 0.05

 

2.3.4 如何肯定優先順序

該比較規則同時適用於隊列和APP,詳細代碼位於FairSharePolicy當中

MinShare = Math.min(getMinShare(), getDemand())

一、(當前資源使用量 / MinShare)的比值越小,優先級越高

二、若是雙方資源使用量都超過MinShare,則(當前資源使用量 / 權重)的比值越小,優先級越高

三、啓動時間越早,優先級越高

四、最後實在比不出來,就比名字...

 

從上面分配的規則當中能看出來MinShare是很是重要的一個指標,當資源使用量沒有超過MinShare以前,隊列在分配的時候就會比較優先,切記必定要設置啊!

 

 

注:getMinShare()是FairScheduler當中隊列的minResources

<minResources>6887116 mb,4491 vcores</minResources>
相關文章
相關標籤/搜索