Hadoop YARN架構設計要點

YARN是開源項目Hadoop的一個資源管理系統,最初設計是爲了解決Hadoop中MapReduce計算框架中的資源管理問題,可是如今它已是一個更加通用的資源管理系統,能夠把MapReduce計算框架做爲一個應用程序運行在YARN系統之上,經過YARN來管理資源。若是你的應用程序也須要藉助YARN的資源管理功能,你也能夠實現YARN提供的編程API,將你的應用程序運行於YARN之上,將資源的分配與回收統一交給YARN去管理,能夠大大簡化資源管理功能的開發。當前,也有不少應用程序已經能夠構建於YARN之上,如Storm、Spark等計算框架。node

YARN總體架構算法

YARN是基於Master/Slave模式的分佈式架構,咱們先看一下,YARN的架構設計,如圖所示(來自官網文檔):編程

 



上圖,從邏輯上定義了YARN系統的核心組件和主要交互流程,各個組件說明以下:安全

  • YARN Client

YARN Client提交Application到RM,它會首先建立一個Application上下文件對象,並設置AM必需的資源請求信息,而後提交到RM。YARN Client也能夠與RM通訊,獲取到一個已經提交併運行的Application的狀態信息等,具體詳見後面ApplicationClientProtocol協議的分析說明。網絡

  • ResourceManager(RM)

RM是YARN集羣的Master,負責管理整個集羣的資源和資源分配。RM做爲集羣資源的管理和調度的角色,若是存在單點故障,則整個集羣的資源都沒法使用。在2.4.0版本才新增了RM HA的特性,這樣就增長了RM的可用性。架構

  • NodeManager(NM)

NM是YARN集羣的Slave,是集羣中實際擁有實際資源的工做節點。咱們提交Job之後,會將組成Job的多個Task調度到對應的NM上進行執行。Hadoop集羣中,爲了得到分佈式計算中的Locality特性,會將DN和NM在同一個節點上運行,這樣對應的HDFS上的Block可能就在本地,而無需在網絡間進行數據的傳輸。app

  • Container

Container是YARN集羣中資源的抽象,將NM上的資源進行量化,根據須要組裝成一個個Container,而後服務於已受權資源的計算任務。計算任務在完成計算後,系統會回收資源,以供後續計算任務申請使用。Container包含兩種資源:內存和CPU,後續Hadoop版本可能會增長硬盤、網絡等資源。框架

  • ApplicationMaster(AM)

AM主要管理和監控部署在YARN集羣上的Application,以MapReduce爲例,MapReduce Application是一個用來處理MapReduce計算的服務框架程序,爲用戶編寫的MapReduce程序提供運行時支持。一般咱們在編寫的一個MapReduce程序可能包含多個Map Task或Reduce Task,而各個Task的運行管理與監控都是由這個MapReduce Application來負責,好比運行Task的資源申請,由AM向RM申請;啓動/中止NM上某Task的對應的Container,由AM向NM請求來完成。異步

下面,咱們基於Hadoop 2.6.0的YARN源碼,來探討YARN內部實現原理。分佈式

YARN協議

YARN是一個分佈式資源管理系統,它包含了分佈的多個組件,咱們能夠經過這些組件之間設計的交互協議來講明,如圖所示:

 


下面咱們來詳細看看各個協議實現的功能:

  • ApplicationClientProtocol(Client -> RM)
