Original blog post
hackshellshell
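Recently our cluster ran into a problem: while running jobs, the AM (ApplicationMaster) would time out after 10 minutes and be killed, yet rerunning the job succeeded. The problem appeared randomly, so our initial suspicion was that either the AM's heartbeat reporting had a problem, or the RM (ResourceManager) hung because it was busy and some mechanism left the AM waiting 10 minutes without reporting a heartbeat. So let's first understand how the AM reports heartbeats to the RM.
In MRAppMaster, ContainerAllocatorRouter is responsible for requesting resources from the RM (sending heartbeats); for a non-uber job it delegates to RMContainerAllocator.
The ultimate parent class of RMContainerAllocator is RMCommunicator, which implements the RMHeartbeatHandler interface:
public interface RMHeartbeatHandler {
  long getLastHeartbeatTime(); // returns the time of the last heartbeat
  void runOnNextHeartbeat(Runnable callback); // queues a callback to run after the next heartbeat
}
Every time a heartbeat returns, the callbacks registered in heartbeatCallbacks are executed:
allocatorThread = new Thread(new Runnable() {
  @Override
  public void run() {
    while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
      ......
      heartbeat();
      lastHeartbeatTime = context.getClock().getTime(); // record the time of the last heartbeat
      executeHeartbeatCallbacks(); // run the registered callbacks
      ....
    }
  }
});
In the RMCommunicator class: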
private void executeHeartbeatCallbacks() {
  Runnable callback = null;
  while ((callback = heartbeatCallbacks.poll()) != null) {
    callback.run();
  }
}
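Because the queue is drained on every pass of the allocator loop, a callback registered with runOnNextHeartbeat runs exactly once, on the allocator thread, right after the next heartbeat returns. The sketch below models only that mechanism under simplifying assumptions; the class name HeartbeatCallbackDemo and the method onHeartbeatReturned are hypothetical, and the RPC itself is left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for RMCommunicator's callback handling; not Hadoop's actual class.
public class HeartbeatCallbackDemo {
    // Callbacks registered via runOnNextHeartbeat wait here until the next heartbeat returns.
    private final Queue<Runnable> heartbeatCallbacks = new ConcurrentLinkedQueue<>();
    private volatile long lastHeartbeatTime;

    public long getLastHeartbeatTime() {
        return lastHeartbeatTime;
    }

    public void runOnNextHeartbeat(Runnable callback) {
        heartbeatCallbacks.add(callback);
    }

    // Drain the queue: each callback is removed as it runs, so it runs only once.
    private void executeHeartbeatCallbacks() {
        Runnable callback;
        while ((callback = heartbeatCallbacks.poll()) != null) {
            callback.run();
        }
    }

    // Simulates what the allocator loop does after heartbeat() returns (hypothetical hook).
    public void onHeartbeatReturned(long now) {
        lastHeartbeatTime = now;
        executeHeartbeatCallbacks();
    }

    public static void main(String[] args) {
        HeartbeatCallbackDemo handler = new HeartbeatCallbackDemo();
        int[] runs = {0};
        handler.runOnNextHeartbeat(() -> runs[0]++);
        handler.onHeartbeatReturned(1000L);
        handler.onHeartbeatReturned(2000L); // queue is already empty; the callback does not run again
        System.out.println("runs=" + runs[0] + " last=" + handler.getLastHeartbeatTime()); // prints runs=1 last=2000
    }
}
```

One consequence that also holds for the original loop: callbacks execute inline on the allocator thread, so a callback that blocks delays the next heartbeat as well.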
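When RMCommunicator starts, it first registers with the RM, telling the RM its own host and port, and then starts a thread (startAllocatorThread) that periodically calls the heartbeat method implemented in RMContainerAllocator (to request resources from the RM, report status periodically, and tell the RM that it is still alive).
RMCommunicator is brought up together with the AM; its serviceStart method looks like this:
protected void serviceStart() throws Exception {
  scheduler = createSchedulerProxy(); // get a proxy to the RM
  register(); // register with the RM
  startAllocatorThread(); // start the heartbeat thread
  ....
}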
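The start-up order above (register first, then start a thread that heartbeats on a fixed interval) can be sketched as follows. This is a simplified model, not RMCommunicator itself: register() and heartbeat() are hypothetical stand-ins that only set a flag and bump a counter, and pollIntervalMs plays the role of the AM-to-RM heartbeat interval, which in MapReduce is configured by yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms (1000 ms by default).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the register-then-heartbeat lifecycle; the RPCs are replaced by a flag and a counter.
public class AllocatorLifecycleDemo {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicInteger heartbeats = new AtomicInteger();
    private final long pollIntervalMs;
    private volatile boolean registered;
    private Thread allocatorThread;

    public AllocatorLifecycleDemo(long pollIntervalMs) {
        this.pollIntervalMs = pollIntervalMs;
    }

    // Stand-in for register(); in YARN this is the registerApplicationMaster RPC.
    private void register() {
        registered = true;
    }

    // Stand-in for heartbeat(); in YARN this is the allocate RPC.
    private void heartbeat() {
        heartbeats.incrementAndGet();
    }

    public void start() {
        register(); // register first ...
        allocatorThread = new Thread(() -> { // ... then heartbeat periodically
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    return; // interrupted by stop()
                }
                heartbeat();
            }
        }, "allocator-thread");
        allocatorThread.start();
    }

    public void stop() {
        stopped.set(true);
        allocatorThread.interrupt();
        try {
            allocatorThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean isRegistered() {
        return registered;
    }

    public int getHeartbeatCount() {
        return heartbeats.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AllocatorLifecycleDemo demo = new AllocatorLifecycleDemo(10);
        demo.start();
        Thread.sleep(100);
        demo.stop();
        System.out.println("registered=" + demo.isRegistered() + " heartbeats=" + demo.getHeartbeatCount());
    }
}
```

stop() both sets the flag and interrupts the thread, so a thread sleeping between heartbeats exits promptly instead of waiting out the interval.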
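The event-handling flow of the AM's ContainerAllocatorRouter is shown in the figure below:
Registration flow:
RMCommunicator makes a remote call to the registerApplicationMaster method of ApplicationMasterService, which sets up and maintains the responseId and then adds the AM to AMLivelinessMonitor. AMLivelinessMonitor records timestamps in a map to detect AMs that have timed out because they have not sent a heartbeat for a long time; if an AM's heartbeat is not refreshed for too long, the RM tells the NodeManager to remove the AM.
Heartbeat thread:
Sending a heartbeat is also the process of obtaining resources: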
@Override
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources(); // the key method
  if (allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }
  ......
}
Obtaining resources:
private List<Container> getResources() throws Exception {
  ...
  response = makeRemoteRequest(); // interact with the RM
  ...
  // handle commands sent by the RM first
  if (response.getAMCommand() != null) {
    switch (response.getAMCommand()) {
    case AM_RESYNC:
    case AM_SHUTDOWN:
      eventHandler.handle(new JobEvent(this.getJob().getID(),
          JobEventType.JOB_AM_REBOOT));
      throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: "
          + this.getContext().getApplicationID());
    default:
      ....
    }
    // ... followed by a series of further processing
  }
}
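The command check above runs before any other part of the response is processed. A stripped-down model of that control flow follows, assuming a local enum in place of YARN's AMCommand, a hypothetical RebootRequested exception in place of YarnRuntimeException, and a list of event names in place of the real event handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Hadoop code: mirrors only the AM_RESYNC / AM_SHUTDOWN branch shown above.
public class AmCommandDemo {
    enum AMCommand { AM_RESYNC, AM_SHUTDOWN }

    static class RebootRequested extends RuntimeException {
        RebootRequested(String message) {
            super(message);
        }
    }

    private final List<String> emittedEvents = new ArrayList<>();

    // Inspect the command first; returns normally only when the RM sent no command.
    public void handleCommand(AMCommand command, String applicationId) {
        if (command == null) {
            return; // ordinary heartbeat: go on to process allocated and finished containers
        }
        switch (command) {
            case AM_RESYNC:
            case AM_SHUTDOWN:
                // Both commands share one branch: raise the reboot event, then abort this heartbeat.
                emittedEvents.add("JOB_AM_REBOOT");
                throw new RebootRequested("Resource Manager doesn't recognize AttemptId: " + applicationId);
            default:
                break;
        }
    }

    public List<String> getEmittedEvents() {
        return emittedEvents;
    }

    public static void main(String[] args) {
        AmCommandDemo demo = new AmCommandDemo();
        demo.handleCommand(null, "application_1"); // no command: nothing happens
        try {
            demo.handleCommand(AMCommand.AM_SHUTDOWN, "application_1");
        } catch (RebootRequested e) {
            System.out.println(e.getMessage() + " events=" + demo.getEmittedEvents());
        }
    }
}
```

Because AM_RESYNC and AM_SHUTDOWN fall through to the same case, either one ends the current heartbeat with an exception after the reboot event is raised; only a response without a command continues to container processing.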
Building the request:
protected AllocateResponse makeRemoteRequest() throws IOException {
  AllocateRequest allocateRequest = AllocateRequest.newInstance(lastResponseID,
      super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
      new ArrayList<ContainerId>(release), blacklistRequest);
  AllocateResponse allocateResponse;
  allocateResponse = scheduler.allocate(allocateRequest); // RPC call to ApplicationMasterService.allocate
  .....
}
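Every heartbeat call refreshes the AM's timestamp in AMLivelinessMonitor, indicating that the AM is still alive.
The code also shows that the resource requests are packaged into ask and sent to the RM as a list (ArrayList) of ResourceRequest objects, for example: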
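On the RM side, the timeout logic amounts to a map from attempt ID to last-ping time plus a periodic expiry check. The class below is a hypothetical, simplified model of that idea, not YARN's AMLivelinessMonitor; attempt IDs are plain strings and time is passed in explicitly. For reference, the RM's default AM expiry interval (yarn.am.liveness-monitor.expiry-interval-ms) is 600000 ms, i.e. 10 minutes, the same figure as the kill described at the start of this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified liveness monitor; not the actual YARN class.
public class LivenessMonitorDemo {
    private final long expireIntervalMs;
    private final Map<String, Long> lastPing = new HashMap<>();

    public LivenessMonitorDemo(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    // Start tracking an attempt (e.g. when the AM registers).
    public synchronized void register(String attemptId, long now) {
        lastPing.put(attemptId, now);
    }

    // Called on every heartbeat: refresh the timestamp of a tracked attempt.
    public synchronized void receivedPing(String attemptId, long now) {
        if (lastPing.containsKey(attemptId)) {
            lastPing.put(attemptId, now);
        }
    }

    // Periodic check: return, and stop tracking, every attempt silent for longer than the interval.
    public synchronized List<String> checkExpired(long now) {
        List<String> expired = new ArrayList<>();
        lastPing.entrySet().removeIf(e -> {
            boolean isExpired = now - e.getValue() > expireIntervalMs;
            if (isExpired) {
                expired.add(e.getKey());
            }
            return isExpired;
        });
        return expired;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        LivenessMonitorDemo monitor = new LivenessMonitorDemo(tenMinutes);
        monitor.register("attempt_A", 0L);
        monitor.register("attempt_B", 0L);
        monitor.receivedPing("attempt_A", 300_000L); // A keeps heartbeating, B goes silent
        System.out.println(monitor.checkExpired(tenMinutes + 1)); // prints [attempt_B]
    }
}
```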
priority:20 host:host9 capability:<memory:2048, vCores:1>
priority:20 host:host2 capability:<memory:2048, vCores:1>
priority:20 host:host10 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3203 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3202 capability:<memory:2048, vCores:1>
priority:20 host:* capability:<memory:2048, vCores:1>
So how is ask constructed?
The addMap, addReduce, and assign methods in RMContainerAllocator modify the contents of ask:
addContainerReq --> addResourceRequest --> addResourceRequestToAsk;
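By adding our own log statements to the code, we can see that resources are requested at three levels: local (host), rack, and any.
These finally become an ask list sent to the RM: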
ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true
A similar log line looks like this:
getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit=<memory:0, vCores:0> knownNMs=24
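The host/rack/any expansion can be illustrated with a small aggregation sketch. It assumes, as a simplification of addContainerReq --> addResourceRequest --> addResourceRequestToAsk, that each map request adds one container at each preferred host, at each distinct rack of those hosts, and at *; capability and priority are omitted, and AskBuilderDemo is a hypothetical name.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of folding per-task locality preferences into host/rack/ANY counts.
public class AskBuilderDemo {
    static final String ANY = "*";
    // resourceName -> number of containers requested at that locality level
    private final Map<String, Integer> ask = new TreeMap<>();

    // One increment per preferred host, one per distinct rack of those hosts, and one at ANY.
    public void addContainerReq(List<String> hosts, Map<String, String> hostToRack) {
        Set<String> racks = new TreeSet<>();
        for (String host : hosts) {
            ask.merge(host, 1, Integer::sum);
            racks.add(hostToRack.get(host));
        }
        for (String rack : racks) {
            ask.merge(rack, 1, Integer::sum);
        }
        ask.merge(ANY, 1, Integer::sum);
    }

    public Map<String, Integer> getAsk() {
        return Collections.unmodifiableMap(ask);
    }

    public static void main(String[] args) {
        Map<String, String> hostToRack = Map.of(
                "host1", "/rack/rack1", "host2", "/rack/rack1", "host3", "/rack/rack2");
        AskBuilderDemo builder = new AskBuilderDemo();
        builder.addContainerReq(List.of("host1", "host2"), hostToRack); // map task 1
        builder.addContainerReq(List.of("host2", "host3"), hostToRack); // map task 2
        System.out.println(builder.getAsk());
        // prints {*=2, /rack/rack1=2, /rack/rack2=1, host1=1, host2=2, host3=1}
    }
}
```

Under this model a task whose replicas span two racks counts once at * but twice at the rack level. That is consistent with the ask list above, where the rack counts (227 + 231 + 152 + 158 = 768) add up to exactly twice the * count (384); one plausible reading is that HDFS's default placement typically spreads three replicas over two racks.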