[YARN] How the MRAppMaster Heartbeat Works

Original blog post: hackshell

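Recently our cluster hit a problem: while jobs were running, the AM would time out after 10 minutes and be killed, yet rerunning the job succeeded. Because the problem appeared at random, our initial suspicion was either that the AM's heartbeat reporting was failing, or that the RM was hanging because it was busy and some mechanism left the AM waiting 10 minutes without sending a heartbeat. So we first set out to understand how the AM reports heartbeats to the RM.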

In MRAppMaster, ContainerAllocatorRouter is responsible for requesting resources from the RM (that is, for sending heartbeats).

[Figure: interaction between the RM and the AM]

RMContainerAllocator ultimately inherits from RMCommunicator, which implements the RMHeartbeatHandler interface:

public interface RMHeartbeatHandler {
  long getLastHeartbeatTime(); // returns the time of the last heartbeat
  void runOnNextHeartbeat(Runnable callback); // queues a callback to run after the next heartbeat
}

After each heartbeat returns, the callbacks registered in heartbeatCallbacks are executed once:

allocatorThread = new Thread(new Runnable() {
  @Override
  public void run() {
    while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
      ......
      heartbeat();
      lastHeartbeatTime = context.getClock().getTime(); // record the time of the last heartbeat
      executeHeartbeatCallbacks(); // run the registered callbacks
      ....
    }
  }
});

In the RMCommunicator class:

private void executeHeartbeatCallbacks() {
  Runnable callback = null;
  // poll() removes each callback from the queue, so every callback runs only once
  while ((callback = heartbeatCallbacks.poll()) != null) {
    callback.run();
  }
}
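The callback mechanism above can be modeled in a few lines. This is a minimal sketch assuming a single allocator thread and an injectable clock; SketchCommunicator, oneIteration and the stubbed heartbeat are hypothetical names, not Hadoop classes. It shows that a callback registered through runOnNextHeartbeat runs exactly once, right after the next heartbeat, and is then discarded.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongSupplier;

// Hypothetical mirror of the RMHeartbeatHandler contract described above.
interface HeartbeatHandler {
  long getLastHeartbeatTime();
  void runOnNextHeartbeat(Runnable callback);
}

class SketchCommunicator implements HeartbeatHandler {
  // Thread-safe queue: other threads may register callbacks while the
  // allocator thread drains it.
  private final Queue<Runnable> heartbeatCallbacks = new ConcurrentLinkedQueue<>();
  private final LongSupplier clock;
  private volatile long lastHeartbeatTime;
  int heartbeats = 0; // number of simulated RM calls

  SketchCommunicator(LongSupplier clock) {
    this.clock = clock;
  }

  @Override
  public long getLastHeartbeatTime() {
    return lastHeartbeatTime;
  }

  @Override
  public void runOnNextHeartbeat(Runnable callback) {
    heartbeatCallbacks.add(callback);
  }

  // Stand-in for the allocate() RPC to the RM.
  void heartbeat() {
    heartbeats++;
  }

  // One iteration of the allocator loop: heartbeat, record the time, run callbacks.
  void oneIteration() {
    heartbeat();
    lastHeartbeatTime = clock.getAsLong();
    executeHeartbeatCallbacks();
  }

  private void executeHeartbeatCallbacks() {
    Runnable callback;
    // poll() removes each callback, so it runs exactly once.
    while ((callback = heartbeatCallbacks.poll()) != null) {
      callback.run();
    }
  }
}
```

In this sketch, as in the loop above, callbacks run inline on the allocator thread, so a slow callback delays the next heartbeat.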

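When RMCommunicator starts, it first registers with the RM, telling the RM its own host and port, and then starts a thread (startAllocatorThread) that periodically calls the heartbeat method implemented in RMContainerAllocator. Each heartbeat requests resources from the RM, reports status, and tells the RM that the AM is still alive.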

When the AM starts up, RMCommunicator is started along with it:

protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy(); // obtain a proxy to the RM
  register(); // register with the RM
  startAllocatorThread(); // start the heartbeat thread
  ....
}
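The start-up order (register first, then heartbeat periodically on a separate thread) can be sketched as follows. FakeScheduler, RecordingScheduler and SketchService are hypothetical stand-ins, not the real ApplicationMasterProtocol or RMCommunicator, and the host, port and poll interval are made-up values; in real MapReduce the interval comes from configuration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical RM-side protocol used only for this sketch.
interface FakeScheduler {
  void registerApplicationMaster(String host, int port);
  void allocate();
}

// Records every call so the order can be checked.
class RecordingScheduler implements FakeScheduler {
  final List<String> calls = new CopyOnWriteArrayList<>();

  public void registerApplicationMaster(String host, int port) {
    calls.add("register " + host + ":" + port);
  }

  public void allocate() {
    calls.add("allocate");
  }
}

class SketchService {
  private final FakeScheduler scheduler;
  private final long pollIntervalMs; // assumed poll interval
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private Thread allocatorThread;

  SketchService(FakeScheduler scheduler, long pollIntervalMs) {
    this.scheduler = scheduler;
    this.pollIntervalMs = pollIntervalMs;
  }

  void serviceStart() {
    scheduler.registerApplicationMaster("am-host", 12345); // 1. register synchronously
    startAllocatorThread();                                // 2. then heartbeat in the background
  }

  private void startAllocatorThread() {
    allocatorThread = new Thread(() -> {
      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(pollIntervalMs);
        } catch (InterruptedException e) {
          return; // interrupted by serviceStop()
        }
        scheduler.allocate(); // one heartbeat
      }
    }, "sketch-allocator");
    allocatorThread.setDaemon(true);
    allocatorThread.start();
  }

  void serviceStop() {
    stopped.set(true);
    allocatorThread.interrupt();
    try {
      allocatorThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Because register() completes before the thread is started, no heartbeat can reach the RM before registration.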

The event-handling flow of the AM's ContainerAllocatorRouter is shown in the figure below:

[Figure: ContainerAllocatorRouter event-handling flow]

Registration flow:

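RMCommunicator remotely calls the registerApplicationMaster method of ApplicationMasterService, which sets up the responseId it will maintain for this AM. The AM is then added to AMLivelinessMonitor, which records a timestamp for it in a map and uses it to detect an AM that has gone too long without a heartbeat. If the AM's heartbeat time is not refreshed for too long, the RM tells the NodeManager to remove the AM. The expiry interval is set by yarn.am.liveness-monitor.expiry-interval-ms, which defaults to 600000 ms, the same 10 minutes seen in the problem above.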
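The liveness check described above boils down to a map of last-ping times plus a periodic sweep. This is a minimal sketch of that idea, assuming an injectable clock and plain string attempt ids; LivenessSketch and its methods are hypothetical simplifications, not the real AMLivelinessMonitor API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

class LivenessSketch {
  private final Map<String, Long> lastPing = new HashMap<>();
  private final long expireIntervalMs;
  private final LongSupplier clock;

  LivenessSketch(long expireIntervalMs, LongSupplier clock) {
    this.expireIntervalMs = expireIntervalMs;
    this.clock = clock;
  }

  // Start tracking an attempt (done once, around registration).
  synchronized void register(String attemptId) {
    lastPing.put(attemptId, clock.getAsLong());
  }

  // Called on every heartbeat; only refreshes attempts still being tracked.
  synchronized void receivedPing(String attemptId) {
    if (lastPing.containsKey(attemptId)) {
      lastPing.put(attemptId, clock.getAsLong());
    }
  }

  synchronized void unregister(String attemptId) {
    lastPing.remove(attemptId);
  }

  // One sweep: return, and stop tracking, every attempt whose last ping is too old.
  synchronized List<String> expireStale() {
    long now = clock.getAsLong();
    List<String> expired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = lastPing.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (now > e.getValue() + expireIntervalMs) {
        expired.add(e.getKey());
        it.remove();
      }
    }
    return expired;
  }
}
```

In the real RM the sweep runs on its own thread and an expired attempt is failed; this sketch only returns the expired ids. With a 600000 ms interval, an AM that stops heartbeating is expired on the first sweep after 10 minutes of silence.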

Heartbeat thread:

Sending a heartbeat is also how the AM obtains resources:

@Override
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources(); // the key method
  if (allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }
  ......
}

Obtaining resources:

private List<Container> getResources() throws Exception {
  ...
  response = makeRemoteRequest(); // talk to the RM
  ...
  // commands sent by the RM are handled first
  if (response.getAMCommand() != null) {
    switch (response.getAMCommand()) {
      case AM_RESYNC:
      case AM_SHUTDOWN:
        eventHandler.handle(new JobEvent(this.getJob().getID(),
            JobEventType.JOB_AM_REBOOT));
        throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
            this.getContext().getApplicationID());
      default:
        ....
    }
  }
  // followed by a series of further processing steps
}
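The command check in getResources() relies on switch fall-through: AM_RESYNC has no body of its own, so it falls into the AM_SHUTDOWN branch, and both end the heartbeat by posting a reboot event and throwing. A minimal sketch of just that control flow, using hypothetical stand-in types (AmCommand, RebootRequested, CommandHandlerSketch) instead of the real YARN and MapReduce classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of the commands the RM can return.
enum AmCommand { AM_RESYNC, AM_SHUTDOWN }

// Stand-in for YarnRuntimeException in this sketch.
class RebootRequested extends RuntimeException {
  RebootRequested(String msg) {
    super(msg);
  }
}

class CommandHandlerSketch {
  final List<String> events = new ArrayList<>(); // events "posted" to the job

  // command is null when the RM has nothing special to say.
  void handle(AmCommand command, String appId) {
    if (command == null) {
      return; // normal heartbeat: keep processing the response
    }
    switch (command) {
      case AM_RESYNC:   // no break: falls through to AM_SHUTDOWN
      case AM_SHUTDOWN:
        events.add("JOB_AM_REBOOT");
        throw new RebootRequested("Resource Manager doesn't recognize AttemptId: " + appId);
      default:
        throw new IllegalStateException("Unhandled AMCommand: " + command);
    }
  }
}
```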

Building the request:

protected AllocateResponse makeRemoteRequest() throws IOException {
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    allocateResponse = scheduler.allocate(allocateRequest); // RPC call to ApplicationMasterService's allocate method
    .....
}
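The lastResponseID passed to AllocateRequest.newInstance is what lets the RM recognize duplicate or out-of-order heartbeats. Below is a simplified model of that handshake, loosely based on our reading of ApplicationMasterService.allocate in Hadoop 2.x (the exact checks differ between versions); AllocateReply, RmResponseTracker and AmResponseTracker are hypothetical names. The RM caches its last reply, replays it when the AM resends the previous id, asks the AM to resync on any other mismatch, and otherwise increments the id.

```java
// Hypothetical reply type: the new response id plus a resync flag.
final class AllocateReply {
  final int responseId;
  final boolean resync;

  AllocateReply(int responseId, boolean resync) {
    this.responseId = responseId;
    this.resync = resync;
  }
}

// RM side: remembers the last reply sent to this AM (initialized at registration).
class RmResponseTracker {
  private AllocateReply lastReply = new AllocateReply(0, false);

  synchronized AllocateReply allocate(int requestResponseId) {
    if (requestResponseId + 1 == lastReply.responseId) {
      return lastReply; // retry of the previous heartbeat: replay the cached reply
    }
    if (requestResponseId != lastReply.responseId) {
      return new AllocateReply(lastReply.responseId, true); // out of step: ask the AM to resync
    }
    lastReply = new AllocateReply(lastReply.responseId + 1, false); // normal case
    return lastReply;
  }
}

// AM side: sends its lastResponseID and adopts the id from each good reply.
class AmResponseTracker {
  int lastResponseID = 0;

  AllocateReply heartbeat(RmResponseTracker rm) {
    AllocateReply reply = rm.allocate(lastResponseID);
    if (!reply.resync) {
      lastResponseID = reply.responseId;
    }
    return reply;
  }
}
```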

Every heartbeat call refreshes the AM's timestamp in AMLivelinessMonitor, which tells the RM the AM is still alive.

The code also shows that the resource request is packaged as ask, an ArrayList of ResourceRequest objects, for example:

priority:20 host:host9 capability:<memory:2048, vCores:1>
priority:20 host:host2 capability:<memory:2048, vCores:1>
priority:20 host:host10 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3203 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3202 capability:<memory:2048, vCores:1>
priority:20 host:* capability:<memory:2048, vCores:1>

So how is ask built?

The addMap, addReduce, and assign methods in RMContainerAllocator modify the contents of ask through the following call chain:

addContainerReq --> addResourceRequest --> addResourceRequestToAsk;

Adding our own logging to the code shows that resources are requested at three levels: local (node), rack, and any.

These finally become an ask list that is sent to the RM:

ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true
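The counts in such an ask list come from how addContainerReq --> addResourceRequest accumulates requests. A minimal sketch under simplifying assumptions: a single priority and capability, each request's racks already de-duplicated, and no node blacklisting; AskSketch and its methods are hypothetical. Every map request adds one container to each of its preferred hosts, to each of its racks, and to *.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class AskSketch {
  static final String ANY = "*";
  // resource name (host, rack or "*") -> number of containers requested
  private final Map<String, Integer> numContainers = new TreeMap<>();

  // One pending map task with its preferred hosts and their racks.
  void addContainerReq(List<String> hosts, Set<String> racks) {
    for (String host : hosts) {
      addResourceRequest(host);  // node-local
    }
    for (String rack : racks) {
      addResourceRequest(rack);  // rack-local
    }
    addResourceRequest(ANY);     // off-switch: anywhere in the cluster
  }

  private void addResourceRequest(String resourceName) {
    numContainers.merge(resourceName, 1, Integer::sum);
  }

  Map<String, Integer> ask() {
    return Collections.unmodifiableMap(numContainers);
  }
}
```

Under this model the * entry equals the number of outstanding map requests (384 in the sample above), and the four rack entries sum to 227 + 231 + 152 + 158 = 768 = 2 × 384, which suggests each map's preferred hosts spanned two racks on average.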

A similar log line:

getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit=<memory:0, vCores:0> knownNMs=24