協議方法 功能描述
getNewApplication 獲取一個新的ApplicationId,例如返回的ApplicationId爲application_1418024756741
submitApplication 提交一個Application到RM
forceKillApplication 終止一個已經提交的Application
getApplicationReport 獲取一個Application的狀態報告信息ApplicationReport,包括用戶、隊列、名稱、AM所在節點、AM的RPC端口、跟蹤URL、AM狀態、診斷信息(若是出錯的話)、啓動時間、提交Application的Client(若是啓用安全策略)
getClusterMetrics 獲取YARN集羣信息,如節點數量
getApplications 獲取Application狀態報告信息,和getApplicationReport相似,只不過增長了過濾器功能
getClusterNodes 獲取集羣內全部節點的狀態報告信息
getQueueInfo 獲取隊列信息
getQueueUserAcls 獲取當前用戶的隊列ACL信息
getDelegationToken 獲取訪問令牌信息,用於Container與RM端服務交互
renewDelegationToken 更新已存在的訪問令牌信息
cancelDelegationToken 取消訪問令牌
moveApplicationAcrossQueues 將Application移動到另外一個隊列中
getApplicationAttemptReport 獲取Application Attempt狀態報告信息ApplicationAttemptReport
getApplicationAttemptReport 獲取Application Attempt狀態報告信息,和getApplicationAttemptReport相似,只不過增長了過濾器功能
getContainerReport 根據ContainerId獲取Container狀態報告信息ContainerReport,例如Container名稱爲container_e17_1410901177871_0001_01_000005,各個段的含義:container_e<epoch>_<clusterTimestamp>_<appId>_<attemptId>_<containerId>
getContainers 根據ApplicationAttemptId獲取一個Application Attempt所使用的Container的狀態報告信息,例如Container名稱爲container_1410901177871_0001_01_000005
submitReservation 預約資源,以備在特殊狀況下可以從集羣獲取到資源來運行程序,例如預留出資源供AM啓動
updateReservation 更新預約資源
deleteReservation 刪除預約
getNodeToLabels 獲取節點對應的Label集合
getClusterNodeLabels 獲取集羣中全部節點的Label
  • ResourceTracker(NM -> RM)
協議方法 功能描述
registerNodeManager NM向RM註冊
nodeHeartbeat NM向RM發送心跳狀態報告
  • ApplicationMasterProtocol(AM -> RM)
協議方法 功能描述
registerApplicationMaster AM向RM註冊
finishApplicationMaster AM通知RM已經完成(成功/失敗)
allocate AM向RM申請資源
  • ContainerManagementProtocol(AM -> NM)
協議方法 功能描述
startContainers AM向NM請求啓動Container
stopContainers AM向NM請求中止Container
getContainerStatuses AM向NM請求查詢當前Container的狀態
  • ResourceManagerAdministrationProtocol(RM Admin -> RM)
協議方法 功能描述
getGroupsForUser 獲取用戶所在用戶組,該協議繼承自GetUserMappingsProtocol
refreshQueues 刷新隊列配置
refreshNodes 刷新節點配置
refreshSuperUserGroupsConfiguration 刷新超級用戶組配置
refreshUserToGroupsMappings 刷新用戶->用戶組映射信息
refreshAdminAcls 刷新Admin的ACL信息
refreshServiceAcls 刷新服務級別信息(SLA)
updateNodeResource 更新在RM端維護的RMNode資源信息
addToClusterNodeLabels 向集羣中節點添加Label
removeFromClusterNodeLabels 移除集羣中節點Label
replaceLabelsOnNode 替換集羣中節點Label
  • HAServiceProtocol(Active RM HA Framework Standby RM)
協議方法 功能描述
monitorHealth HA Framework監控服務的健康狀態
transitionToActive 使RM轉移到Active狀態
transitionToStandby 使RM轉移到Standby狀態
getServiceStatus 獲取服務狀態信息

YARN RPC實現

1.X版本的Hadoop使用默認實現的Writable協議做爲RPC協議,而在2.X版本,重寫了RPC框架,改爲默認使用Protobuf協議做爲Hadoop的默認RPC通訊協議。 YARN RPC的實現,以下面類圖所示:

 


經過上圖能夠看出,RpcEngine有兩個實現:WritableRpcEngine和ProtobufRpcEngine,默認使用ProtobufRpcEngine,咱們能夠選擇使用1.X默認的RPC通訊協議,甚至能夠自定義實現。

ResourceManager內部原理

RM是YARN分佈式系統的主節點,ResourceManager服務進程內部有不少組件提供其餘服務,包括對外RPC服務,已經維護內部一些對象狀態的服務等,RM的內部結構如圖所示:

 


