Ambari架構源碼解析

1. Ambari介紹

  Apache Ambari是一種基於Web的工具,支持Apache Hadoop集羣的供應、管理和監控。Ambari已支持大多數Hadoop組件,包括HDFS、MapReduce、Hive、Pig、 Hbase、Zookeeper、Sqoop和Hcatalog等。前端

1.1 基本概念

1. Resource:Ambari把能夠被管理的資源的抽象爲一個Resource實例,資源能夠包括服務、組件、主機節點等,一個resource實例中包含了一系列該資源的屬性;
2. Property:服務組件的指標名稱;
3. ResourceProvider和PropertyProvider:分別對應Resource和Property的提供方,獲取指標須要先獲取Resource,而後獲取Property對應的metric;
4. Query:Query是Resource的內部對象,表明了對該資源的操做;
5. Request:一個Request表明了對Resource的操做請求,包含http信息及要操做的Resource的實例,Request按照http的請求方式分爲四種:GET、PUT、DELETE、POST;
6. Predicate:一個Predicate表明了一系列表達式,如and、or等;node

1.2 基本組件

Ambari 能夠分爲 5個大的組件,分別是是 Ambari-server 、 Ambari-web 、 Ambari-agent 、 Ambari-metrics-collectorAmbari-metrics-monitor 。python

1. 在集羣的每一臺機器上都會部署 Ambari-agent 程序。 Agent 主要負責接收來着 Server 端的命令,這些命令能夠是安裝、啓動、中止 Hadoop 集羣上的某一服務。同時, agent 端須要向 Ambari-server 端上 報命令執行的結果,是執行成功仍是失敗。 ios

2. Ambari-Server 提供 REST 接口給Agent 和 Web 訪問,用戶甚至能夠不用界面,而是經過 curl 命令來操控集羣。 web

3. Ambari-metric-collector和 Ambari-metrics-monitor 是收集羣中組件 metrics 的模塊。spring

1.3 相關技術

Ambari充分利用了一些已有的優秀開源軟件,巧妙地把它們結合起來,使其在分佈式環境中作到了集羣式服務管理能力、監控能力、展現能力,這些優秀的開源軟件有: 
(1)agent端,採用了puppet管理節點。 
(2)在web端,採用ember.js做爲前端MVC框架和NodeJS相關工具,用handlebars.js做爲頁面渲染引擎,在CSS/HTML方面還用了Bootstrap框架。 
(3)在Server端,採用了Jetty、Spring、JAX-RS等。 
(4)同時利用了Ganglia、Nagios的分佈式監控能力。sql

Ambari架構採用的是Server/Client的模式,主要由兩部分組成:ambari-agent和ambari-server。ambari依賴其它已經成熟的工具,例如其ambari-server 就依賴python,而ambari-agent還同時依賴ruby, puppet,facter等工具,還有它也依賴一些監控工具nagios和ganglia用於監控集羣情況。 
其中: 
1. puppet是分佈式集羣配置管理工具,也是典型的Server/Client模式,可以集中式管理分佈式集羣的安裝配置部署,主要語言是ruby。 
2. facter是用python寫的一個節點資源採集庫,用於採集節點的系統信息,例如OS信息,主機信息等。因爲ambari-agent主要是用python寫的,所以用facter能夠很好地採集到節點信息。shell

2. Ambari項目目錄結構

2.1 整體目錄

ambari-server Ambari的Server程序,主要管理部署在每一個節點上的管理監控程序
ambari-agent 部署在監控節點上運行的管理監控程序
ambari-web Ambari頁面UI的代碼,做爲用戶與Ambari server交互的。
ambari-views 用於擴展Ambari Web UI中的框架
ambari-common Ambari-server 和Ambari-agent 共用的代碼
ambari-metrics 在Ambari所管理的集羣中用來收集、聚合和服務Hadoop和系統計量
contrib 自定義第三方庫
docs 文檔

   

 

 

 

 

 

 

2.2 ambari-server 目錄

