YARN是開源項目Hadoop的一個資源管理系統,最初設計是爲了解決Hadoop中MapReduce計算框架中的資源管理問題,可是如今它已是一個更加通用的資源管理系統,能夠把MapReduce計算框架做爲一個應用程序運行在YARN系統之上,經過YARN來管理資源。若是你的應用程序也須要藉助YARN的資源管理功能,你也能夠實現YARN提供的編程API,將你的應用程序運行於YARN之上,將資源的分配與回收統一交給YARN去管理,能夠大大簡化資源管理功能的開發。當前,也有不少應用程序已經能夠構建於YARN之上,如Storm、Spark等計算框架。node
YARN總體架構算法
YARN是基於Master/Slave模式的分佈式架構,咱們先看一下,YARN的架構設計,如圖所示(來自官網文檔):編程
上圖,從邏輯上定義了YARN系統的核心組件和主要交互流程,各個組件說明以下:安全
YARN Client提交Application到RM,它會首先建立一個Application上下文件對象,並設置AM必需的資源請求信息,而後提交到RM。YARN Client也能夠與RM通訊,獲取到一個已經提交併運行的Application的狀態信息等,具體詳見後面ApplicationClientProtocol協議的分析說明。網絡
RM是YARN集羣的Master,負責管理整個集羣的資源和資源分配。RM做爲集羣資源的管理和調度的角色,若是存在單點故障,則整個集羣的資源都沒法使用。在2.4.0版本才新增了RM HA的特性,這樣就增長了RM的可用性。架構
NM是YARN集羣的Slave,是集羣中實際擁有實際資源的工做節點。咱們提交Job之後,會將組成Job的多個Task調度到對應的NM上進行執行。Hadoop集羣中,爲了得到分佈式計算中的Locality特性,會將DN和NM在同一個節點上運行,這樣對應的HDFS上的Block可能就在本地,而無需在網絡間進行數據的傳輸。app
Container是YARN集羣中資源的抽象,將NM上的資源進行量化,根據須要組裝成一個個Container,而後服務於已受權資源的計算任務。計算任務在完成計算後,系統會回收資源,以供後續計算任務申請使用。Container包含兩種資源:內存和CPU,後續Hadoop版本可能會增長硬盤、網絡等資源。框架
AM主要管理和監控部署在YARN集羣上的Application,以MapReduce爲例,MapReduce Application是一個用來處理MapReduce計算的服務框架程序,爲用戶編寫的MapReduce程序提供運行時支持。一般咱們在編寫的一個MapReduce程序可能包含多個Map Task或Reduce Task,而各個Task的運行管理與監控都是由這個MapReduce Application來負責,好比運行Task的資源申請,由AM向RM申請;啓動/中止NM上某Task的對應的Container,由AM向NM請求來完成。異步
下面,咱們基於Hadoop 2.6.0的YARN源碼,來探討YARN內部實現原理。分佈式
YARN協議
YARN是一個分佈式資源管理系統,它包含了分佈的多個組件,咱們能夠經過這些組件之間設計的交互協議來講明,如圖所示:
下面咱們來詳細看看各個協議實現的功能:
協議方法 | 功能描述 |
getNewApplication | 獲取一個新的ApplicationId,例如返回的ApplicationId爲application_1418024756741 |
submitApplication | 提交一個Application到RM |
forceKillApplication | 終止一個已經提交的Application |
getApplicationReport | 獲取一個Application的狀態報告信息ApplicationReport,包括用戶、隊列、名稱、AM所在節點、AM的RPC端口、跟蹤URL、AM狀態、診斷信息(若是出錯的話)、啓動時間、提交Application的Client(若是啓用安全策略) |
getClusterMetrics | 獲取YARN集羣信息,如節點數量 |
getApplications | 獲取Application狀態報告信息,和getApplicationReport相似,只不過增長了過濾器功能 |
getClusterNodes | 獲取集羣內全部節點的狀態報告信息 |
getQueueInfo | 獲取隊列信息 |
getQueueUserAcls | 獲取當前用戶的隊列ACL信息 |
getDelegationToken | 獲取訪問令牌信息,用於Container與RM端服務交互 |
renewDelegationToken | 更新已存在的訪問令牌信息 |
cancelDelegationToken | 取消訪問令牌 |
moveApplicationAcrossQueues | 將Application移動到另外一個隊列中 |
getApplicationAttemptReport | 獲取Application Attempt狀態報告信息ApplicationAttemptReport |
getApplicationAttemptReport | 獲取Application Attempt狀態報告信息,和getApplicationAttemptReport相似,只不過增長了過濾器功能 |
getContainerReport | 根據ContainerId獲取Container狀態報告信息ContainerReport,例如Container名稱爲container_e17_1410901177871_0001_01_000005,各個段的含義:container_e<epoch>_<clusterTimestamp>_<appId>_<attemptId>_<containerId> |
getContainers | 根據ApplicationAttemptId獲取一個Application Attempt所使用的Container的狀態報告信息,例如Container名稱爲container_1410901177871_0001_01_000005 |
submitReservation | 預約資源,以備在特殊狀況下可以從集羣獲取到資源來運行程序,例如預留出資源供AM啓動 |
updateReservation | 更新預約資源 |
deleteReservation | 刪除預約 |
getNodeToLabels | 獲取節點對應的Label集合 |
getClusterNodeLabels | 獲取集羣中全部節點的Label |
協議方法 | 功能描述 |
registerNodeManager | NM向RM註冊 |
nodeHeartbeat | NM向RM發送心跳狀態報告 |
協議方法 | 功能描述 |
registerApplicationMaster | AM向RM註冊 |
finishApplicationMaster | AM通知RM已經完成(成功/失敗) |
allocate | AM向RM申請資源 |
協議方法 | 功能描述 |
startContainers | AM向NM請求啓動Container |
stopContainers | AM向NM請求中止Container |
getContainerStatuses | AM向NM請求查詢當前Container的狀態 |
協議方法 | 功能描述 |
getGroupsForUser | 獲取用戶所在用戶組,該協議繼承自GetUserMappingsProtocol |
refreshQueues | 刷新隊列配置 |
refreshNodes | 刷新節點配置 |
refreshSuperUserGroupsConfiguration | 刷新超級用戶組配置 |
refreshUserToGroupsMappings | 刷新用戶->用戶組映射信息 |
refreshAdminAcls | 刷新Admin的ACL信息 |
refreshServiceAcls | 刷新服務級別信息(SLA) |
updateNodeResource | 更新在RM端維護的RMNode資源信息 |
addToClusterNodeLabels | 向集羣中節點添加Label |
removeFromClusterNodeLabels | 移除集羣中節點Label |
replaceLabelsOnNode | 替換集羣中節點Label |
協議方法 | 功能描述 |
monitorHealth | HA Framework監控服務的健康狀態 |
transitionToActive | 使RM轉移到Active狀態 |
transitionToStandby | 使RM轉移到Standby狀態 |
getServiceStatus | 獲取服務狀態信息 |
YARN RPC實現
1.X版本的Hadoop使用默認實現的Writable協議做爲RPC協議,而在2.X版本,重寫了RPC框架,改爲默認使用Protobuf協議做爲Hadoop的默認RPC通訊協議。 YARN RPC的實現,以下面類圖所示:
經過上圖能夠看出,RpcEngine有兩個實現:WritableRpcEngine和ProtobufRpcEngine,默認使用ProtobufRpcEngine,咱們能夠選擇使用1.X默認的RPC通訊協議,甚至能夠自定義實現。
ResourceManager內部原理
RM是YARN分佈式系統的主節點,ResourceManager服務進程內部有不少組件提供其餘服務,包括對外RPC服務,已經維護內部一些對象狀態的服務等,RM的內部結構如圖所示:
上圖中RM內部各個組件(Dispatcher/EventHandler/Service)的功能,能夠查看源碼。
這裏,說一下ResourceScheduler組件,它是RM內部最重要的一個組件,用它來實現資源的分配與回收,它提供了必定算法,在運行時能夠根據算法提供的策略來對資源進行調度。YARN內部有3種資源調度策略的實現:FifoScheduler、FairScheduler、CapacityScheduler,其中默認實現爲CapacityScheduler。CapacityScheduler實現了資源更加細粒度的分配,能夠設置多級隊列,每一個隊列都有必定的容量,即對隊列設置資源上限和下限,而後對每一級別隊列分別再採用合適的調度策略(如FIFO)進行調度。
若是咱們想實現本身的資源調度策略,能夠直接實現YARN的資源調度接口ResourceScheduler,而後修改yarn-site.xml中的配置項yarn.resourcemanager.scheduler.class便可。
NodeManager內部原理
NM是YARN系統中實際持有資源的從節點,也是實際用戶程序運行的宿主節點,內部結構如圖所示:
上圖中NM內部各個組件(Dispatcher/EventHandler/Service)的功能,能夠查看源碼,再也不累述。
事件處理機制
事件處理能夠分紅2大類,一類是同步處理事件,事件處理過程會阻塞調用進程,一般這樣的事件處理邏輯很是簡單,不會長時間阻塞;另外一類就是異步處理處理事件,一般在接收到事件之後,會有一個用來派發事件的Dispatcher,將事件發到對應的事件隊列中,這採用生產者-消費者模式,消費者這會監視着隊列,並從取出事件進行異步處理。
YARN中處處能夠見到事件處理,其中比較特殊一點的就是將狀態機(StateMachine)做爲一個事件處理器,從而經過事件來觸發特定對象狀態的變遷,經過這種方式來管理對象狀態。咱們先看一下YARN中事件處理的機制,以ResourceManager端爲例,以下圖所示:
產生的事件經過Dispatcher進行派發並進行處理,若是EventHandler處理邏輯比較簡單,直接同步處理,不然可能會採用異步處理的方式。在EventHandler處理的過程當中,還可能產生新的事件Event,而後再次經過RM的Dispatcher進行派發,然後處理。
狀態機
咱們以RM端管理的RMAppImpl對象爲例,它表示一個Application運行過程當中,在RM端的所維護的Application的狀態,該對象對應的全部狀態及其狀態轉移路徑,以下圖所示:
在上圖中若是加上觸發狀態轉移的事件及其類型,可能整個圖會顯得很亂,因此這裏,我詳細畫了一個分圖,用來講明,每個狀態的變化都是有哪一種類型的事件觸發的,根據這個圖,能夠方便地閱讀源碼,以下圖所示:
NMLivelinessMonitor源碼分析實例
YARN主要採用了Dispatcher+EventHandler+Service這樣的抽象,將全部的內部/外部組件採用這種機制來實現,因爲存在不少的Service和EventHandler,並且有的組件可能既是一個Service,同時仍是一個EventHandler,因此在閱讀代碼的時候可能會感受迷茫,這裏我給出了一個閱讀NMLivelinessMonitor服務的實例,僅供想研究源碼的人蔘考。
NMLivelinessMonitor是ResourceManager端的一個監控服務實現,它主要是用來監控註冊的節點的Liveliness狀態,這裏是監控NodeManager的狀態。該服務會週期性地檢查NodeManager的心跳信息來確保註冊到ResourceManager的NodeManager當前處於活躍狀態,能夠執行資源分配以及處理計算任務,在NMLivelinessMonitor類繼承的抽象泛型類AbstractLivelinessMonitor中有一個Map,以下所示:
private Map<O, Long> running = new HashMap<O, Long>();
在ResourceManager中能夠看到,NMLivelinessMonitor的實例是其一個成員:
protected NMLivelinessMonitor nmLivelinessMonitor;
看一下NMLivelinessMonitor類的實現,它繼承自抽象泛型類AbstractLivelinessMonitor,看NMLivelinessMonitor類的聲明:
public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId>
1 @Override 2 protected void expire(NodeId id) { 3 dispatcher.handle( 4 new RMNodeEvent(id, RMNodeEventType.EXPIRE)); 5 }
咱們能夠經過該類NMLivelinessMonitor抽象基類中看到調用expire方法的邏輯,是在一個內部線程類PingChecker中,代碼以下所示:
01 private class PingChecker implements Runnable { 02 03 @Override 04 public void run() { 05 while (!stopped && !Thread.currentThread().isInterrupted()) { 06 synchronized (AbstractLivelinessMonitor.this) { 07 Iterator<Map.Entry<O, Long>> iterator = 08 running.entrySet().iterator(); 09 10 //avoid calculating current time everytime in loop 11 long currentTime = clock.getTime(); 12 13 while (iterator.hasNext()) { 14 Map.Entry<O, Long> entry = iterator.next(); 15 if (currentTime > entry.getValue() + expireInterval) { 16 iterator.remove(); 17 expire(entry.getKey()); // 調用抽象方法expire,會在子類中實現 18 LOG.info("Expired:" + entry.getKey().toString() + 19 " Timed out after " + expireInterval/1000 + " secs"); 20 } 21 } 22 } 23 try { 24 Thread.sleep(monitorInterval); 25 } catch (InterruptedException e) { 26 LOG.info(getName() + " thread interrupted"); 27 break; 28 } 29 } 30 } 31 }
這裏面的泛型O在NMLivelinessMonitor類中就是NodeId,因此最關心的邏輯就是前面提到的NMLivelinessMonitor中的expire方法的實現。在expire方法中,調用了dispatcher的handle方法來處理,因此dispatcher應該是一個EventHandler對象,後面咱們會看到,它實際上是經過ResourceManager中的dispatcher成員,也就是AsyncDispatcher來獲取到的(AsyncDispatcher內部有一個組合而成的EventHandler)。
下面,咱們接着看NMLivelinessMonitor是如何建立的,在ResourceManager.RMActiveServices類的serviceInit()方法中,代碼以下所示:
1 nmLivelinessMonitor = createNMLivelinessMonitor();
2 addService(nmLivelinessMonitor);
跟蹤代碼繼續看createNMLivelinessMonitor方法,以下所示:
1 private NMLivelinessMonitor createNMLivelinessMonitor() { 2 return new NMLivelinessMonitor(this.rmContext 3 .getDispatcher()); 4 }
上面經過rmContext的getDispatcher獲取到一個Dispatcher對象,來做爲NMLivelinessMonitor構造方法的參數,咱們須要看一下這個Dispatcher是如何建立的,查看ResourceManager.serviceInit方法,代碼以下所示:
1 rmDispatcher = setupDispatcher(); 2 addIfService(rmDispatcher); 3 rmContext.setDispatcher(rmDispatcher);
繼續跟蹤代碼,setupDispatcher()方法實現以下所示:
1 private Dispatcher setupDispatcher() { 2 Dispatcher dispatcher = createDispatcher(); 3 dispatcher.register(RMFatalEventType.class, 4 new ResourceManager.RMFatalEventDispatcher()); 5 return dispatcher; 6 }
繼續看createDispatcher()方法代碼實現:
1 protected Dispatcher createDispatcher() { 2 return new AsyncDispatcher(); 3 }
能夠看到,在這裏建立了一個AsyncDispatcher對象在建立的NMLivelinessMonitor實例中包含一個AsyncDispatcher實例。回到前面,咱們須要知道這個AsyncDispatcher調用getEventHandler()返回的EventHandler的處理邏輯是如何的,NMLivelinessMonitor的代碼實現以下所示:
01 public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> { 02 03 private EventHandler dispatcher; 04 05 public NMLivelinessMonitor(Dispatcher d) { 06 super("NMLivelinessMonitor", new SystemClock()); 07 this.dispatcher = d.getEventHandler(); // 調用AsyncDispatcher的getEventHandler()方法獲取EventHandler 08 } 09 10 public void serviceInit(Configuration conf) throws Exception { 11 int expireIntvl = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 12 YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); 13 setExpireInterval(expireIntvl); 14 setMonitorInterval(expireIntvl/3); 15 super.serviceInit(conf); 16 } 17 18 @Override 19 protected void expire(NodeId id) { 20 dispatcher.handle( 21 new RMNodeEvent(id, RMNodeEventType.EXPIRE)); 22 } 23 }
查看AsyncDispatcher類的getEventHandler()方法,代碼以下所示:
1 @Override 2 public EventHandler getEventHandler() { 3 if (handlerInstance == null) { 4 handlerInstance = new GenericEventHandler(); 5 } 6 return handlerInstance; 7 }
可見,這裏面不管是第一次調用仍是其餘對象已經調用過該方法,這裏面最終只有一個GenericEventHandler實例做爲這個dispatcher的內部EventHandler實例,因此繼續跟蹤代碼,看GenericEventHandler實現,以下所示:
01 class GenericEventHandler implements EventHandler<Event> { 02 public void handle(Event event) { 03 if (blockNewEvents) { 04 return; 05 } 06 drained = false; 07 08 /* all this method does is enqueue all the events onto the queue */ 09 int qSize = eventQueue.size(); 10 if (qSize !=0 && qSize %1000 == 0) { 11 LOG.info("Size of event-queue is " + qSize); 12 } 13 int remCapacity = eventQueue.remainingCapacity(); 14 if (remCapacity < 1000) { 15 LOG.warn("Very low remaining capacity in the event-queue: " 16 + remCapacity); 17 } 18 try { 19 eventQueue.put(event); // 將Event放入到隊列eventQueue中 20 } catch (InterruptedException e) { 21 if (!stopped) { 22 LOG.warn("AsyncDispatcher thread interrupted", e); 23 } 24 throw new YarnRuntimeException(e); 25 } 26 }; 27 }
將傳入handle方法的Event丟進了eventQueue隊列,也就是說GenericEventHandler是基於eventQueue的一個生產者,那麼消費者是AsyncDispatcher內部的另外一個線程,以下所示:
1 @Override 2 protected void serviceStart() throws Exception { 3 //start all the components 4 super.serviceStart(); 5 eventHandlingThread = new Thread(createThread()); // 調用建立消費eventQueue隊列中事件的線程 6 eventHandlingThread.setName("AsyncDispatcher event handler"); 7 eventHandlingThread.start(); 8 }
查看createThread()方法,以下所示:
01 Runnable createThread() { 02 return new Runnable() { 03 @Override 04 public void run() { 05 while (!stopped && !Thread.currentThread().isInterrupted()) { 06 drained = eventQueue.isEmpty(); 07 // blockNewEvents is only set when dispatcher is draining to stop, 08 // adding this check is to avoid the overhead of acquiring the lock 09 // and calling notify every time in the normal run of the loop. 10 if (blockNewEvents) { 11 synchronized (waitForDrained) { 12 if (drained) { 13 waitForDrained.notify(); 14 } 15 } 16 } 17 Event event; 18 try { 19 event = eventQueue.take(); // 從隊列取出事件Event 20 } catch(InterruptedException ie) { 21 if (!stopped) { 22 LOG.warn("AsyncDispatcher thread interrupted", ie); 23 } 24 return; 25 } 26 if (event != null) { 27 dispatch(event); // 分發處理該有效事件Event 28 } 29 } 30 } 31 }; 32 }
能夠看到,從eventQueue隊列中取出Event,而後調用dispatch(event);來處理事件,看dispatch(event)方法,以下所示:
01 @SuppressWarnings("unchecked") 02 protected void dispatch(Event event) { 03 //all events go thru this loop 04 if (LOG.isDebugEnabled()) { 05 LOG.debug("Dispatching the event " + event.getClass().getName() + "." 06 + event.toString()); 07 } 08 09 Class<? extends Enum> type = event.getType().getDeclaringClass(); 10 11 try{ 12 EventHandler handler = eventDispatchers.get(type); // 經過event獲取到事件類型,再根據事件類型獲取到已經註冊的EventHandler 13 if(handler != null) { 14 handler.handle(event); // 使用對應的EventHandler處理事件event 15 } else { 16 throw new Exception("No handler for registered for " + type); 17 } 18 } catch (Throwable t) { 19 //TODO Maybe log the state of the queue 20 LOG.fatal("Error in dispatcher thread", t); 21 // If serviceStop is called, we should exit this thread gracefully. 22 if (exitOnDispatchException 23 && (ShutdownHookManager.get().isShutdownInProgress()) == false 24 && stopped == false) { 25 LOG.info("Exiting, bbye.."); 26 System.exit(-1); 27 } 28 } 29 }
能夠看到,根據已經註冊的Map<Class, EventHandler> eventDispatchers表,選擇對應的EventHandler來執行實際的事件處理邏輯。這裏,再看看這個EventHandler是在哪裏住的。前面已經看到,NMLivelinessMonitor類的expire方法中,傳入的是new RMNodeEvent(id, RMNodeEventType.EXPIRE),咱們再查看ResourceManager.RMActiveServices.serviceInit()方法:
// Register event handler for RmNodes 2 rmDispatcher.register( 3 RMNodeEventType.class, new NodeEventDispatcher(rmContext)); // 註冊:事件類型RMNodeEventType,EventHandler實現類NodeEventDispatcher
可見RMNodeEventType類型的事件是使用ResourceManager.NodeEventDispatcher這個EventHandler來處理的,同時它也是一個Dispatcher,如今再看NodeEventDispatcher的實現:
01 @Private 02 public static final class NodeEventDispatcher implements 03 EventHandler<RMNodeEvent> { 04 05 private final RMContext rmContext; 06 07 public NodeEventDispatcher(RMContext rmContext) { 08 this.rmContext = rmContext; 09 } 10 11 @Override 12 public void handle(RMNodeEvent event) { 13 NodeId nodeId = event.getNodeId(); 14 RMNode node = this.rmContext.getRMNodes().get(nodeId); // 調用getRMNodes()獲取到一個ConcurrentMap<NodeId, RMNode>,它維護每一個NodeId的狀態(RMNode是一個狀態機對象) 15 if (node != null) { 16 try { 17 ((EventHandler<RMNodeEvent>) node).handle(event); // RMNode的實現爲RMNodeImpl,它也是一個EventHandler 18 } catch (Throwable t) { 19 LOG.error("Error in handling event type " + event.getType() 20 + " for node " + nodeId, t); 21 } 22 } 23 } 24 }
這個裏面尚未真正地去處理,而是基於RMNode狀態機對象來進行轉移處理,因此咱們繼續看RMNode的實現RMNodeImpl,由於前面事件類型RMNodeEventType.EXPIRE,咱們看狀態機建立時對該事件類型的轉移動做是如何註冊的:
1 private static final StateMachineFactory<RMNodeImpl, 02 NodeState, 03 RMNodeEventType, 04 RMNodeEvent> stateMachineFactory 05 = new StateMachineFactory<RMNodeImpl, 06 NodeState, 07 RMNodeEventType, 08 RMNodeEvent>(NodeState.NEW) 09 ... 10 .addTransition(NodeState.RUNNING, NodeState.LOST, 11 RMNodeEventType.EXPIRE, 12 new DeactivateNodeTransition(NodeState.LOST)) 13 ... 14 .addTransition(NodeState.UNHEALTHY, NodeState.LOST, 15 RMNodeEventType.EXPIRE, 16 new DeactivateNodeTransition(NodeState.LOST))
在ResourceManager端維護的NodeManager的信息使用RMNodeImpl來表示(在內存中保存ConcurrentMap),因此當前若是expire方法被調用,RMNodeImpl會根據狀態機對象中已經註冊的前置轉移狀態(pre-transition state)、後置轉移狀態(post-transition state)、事件類型(event type)、轉移Hook程序,來對事件進行處理,並使當前RMNodeImpl的狀態由前置轉移狀態更新爲後置轉移狀態。對於上面代碼,若是當前RMNodeImpl狀態是NodeState.RUNNING,事件爲RMNodeEventType.EXPIRE類型,則會調用Hook程序實現DeactivateNodeTransition,狀態更新爲NodeState.LOST;若是當前RMNodeImpl狀態是NodeState.UNHEALTHY,事件爲RMNodeEventType.EXPIRE類型,則會調用Hook程序實現DeactivateNodeTransition,狀態更新爲NodeState.LOST。具體地,每一個Transition的處理邏輯如何,能夠查看對應的Transition實現代碼。