上圖中RM內部各個組件(Dispatcher/EventHandler/Service)的功能,能夠查看源碼。
這裏,說一下ResourceScheduler組件,它是RM內部最重要的一個組件,用它來實現資源的分配與回收,它提供了必定算法,在運行時能夠根據算法提供的策略來對資源進行調度。YARN內部有3種資源調度策略的實現:FifoScheduler、FairScheduler、CapacityScheduler,其中默認實現爲CapacityScheduler。CapacityScheduler實現了資源更加細粒度的分配,能夠設置多級隊列,每一個隊列都有必定的容量,即對隊列設置資源上限和下限,而後對每一級別隊列分別再採用合適的調度策略(如FIFO)進行調度。
若是咱們想實現本身的資源調度策略,能夠直接實現YARN的資源調度接口ResourceScheduler,而後修改yarn-site.xml中的配置項yarn.resourcemanager.scheduler.class便可。

NodeManager內部原理

NM是YARN系統中實際持有資源的從節點,也是實際用戶程序運行的宿主節點,內部結構如圖所示:

 


上圖中NM內部各個組件(Dispatcher/EventHandler/Service)的功能,能夠查看源碼,再也不累述。

事件處理機制

事件處理能夠分紅2大類,一類是同步處理事件,事件處理過程會阻塞調用進程,一般這樣的事件處理邏輯很是簡單,不會長時間阻塞;另外一類就是異步處理處理事件,一般在接收到事件之後,會有一個用來派發事件的Dispatcher,將事件發到對應的事件隊列中,這採用生產者-消費者模式,消費者這會監視着隊列,並從取出事件進行異步處理。
YARN中處處能夠見到事件處理,其中比較特殊一點的就是將狀態機(StateMachine)做爲一個事件處理器,從而經過事件來觸發特定對象狀態的變遷,經過這種方式來管理對象狀態。咱們先看一下YARN中事件處理的機制,以ResourceManager端爲例,以下圖所示:

 


產生的事件經過Dispatcher進行派發並進行處理,若是EventHandler處理邏輯比較簡單,直接同步處理,不然可能會採用異步處理的方式。在EventHandler處理的過程當中,還可能產生新的事件Event,而後再次經過RM的Dispatcher進行派發,然後處理。

狀態機

咱們以RM端管理的RMAppImpl對象爲例,它表示一個Application運行過程當中,在RM端的所維護的Application的狀態,該對象對應的全部狀態及其狀態轉移路徑,以下圖所示:

 


在上圖中若是加上觸發狀態轉移的事件及其類型,可能整個圖會顯得很亂,因此這裏,我詳細畫了一個分圖,用來講明,每個狀態的變化都是有哪一種類型的事件觸發的,根據這個圖,能夠方便地閱讀源碼,以下圖所示:
RMAppImpl

NMLivelinessMonitor源碼分析實例

YARN主要採用了Dispatcher+EventHandler+Service這樣的抽象,將全部的內部/外部組件採用這種機制來實現,因爲存在不少的Service和EventHandler,並且有的組件可能既是一個Service,同時仍是一個EventHandler,因此在閱讀代碼的時候可能會感受迷茫,這裏我給出了一個閱讀NMLivelinessMonitor服務的實例,僅供想研究源碼的人蔘考。
NMLivelinessMonitor是ResourceManager端的一個監控服務實現,它主要是用來監控註冊的節點的Liveliness狀態,這裏是監控NodeManager的狀態。該服務會週期性地檢查NodeManager的心跳信息來確保註冊到ResourceManager的NodeManager當前處於活躍狀態,能夠執行資源分配以及處理計算任務,在NMLivelinessMonitor類繼承的抽象泛型類AbstractLivelinessMonitor中有一個Map,以下所示:

private Map<O, Long> running = new HashMap<O, Long>();

這裏面O被替換成了NodeId,而值類型Long表示時間戳,也就是表達了一個NodeManager向ResourceManager最後發送心跳信息時間戳,經過檢測running中的時間戳;來判斷NodeManager是否能夠正常使用。

在ResourceManager中能夠看到,NMLivelinessMonitor的實例是其一個成員:

protected NMLivelinessMonitor nmLivelinessMonitor;

看一下NMLivelinessMonitor類的實現,它繼承自抽象泛型類AbstractLivelinessMonitor,看NMLivelinessMonitor類的聲明:

public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId>
View Code
在類實現中,有一個重寫(@Override)的protected的方法expire,以下所示:
1    @Override
2    protected void expire(NodeId id) {
3      dispatcher.handle(
4          new RMNodeEvent(id, RMNodeEventType.EXPIRE));
5    }
View Code