目錄 描述
org.apache.ambari.server.api.services 對web接口的入口方法,處理/api/v1/* 的請求
org.apache.ambari.server.controller 對Ambari中cluster的管理處理,如新增host,更service、刪除component等
org.apache.ambari.server.controller.internal 主要存放ResourceProvider和PropertyProvider;
org.apache.ambari.service.orm.* 對數據庫的操做
org.apache.ambari.server.agent.rest 處理與Agent的接口的入口方法
org.apache.ambari.security 使用Spring Security來作權限管理

 每一種Resource都對應一個ResourceProvider,以下表所示:數據庫

 

Resource.Type ResourceProvider
Workflow WorkflowResourceProvider
Job JobResourceProvider
TaskAttempt TaskAttemptResourceProvider
View ViewResourceProvider
ViewInstance ViewInstanceResourceProvider
Blueprint BlueprintResourceProvider
Cluster ClusterResourceProvider
Service ServiceResourceProvider
Component ComponentResourceProvider
Host HostResourceProvider
HostComponent HostComponentResourceProvider
Configuration ConfigurationResourceProvider
Action ActionResourceProvider
Request RequestResourceProvider
Task TaskResourceProvider
User UserResourceProvider
Stack StackResourceProvider
StackVersion StackVersionResourceProvider
StackService StackServiceResourceProvider
StackServiceComponent StackServiceComponentResourceProvider
StackConfiguration StackConfigurationResourceProvider
OperatingSystem OperatingSystemResourceProvider
Repository RepositoryResourceProvider
RootService RootServiceResourceProvider
RootServiceComponent RootServiceComponentResourceProvider
RootServiceHostComponent RootServiceHostComponentResourceProvider
ConfigGroup ConfigGroupResourceProvider
RequestSchedule RequestScheduleResourceProvider  

 

2.3 Ambari-agent目錄

3. Ambari-server

3.1 ambari-server架構

  ambari-server是一個有狀態的,它維護着本身的一個有限狀態機FSM,同時這些狀態機存儲在數據庫中,默認數據庫爲postgressql數據庫。 apache

1. Ambarii-Server提供ambari web,rest api,ambari shell三大方式操做機羣; 
2. ambari將集羣的配置、各個服務的配置等信息存在ambari server端的DB中; 
3. ambari server與ambari agent的交流走RPC,即agent向server報告心跳,server將command經過respons發回給agent,agent本地執行命令,好比:agent端執行相應的python腳本; 
4. ambari有本身的一套監控、告警、鏡像服務,以可插拔的形式供上層服務調用; 

 

 

  Ambari-Server是一個WEB Server,提供統一的REST API接口,同時向web和agent開放了兩個不一樣的端口(默認前者是8080, 後者是8440或者8441)。它是由Jetty Server容器構建起來的,經過Spring Framework構建出來的WEB服務器,其中大量採用了google提供的Guice註解完成spring框架所須要的注入功能,REST服務由JAX-RS標準來實現。


 

以下圖所示,server端主要維護三類狀態: 
1. Live Cluster State:集羣現有狀態,各個節點彙報上來的狀態信息會更改該狀態; 
2. Desired State:用戶但願該節點所處狀態,是用戶在頁面進行了一系列的操做,須要更改某些服務的狀態,這些狀態尚未在節點上產生做用; 
3. Action State:操做狀態,是狀態改變時的請求狀態,也能夠看做是一種中間狀態,這種狀態能夠輔助Live Cluster State向Desired State狀態轉變。

Ambari-server的Heartbeat Handler模塊用於接收各個agent的心跳請求(心跳請求裏面主要包含兩類信息:節點狀態信息和返回的操做結果),把節點狀態信息傳遞給FSM狀態機去維護着該節點的狀態,而且把返回的操做結果信息返回給Action Manager去作進一步的處理。 
Coordinator模塊又能夠稱爲API handler,主要在接收WEB端操做請求後,會檢查它是否符合要求,stage planner分解成一組操做,最後提供給Action Manager去完成執行操做。

  所以,從上圖就能夠看出,Ambari-Server的全部狀態信息的維護和變動都會記錄在數據庫中,用戶作一些更改服務的操做都會在數據庫上作一些相應的記錄,同時,agent經過心跳來得到數據庫的變動歷史。

 


 

  Ambari Server 會讀取 Stack 和 Service 的配置文件。當用 Ambari 建立集羣的時候,Ambari Server 傳送 Stack 和 Service 的配置文件以及 Service 生命週期的控制腳本到 Ambari Agent。Agent 拿到配置文件後,會下載安裝公共源裏軟件包(Redhat,就是使用 yum 服務)。安裝完成後,Ambari Server 會通知 Agent 去啓動 Service。以後 Ambari Server 會按期發送命令到 Agent 檢查 Service 的狀態,Agent 上報給 Server,並呈如今 Ambari 的 GUI 上。 
Ambari Server 支持 Rest API,這樣能夠很容易的擴展和定製化 Ambari。甚至於不用登錄 Ambari 的 GUI,只須要在命令行經過 curl 就能夠控制 Ambari,以及控制 Hadoop 的 cluster。具體的 API 能夠參見 Apache Ambari 的官方網頁 API reference。 

4. Ambari-agent

4.1 ambari-agent架構

ambari-agent是無狀態的,其功能主要分兩部分:

  • 採集所在節點的信息而且彙總發心跳彙報給ambari-server
  • 處理ambari-server的執行請求

所以它有兩種隊列:

  • 消息隊列MessageQueue,或爲ResultQueue。包括節點狀態信息(包括註冊信息)和執行結果信息,而且彙總後經過心跳發送給ambari-server;
  • 操做隊列ActionQueue。用於接收ambari-server返回過來的狀態操做,而後能過執行器按序調用puppet或python腳本等模塊完成任務。 

 

4.2 Ambari-agent引導流程

分別是用SSH和人工手動的非SSH 
步驟: 
1. Ambari Server經過調用bootstrap.py來初始化整個bootstrap進程 
2. Server端經過SSH Keys在Agent上配置Ambari Repo:經過scp 命令將Ambari Server上的ambari.repo文件拷貝到Agent Host上。 
3. 複製Ambari Agent Setup script:利用scp命令將setupAgent.py腳本複製到Agent host上。 
4. 在各個Agent上執行Ambari Agent Setup script:SSH到各個Agent Host上而後執行setupAgent.py。 
5. 在Agent上安裝epel-release:用yum工具來安裝epel-release包 
6. 在Agent上安裝Ambari-agent:用yum工具來安裝Ambari-Agent包 
7. 配置Ambari-agent.ini:修改/etc/ambari-agent/conf/ambari-agent.ini,並設置agent host上的hostname 
8. 啓動Ambari-agent:啓動Ambari-agent進程 
9. 開始Ambari Agent註冊:agent開始registration進程

人工手動引導 
具體步驟內容基本同上

 

4.3 Agent註冊流程

步驟 
1. 鏈接握手端口8441:Ambari Agent鏈接到Ambari Server的握手端口8441。 
2. 下載Server Certification:Ambari Agent下載Server Certification。 
3. 請求籤署Agent Certification:Ambari Agent請求Ambari Server來簽署Agent證書。 
4. 簽署Agent Cert:Ambari Server經過密碼簽署Agent證書。 
5. 下載Agent Cert並斷掉鏈接:Ambari Agent下載Agent證書,而後斷掉以前的鏈接。 
6. 鏈接註冊端口8440:Ambari Agent鏈接到Ambari Server的註冊端口8441 
7. 用Agent Cert執行2WAY auth:在Agent和Server之間完成2WAY權限認證。 
8. 獲取FQDN:Ambari Agent host獲取Fully Qualified Domain Name(FQDN) 
9. 註冊Host:利用FQDN,host向Ambari Server提出註冊。 
10. 完成Host註冊:Ambari Server完成host的註冊過程,把host加入到Ambari數據庫 。
11. Agent心跳程序啓動:Ambari Agent向Ambari Server開啓心跳程序,確認各類命令的執行 。

5. Ambari-web內部架構

Ambari-web使用了一個流行的前端Embar.js MVC框架實現,Embar.js是一個TodoMVC框架,它涵蓋了現今典型的單頁面應用(single page application)幾乎全部的行爲。

使用了nodejs

使用brunch 做爲項目的構建管理工具

Brunch ,是一個超快的HTML5構建工具。它有以下功能:

(1)編譯你的腳本、模板、樣式、連接它們。

(2)將腳本和模板封裝進common.js/AMD模塊裏,連接腳本和樣式。

(3)爲連接文件生成源地圖,複製資源和靜態文件。

(4)經過縮減代碼和優化圖片來收縮輸出,看管你的文件更改。

(5)並經過控制檯和系統提示通知你錯誤。

Nodejs 是一個基於Chrome JavaScript運行時創建的一個平臺,用來方便的搭建快速的易於擴展的網絡應用,NodeJS藉助事件驅動,非阻塞I/O模型變得輕量和高效,很是適合運行在分佈式設備的數據密集型的實時應用。

6. 源碼分析

6.1 ambari-server處理ambari-agent請求

  Agent發送過來的心跳請求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)來處理,執行完後,同時會返回org.apache.ambari.server.agent.HeartBeatResponse給agent。 org.apache.ambari.server.agent.HeartBeat裏面主要含了兩類信息:節點的狀態信息nodeStatus和服務狀態信息componentStatus。

public class HeartBeatHandler {
  ...
    public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
      throws AmbariException {
    long now = System.currentTimeMillis();
    if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
      heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
    }

    String hostname = heartbeat.getHostname();
    Long currentResponseId = hostResponseIds.get(hostname);
    HeartBeatResponse response;

    if (currentResponseId == null) {
      //Server restarted, or unknown host.
      LOG.error("CurrentResponseId unknown for " + hostname + " - send register command");
      // 無responseId, 新請求,就進行註冊, responseId =0
      return createRegisterCommand();
    }

    LOG.debug("Received heartbeat from host"
        + ", hostname=" + hostname
        + ", currentResponseId=" + currentResponseId
        + ", receivedResponseId=" + heartbeat.getResponseId());

    if (heartbeat.getResponseId() == currentResponseId - 1) {
      LOG.warn("Old responseId received - response was lost - returning cached response");
      return hostResponses.get(hostname);
    } else if (heartbeat.getResponseId() != currentResponseId) {
      LOG.error("Error in responseId sequence - sending agent restart command");
      // 心跳是歷史記錄,那麼就要求其重啓,從新註冊,responseId 不變
      return createRestartCommand(currentResponseId);
    }

    response = new HeartBeatResponse();
    //responseId 加 1 , 返回一個新的responseId,下次心跳又要把這個responseId帶回來。
    response.setResponseId(++currentResponseId);

    Host hostObject;
    try {
      hostObject = clusterFsm.getHost(hostname);
    } catch (HostNotFoundException e) {
      LOG.error("Host: {} not found. Agent is still heartbeating.", hostname);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Host associated with the agent heratbeat might have been " +
          "deleted", e);
      }
      // For now return empty response with only response id.
      return response;
    }
    //失去心跳,要求從新註冊, responseId=0
    if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
      // After loosing heartbeat agent should reregister
      LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
      return createRegisterCommand();
    }

    hostResponseIds.put(hostname, currentResponseId);
    hostResponses.put(hostname, response);

    // If the host is waiting for component status updates, notify it
    //若是主機正在等待組件狀態更新,請通知它
    //節點已經進行了註冊,可是該節點尚未彙報相關狀態信息,等待服務狀態更新
    if (heartbeat.componentStatus.size() > 0
        && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
      try {
        LOG.debug("Got component status updates");
        //更新服務狀態機
        hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
      } catch (InvalidStateTransitionException e) {
        LOG.warn("Failed to notify the host about component status updates", e);
      }
    }

    if (heartbeat.getRecoveryReport() != null) {
      RecoveryReport rr = heartbeat.getRecoveryReport();
      processRecoveryReport(rr, hostname);
    }

    try {
      if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
        //向狀態機發送更新事件,更新節點至正常狀態
        hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
            heartbeat.getAgentEnv(), heartbeat.getMounts()));
      } else { // 把節點列入不健康
        hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
      }
    } catch (InvalidStateTransitionException ex) {
      LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
      hostObject.setState(HostState.INIT);
      return createRegisterCommand();
    }

    /**
     * A host can belong to only one cluster. Though getClustersForHost(hostname)
     * returns a set of clusters, it will have only one entry.
     *主機只能屬於一個集羣。 經過getClustersForHost(hostname)返回一組集羣,它只有一個條目。
     *
     * TODO: Handle the case when a host is a part of multiple clusters.
     * 處理 主機是多個集羣的一部分時的 狀況。
     */
    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);

    if (clusters.size() > 0) {
      String clusterName = clusters.iterator().next().getClusterName();

      if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
        RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
        response.setRecoveryConfig(rc);

        if (response.getRecoveryConfig() != null) {
          LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
        }
      }
    }

    heartbeatProcessor.addHeartbeat(heartbeat);

    // Send commands if node is active
    if (hostObject.getState().equals(HostState.HEALTHY)) {
      sendCommands(hostname, response);
      annotateResponse(hostname, response);
    }

    return response;
  }

  ...
}