咱們能夠經過該類NMLivelinessMonitor抽象基類中看到調用expire方法的邏輯,是在一個內部線程類PingChecker中,代碼以下所示:

01    private class PingChecker implements Runnable {
02     
03      @Override
04      public void run() {
05        while (!stopped && !Thread.currentThread().isInterrupted()) {
06          synchronized (AbstractLivelinessMonitor.this) {
07            Iterator<Map.Entry<O, Long>> iterator =
08              running.entrySet().iterator();
09     
10            //avoid calculating current time everytime in loop
11            long currentTime = clock.getTime();
12     
13            while (iterator.hasNext()) {
14              Map.Entry<O, Long> entry = iterator.next();
15              if (currentTime > entry.getValue() + expireInterval) {
16                iterator.remove();
17                expire(entry.getKey()); // 調用抽象方法expire,會在子類中實現
18                LOG.info("Expired:" + entry.getKey().toString() +
19                        " Timed out after " + expireInterval/1000 + " secs");
20              }
21            }
22          }
23          try {
24            Thread.sleep(monitorInterval);
25          } catch (InterruptedException e) {
26            LOG.info(getName() + " thread interrupted");
27            break;
28          }
29        }
30      }
31    }
View Code

這裏面的泛型O在NMLivelinessMonitor類中就是NodeId,因此最關心的邏輯就是前面提到的NMLivelinessMonitor中的expire方法的實現。在expire方法中,調用了dispatcher的handle方法來處理,因此dispatcher應該是一個EventHandler對象,後面咱們會看到,它實際上是經過ResourceManager中的dispatcher成員,也就是AsyncDispatcher來獲取到的(AsyncDispatcher內部有一個組合而成的EventHandler)。
下面,咱們接着看NMLivelinessMonitor是如何建立的,在ResourceManager.RMActiveServices類的serviceInit()方法中,代碼以下所示:

1    nmLivelinessMonitor = createNMLivelinessMonitor();
2    addService(nmLivelinessMonitor);
View Code

跟蹤代碼繼續看createNMLivelinessMonitor方法,以下所示:

1    private NMLivelinessMonitor createNMLivelinessMonitor() {
2      return new NMLivelinessMonitor(this.rmContext
3          .getDispatcher());
4    }
View Code

上面經過rmContext的getDispatcher獲取到一個Dispatcher對象,來做爲NMLivelinessMonitor構造方法的參數,咱們須要看一下這個Dispatcher是如何建立的,查看ResourceManager.serviceInit方法,代碼以下所示:

1    rmDispatcher = setupDispatcher();
2    addIfService(rmDispatcher);
3    rmContext.setDispatcher(rmDispatcher);
View Code

繼續跟蹤代碼,setupDispatcher()方法實現以下所示:

1    private Dispatcher setupDispatcher() {
2      Dispatcher dispatcher = createDispatcher();
3      dispatcher.register(RMFatalEventType.class,
4          new ResourceManager.RMFatalEventDispatcher());
5      return dispatcher;
6    }
View Code

繼續看createDispatcher()方法代碼實現:

1    protected Dispatcher createDispatcher() {
2      return new AsyncDispatcher();
3    }
View Code

能夠看到,在這裏建立了一個AsyncDispatcher對象在建立的NMLivelinessMonitor實例中包含一個AsyncDispatcher實例。回到前面,咱們須要知道這個AsyncDispatcher調用getEventHandler()返回的EventHandler的處理邏輯是如何的,NMLivelinessMonitor的代碼實現以下所示:

01    public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> {
02     
03      private EventHandler dispatcher;
04       
05      public NMLivelinessMonitor(Dispatcher d) {
06        super("NMLivelinessMonitor", new SystemClock());
07        this.dispatcher = d.getEventHandler(); // 調用AsyncDispatcher的getEventHandler()方法獲取EventHandler
08      }
09     
10      public void serviceInit(Configuration conf) throws Exception {
11        int expireIntvl = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
12                YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
13        setExpireInterval(expireIntvl);
14        setMonitorInterval(expireIntvl/3);
15        super.serviceInit(conf);
16      }
17     
18      @Override
19      protected void expire(NodeId id) {
20        dispatcher.handle(
21            new RMNodeEvent(id, RMNodeEventType.EXPIRE));
22      }
23    }
View Code

查看AsyncDispatcher類的getEventHandler()方法,代碼以下所示:

1    @Override
2    public EventHandler getEventHandler() {
3      if (handlerInstance == null) {
4        handlerInstance = new GenericEventHandler();
5      }
6      return handlerInstance;
7    }
View Code

可見,這裏面不管是第一次調用仍是其餘對象已經調用過該方法,這裏面最終只有一個GenericEventHandler實例做爲這個dispatcher的內部EventHandler實例,因此繼續跟蹤代碼,看GenericEventHandler實現,以下所示:

01    class GenericEventHandler implements EventHandler<Event> {
02      public void handle(Event event) {
03        if (blockNewEvents) {
04          return;
05        }
06        drained = false;
07     
08        /* all this method does is enqueue all the events onto the queue */
09        int qSize = eventQueue.size();
10        if (qSize !=0 && qSize %1000 == 0) {
11          LOG.info("Size of event-queue is " + qSize);
12        }
13        int remCapacity = eventQueue.remainingCapacity();
14        if (remCapacity < 1000) {
15          LOG.warn("Very low remaining capacity in the event-queue: "
16              + remCapacity);
17        }
18        try {
19          eventQueue.put(event); // 將Event放入到隊列eventQueue中
20        } catch (InterruptedException e) {
21          if (!stopped) {
22            LOG.warn("AsyncDispatcher thread interrupted", e);
23          }
24          throw new YarnRuntimeException(e);
25        }
26      };
27    }
View Code

將傳入handle方法的Event丟進了eventQueue隊列,也就是說GenericEventHandler是基於eventQueue的一個生產者,那麼消費者是AsyncDispatcher內部的另外一個線程,以下所示:

1    @Override
2    protected void serviceStart() throws Exception {
3      //start all the components
4      super.serviceStart();
5      eventHandlingThread = new Thread(createThread()); // 調用建立消費eventQueue隊列中事件的線程
6      eventHandlingThread.setName("AsyncDispatcher event handler");
7      eventHandlingThread.start();
8    }
View Code
 

查看createThread()方法,以下所示:

01    Runnable createThread() {
02      return new Runnable() {
03        @Override
04        public void run() {
05          while (!stopped && !Thread.currentThread().isInterrupted()) {
06            drained = eventQueue.isEmpty();
07            // blockNewEvents is only set when dispatcher is draining to stop,
08            // adding this check is to avoid the overhead of acquiring the lock
09            // and calling notify every time in the normal run of the loop.
10            if (blockNewEvents) {
11              synchronized (waitForDrained) {
12                if (drained) {
13                  waitForDrained.notify();
14                }
15              }
16            }
17            Event event;
18            try {
19              event = eventQueue.take(); // 從隊列取出事件Event
20            } catch(InterruptedException ie) {
21              if (!stopped) {
22                LOG.warn("AsyncDispatcher thread interrupted", ie);
23              }
24              return;
25            }
26            if (event != null) {
27              dispatch(event); // 分發處理該有效事件Event
28            }
29          }
30        }
31      };
32    }
View Code

能夠看到,從eventQueue隊列中取出Event,而後調用dispatch(event);來處理事件,看dispatch(event)方法,以下所示:

01    @SuppressWarnings("unchecked")
02    protected void dispatch(Event event) {
03      //all events go thru this loop
04      if (LOG.isDebugEnabled()) {
05        LOG.debug("Dispatching the event " + event.getClass().getName() + "."
06            + event.toString());
07      }
08     
09      Class<? extends Enum> type = event.getType().getDeclaringClass();
10     
11      try{
12        EventHandler handler = eventDispatchers.get(type); // 經過event獲取到事件類型,再根據事件類型獲取到已經註冊的EventHandler
13        if(handler != null) {
14          handler.handle(event); // 使用對應的EventHandler處理事件event
15        } else {
16          throw new Exception("No handler for registered for " + type);
17        }
18      } catch (Throwable t) {
19        //TODO Maybe log the state of the queue
20        LOG.fatal("Error in dispatcher thread", t);
21        // If serviceStop is called, we should exit this thread gracefully.
22        if (exitOnDispatchException
23            && (ShutdownHookManager.get().isShutdownInProgress()) == false
24            && stopped == false) {
25          LOG.info("Exiting, bbye..");
26          System.exit(-1);
27        }
28      }
29    }
View Code