6.2 Ambari-Agent執行流程

  安裝ambari-agent 服務時會把相應在的python代碼置於python執行的環境上下文中,例如其入口代碼多是/usr/lib/python2.6/site-packages/ambari_agent/main.py,而且進行相關初始化工做(例如驗證參數,與server創建鏈接,初始化安全驗證證書),最後會產生一個新的控制器Controller子線程來統一管理節點的狀態。Controller線程裏面有一個動做隊列ActionQueue線程,而且開啓向Server註冊和發心跳服務。能夠看出來,ambari-agent主要由兩個線程組成,Controller線程向Server發送註冊或心跳請求,請求到的Action數據放到ActionQueue線程裏面,ActionQueue線程維護着兩個隊列:CommandQueue和ResultQueue。ActionQueue線程會監聽CommandQueue的情況。

class Controller(threading.Thread):    
  def __init__(self, config, range=30):  
  // 在初始化Controller以前,ambari-agent就會在main.py裏面進行判斷:ambari-server是否正常,正常纔會初始化Controller  
  // 省略初始化代碼
  def run(self):
    try:
      // 初始化隊列線程
      self.actionQueue = ActionQueue(self.config, controller=self)
      self.actionQueue.start()
      // 初始化註冊類
      self.register = Register(self.config)
      // 初始化心跳類
      self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())

      opener = urllib2.build_opener()
      urllib2.install_opener(opener)

      while True:
        self.repeatRegistration = False
        //開始註冊 而且 定時發心跳 
        self.registerAndHeartbeat()
        if not self.repeatRegistration:
          logger.info("Finished heartbeating and registering cycle")
          break
    except:
      logger.exception("Controller thread failed with exception:")
      raise

    logger.info("Controller thread has successfully finished")