能夠看到,根據已經註冊的Map<Class, EventHandler> eventDispatchers表,選擇對應的EventHandler來執行實際的事件處理邏輯。這裏,再看看這個EventHandler是在哪裏住的。前面已經看到,NMLivelinessMonitor類的expire方法中,傳入的是new RMNodeEvent(id, RMNodeEventType.EXPIRE),咱們再查看ResourceManager.RMActiveServices.serviceInit()方法:

    // Register event handler for RmNodes
2    rmDispatcher.register(
3        RMNodeEventType.class, new NodeEventDispatcher(rmContext)); // 註冊:事件類型RMNodeEventType,EventHandler實現類NodeEventDispatcher
View Code

可見RMNodeEventType類型的事件是使用ResourceManager.NodeEventDispatcher這個EventHandler來處理的,同時它也是一個Dispatcher,如今再看NodeEventDispatcher的實現:

01    @Private
02    public static final class NodeEventDispatcher implements
03        EventHandler<RMNodeEvent> {
04     
05      private final RMContext rmContext;
06     
07      public NodeEventDispatcher(RMContext rmContext) {
08        this.rmContext = rmContext;
09      }
10     
11      @Override
12      public void handle(RMNodeEvent event) {
13        NodeId nodeId = event.getNodeId();
14        RMNode node = this.rmContext.getRMNodes().get(nodeId); // 調用getRMNodes()獲取到一個ConcurrentMap<NodeId, RMNode>,它維護每一個NodeId的狀態(RMNode是一個狀態機對象)
15        if (node != null) {
16          try {
17            ((EventHandler<RMNodeEvent>) node).handle(event); // RMNode的實現爲RMNodeImpl,它也是一個EventHandler
18          } catch (Throwable t) {
19            LOG.error("Error in handling event type " + event.getType()
20                + " for node " + nodeId, t);
21          }
22        }
23      }
24    }
View Code

這個裏面尚未真正地去處理,而是基於RMNode狀態機對象來進行轉移處理,因此咱們繼續看RMNode的實現RMNodeImpl,由於前面事件類型RMNodeEventType.EXPIRE,咱們看狀態機建立時對該事件類型的轉移動做是如何註冊的:

1      private static final StateMachineFactory<RMNodeImpl,
02                                               NodeState,
03                                               RMNodeEventType,
04                                               RMNodeEvent> stateMachineFactory
05                     = new StateMachineFactory<RMNodeImpl,
06                                               NodeState,
07                                               RMNodeEventType,
08                                               RMNodeEvent>(NodeState.NEW)
09    ...
10         .addTransition(NodeState.RUNNING, NodeState.LOST,
11             RMNodeEventType.EXPIRE,
12             new DeactivateNodeTransition(NodeState.LOST))
13    ...
14         .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
15             RMNodeEventType.EXPIRE,
16             new DeactivateNodeTransition(NodeState.LOST))
View Code

在ResourceManager端維護的NodeManager的信息使用RMNodeImpl來表示(在內存中保存ConcurrentMap),因此當前若是expire方法被調用,RMNodeImpl會根據狀態機對象中已經註冊的前置轉移狀態(pre-transition state)、後置轉移狀態(post-transition state)、事件類型(event type)、轉移Hook程序,來對事件進行處理,並使當前RMNodeImpl的狀態由前置轉移狀態更新爲後置轉移狀態。對於上面代碼,若是當前RMNodeImpl狀態是NodeState.RUNNING,事件爲RMNodeEventType.EXPIRE類型,則會調用Hook程序實現DeactivateNodeTransition,狀態更新爲NodeState.LOST;若是當前RMNodeImpl狀態是NodeState.UNHEALTHY,事件爲RMNodeEventType.EXPIRE類型,則會調用Hook程序實現DeactivateNodeTransition,狀態更新爲NodeState.LOST。具體地,每一個Transition的處理邏輯如何,能夠查看對應的Transition實現代碼。

相關文章
相關標籤/搜索