CommandQueue隊列主要有3類command: 
1. REGISTER_COMMAND:該類命令主要通知agent從新向server發送註冊請求。 
2. STATUS_COMMAND:該類命令主要告訴agent須要向server發送某組件的狀態信息。 
3. EXECUTION_COMMAND:要求agent執行puppet或者軟件集升級任務

3、獲取指標流程:

  1. jersy接口接收到請求,建立一個ResourceInstance實例;
  2. 解析http請求構造一個Request對象,而後交給reques的process()方法來處理;
  3. reques解析url或http_body獲得一個Predicate對象;
  4. 根據http類型獲取handler,GET請求對應ReadHandler;
  5. handler向Query對象中添加分頁、Render、Predicate等屬性後,而後讓query.execute();
  6. 根據Resource.Type得到對應的ResourceProvider對象,調用其getResources方法獲得Set;
  7. 調用對應的PropertyProvider填充Resource;
  8. 處理結果,放回json結果

Ambari-Server啓動

Ambari-Server接受來自兩處的REST請求,Agent過來的請求處理邏輯由包org.apache.ambari.server.agent處理, 而API所的處理邏輯來自org.apache.ambari.server.api。詳見以下代碼:

「`

「`

Ambari-Server有一個狀態機管理模塊,全部節點的狀態信息更改都最終提供給狀態機進行更改操做,所以狀態機是一個很忙的組件。在Ambari-Server裏面,把每一次更改操做都把它看成是一類事件,採用事件驅動機制完成對應的任務。這種思想有點借鑑已經運用在hadoop 2.x YARN裏面的事件驅動機制。事件驅動機制可以一種高效的異步RPC請求方式,直接調用須要執行相應的代碼邏輯,而事件驅動只須要產生事件統一提交給事件處理器,所以事件驅動須要一個更復雜的有限狀態機結合起來一同使用。

相關文章
相關標籤/搜索