Ambari深刻學習(II)-實現細節

在第一節中,咱們簡單講了一下Ambari的系統架構。咱們這一節主要分析Ambari的源代碼,總覽Ambari的具體實現方式及其工做細節。node

 1、Ambari-Server啓動

Ambari-Server是一個WEB Server,提供統一的REST API接口,同時向web和agent開放了兩個不一樣的端口(默認前者是8080, 後者是8440或者8441)。它是由Jetty Server容器構建起來的,經過Spring Framework構建出來的WEB服務器,其中大量採用了google提供的Guice註解完成spring框架所須要的注入功能(想想,以前spring框架須要加載一個applicationcontext.xml文件來把bean注入進來,如今能夠用Guice註解的方式就能夠輕鬆完成)。 REST框架由JAX-RS標準來構建。
 
 Ambari-Server接受來自兩處的REST請求,Agent過來的請求處理邏輯由包org.apache.ambari.server.agent處理, 而API所的處理邏輯來自org.apache.ambari.server.api。詳見以下代碼:
Java代碼   收藏代碼
  1. // API handler加載的REST處理包  
  2.  ServletHolder sh = new ServletHolder(ServletContainer.class);  
  3.     //採用包結構的形式進行加載  
  4.       sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",  
  5.           "com.sun.jersey.api.core.PackagesResourceConfig");  
  6.     // 下面是/api/v1/接受的請求由哪些包來處理  
  7.       sh.setInitParameter("com.sun.jersey.config.property.packages",  
  8.           "org.apache.ambari.server.api.rest;" +  
  9.               "org.apache.ambari.server.api.services;" +  
  10.               "org.apache.ambari.eventdb.webservice;" +  
  11.               "org.apache.ambari.server.api");  
  12.       sh.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",  
  13.           "true");  
  14.       root.addServlet(sh, "/api/v1/*");  
  15.       sh.setInitOrder(2);  
  16.   
  17.       // Agent Handler加載的REST包,主要是org.apache.ambari.server.agent.rest.HeartBeatHandler來接受心跳請求  
  18.       ServletHolder agent = new ServletHolder(ServletContainer.class);  
  19.       agent.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",  
  20.           "com.sun.jersey.api.core.PackagesResourceConfig");  
  21.       agent.setInitParameter("com.sun.jersey.config.property.packages",  
  22.           "org.apache.ambari.server.agent.rest;" + "org.apache.ambari.server.api");  
  23.       agent.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",  
  24.           "true");  
  25.       agentroot.addServlet(agent, "/agent/v1/*");  
  26.       agent.setInitOrder(3);  
  27.   
  28.       // 對agent發過來的數據包進行證書籤名所須要加載的包  
  29.       ServletHolder cert = new ServletHolder(ServletContainer.class);  
  30.       cert.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",  
  31.           "com.sun.jersey.api.core.PackagesResourceConfig");  
  32.       cert.setInitParameter("com.sun.jersey.config.property.packages",  
  33.           "org.apache.ambari.server.security.unsecured.rest;" + "org.apache.ambari.server.api");  
  34.       cert.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",  
  35.           "true");  
  36.       agentroot.addServlet(cert, "/*");  
  37.       cert.setInitOrder(4);  
  38.      
  39.       // WEB客戶端提供的數據包  
  40.       ServletHolder resources = new ServletHolder(ServletContainer.class);  
  41.       resources.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",  
  42.           "com.sun.jersey.api.core.PackagesResourceConfig");  
  43.       resources.setInitParameter("com.sun.jersey.config.property.packages",  
  44.           "org.apache.ambari.server.resources.api.rest;" + "org.apache.ambari.server.api");  
  45.       resources.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",  
  46.           "true");  
  47.       root.addServlet(resources, "/resources/*");  
  48.       resources.setInitOrder(6)  
 
 正如上一節所述,Ambari-Server有一個狀態機管理模塊,全部節點的狀態信息更改都最終提供給狀態機進行更改操做,所以狀態機是一個很忙的組件。在Ambari-Server裏面,把每一次更改操做都把它看成是一類事件,採用事件驅動機制完成對應的任務。這種思想有點借鑑已經運用在hadoop 2.x YARN裏面的事件驅動機制。事件驅動機制可以一種高效的異步RPC請求方式,直接調用須要執行相應的代碼邏輯,而事件驅動只須要產生事件統一提交給事件處理器,所以事件驅動須要一個更復雜的有限狀態機結合起來一同使用。

2、Ambari-Server處理Ambari-Agent請求

Agent發送過來的心跳請求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)來處理,執行完後,同時會返回org.apache.ambari.server.agent.HeartBeatResponse給agent。 org.apache.ambari.server.agent.HeartBeat裏面主要含了兩類信息:節點的狀態信息nodeStatus和服務狀態信息componentStatus。
Java代碼   收藏代碼
  1. public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)  
  2.      throws AmbariException {  
  3.    String hostname = heartbeat.getHostname();  
  4.    Long currentResponseId = hostResponseIds.get(hostname);  
  5.    HeartBeatResponse response;  
  6.    if (currentResponseId == null) {  
  7.      //Server restarted, or unknown host.  
  8.      LOG.error("CurrentResponseId unknown - send register command");  
  9.      return createRegisterCommand();  // 無responseId, 新請求,就進行註冊, responseId =0   
  10.    }  
  11.   
  12.    LOG.info("Received heartbeat from host"  
  13.        + ", hostname=" + hostname  
  14.        + ", currentResponseId=" + currentResponseId  
  15.        + ", receivedResponseId=" + heartbeat.getResponseId());  
  16.   
  17.    if (heartbeat.getResponseId() == currentResponseId - 1) {  
  18.      LOG.warn("Old responseId received - response was lost - returning cached response");  
  19.      return hostResponses.get(hostname);  
  20.    } else if (heartbeat.getResponseId() != currentResponseId) {  
  21.      LOG.error("Error in responseId sequence - sending agent restart command");  
  22.      return createRestartCommand(currentResponseId);   //  心跳是歷史記錄,那麼就要求其重啓,從新註冊,  
  23.                                                        //  responseId 不變  
  24.    }  
  25.   
  26.    response = new HeartBeatResponse();  
  27.    response.setResponseId(++currentResponseId);  // responseId 加 1 , 返回一個新的responseId,下次心跳又要把這個responseId帶回來。  
  28.    Host hostObject = clusterFsm.getHost(hostname);  
  29.   
  30.    if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {     // 失去心跳  
  31.      // After loosing heartbeat agent should reregister  
  32.      LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");  
  33.      return createRegisterCommand();   //失去鏈接,要求從新註冊, responseId=0  
  34.    }  
  35.   
  36.    hostResponseIds.put(hostname, currentResponseId);  
  37.    hostResponses.put(hostname, response);  
  38.   
  39.    long now = System.currentTimeMillis();  
  40.    HostState hostState = hostObject.getState();  
  41.    // If the host is waiting for component status updates, notify it  
  42.    if (heartbeat.componentStatus.size() > 0  
  43.        && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {  // 節點已經進行了註冊,可是該節點尚未彙報相關狀態信息,等待服務狀態更新  
  44.      try {  
  45.        LOG.debug("Got component status updates");      
  46.        hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));  // 更新服務狀態機  
  47.      } catch (InvalidStateTransitionException e) {  
  48.        LOG.warn("Failed to notify the host about component status updates", e);  
  49.      }  
  50.    }  
  51.   
  52.    try {  
  53.      if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {  
  54.        hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,  
  55.            heartbeat.getAgentEnv()));    // 向狀態機發送更新事件,更新節點至正常狀態  
  56.      } else {  
  57.        hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now,  
  58.            null));   // 把節點列入不健康  
  59.      }  
  60.      if (hostState != hostObject.getState()) scanner.updateHBaseMaster(hostObject);  // 更新 hbase master狀態,若是該節點上在master節點的話,  
  61.    } catch (InvalidStateTransitionException ex) {  
  62.      LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);  
  63.      hostObject.setState(HostState.INIT);   // 出錯,從新註冊  
  64.      return createRegisterCommand();  
  65.    }  
  66.   
  67.    //Examine heartbeat for command reports  
  68.    processCommandReports(heartbeat, hostname, clusterFsm, now);  // 處理狀態更改的彙報信息,看進度到哪一點了。  
  69.   
  70.    // Examine heartbeart for component live status reports  
  71.    processStatusReports(heartbeat, hostname, clusterFsm);  // 處理該節點的服務狀態信息(也稱做爲組件)  
  72.   
  73.    // Send commands if node is active  
  74.    if (hostObject.getState().equals(HostState.HEALTHY)) {  
  75.      sendCommands(hostname, response);  //把該節點的命令AgentCommand組裝起來,統一返回給agent  
  76.    }  
  77.    return response;  
  78.  }  
 
下面咱們學習一下Ambari-Agent是如何處理heartbeat請求的。agent是由Python代碼所寫,每一個節點上都會有一個python的daemon進程與server進行交互。

3、Ambari-Agent執行流程

安裝ambari-agent 服務時會把相應在的python代碼置於python執行的環境上下文中,例如其入口代碼多是/usr/lib/python2.6/site-packages/ambari_agent/main.py,而且進行相關初始化工做(例如驗證參數,與server創建鏈接,初始化安全驗證證書),最後會產生一個新的控制器Controller子線程來統一管理節點的狀態。Controller線程裏面有一個動做隊列ActionQueue線程,而且開啓向Server註冊和發心跳服務。能夠看出來,ambari-agent主要由兩個線程組成,Controller線程向Server發送註冊或心跳請求,請求到的Action數據放到ActionQueue線程裏面,ActionQueue線程維護着兩個隊列:commandQueue和resultQueue。ActionQueue線程會監聽commandQueue的情況。
Python代碼   收藏代碼
  1. class Controller(threading.Thread):    
  2.   def __init__(self, config, range=30):  // 在初始化Controller以前,ambari-agent就會在main.py裏面進行判斷:ambari-server是否正常,正常纔會初始化Controller  
  3.   // 省略初始化代碼  
  4.   def run(self):    
  5.     self.actionQueue = ActionQueue(self.config)  // 初始化隊列線程  
  6.     self.actionQueue.start()  
  7.     self.register = Register(self.config)  // 初始化註冊類   
  8.     self.heartbeat = Heartbeat(self.actionQueue)  // 初始化心跳類  
  9.   
  10.     opener = urllib2.build_opener()  
  11.     urllib2.install_opener(opener)  
  12.   
  13.     while True:  
  14.       self.repeatRegistration = False  
  15.       self.registerAndHeartbeat()    //開始註冊 而且 定時發心跳  
  16.       if not self.repeatRegistration:  
  17.         break  
  18.   
  19.     pass  
 CommandQueue隊列主要有3類command: 
  1. REGISTER_COMMAND:該類命令主要通知agent從新向server發送註冊請求。
  2. STATUS_COMMAND:該類命令主要告訴agent須要向server發送某組件的狀態信息。
  3. EXECUTION_COMMAND:要求agent執行puppet或者軟件集升級任務
ActionQueue線程在執行STATUS_COMMAND時,會經過LiveStatus類構建一個StatusCheck檢測器,而且經過ps命令來檢測該組件是不是活着。
Python代碼   收藏代碼
  1. def getIsLive(self, pidPath):  
  2.     // ....  
  3.     //檢測該組件pid文件是否存在...  
  4.     res = self.sh.run(['ps -p', str(pid), '-f'])   //運行shell命令,檢測該進程是否存在  
  5.     lines = res['output'].strip().split(os.linesep)  
  6.     try:  
  7.       procInfo = lines[1]  
  8.       isLive = not procInfo == None  
  9.     except IndexError:  
  10.       logger.info('Process is dead')  
  11.     return isLive  
 ActionQueue線程在執行EXECUTION_COMMAND任務時,一般是用於執行相關Puppet任務,它會在[agent].prefix目錄下產生一個puppet文件,而後執行puppet apply命令執行一批puppet module文件完成配置更改和節點管理任務。
Python代碼   收藏代碼
  1. def runCommand(self, command, tmpoutfile, tmperrfile):  
  2.     taskId = 0   
  3.     if command.has_key("taskId"):  
  4.       taskId = command['taskId']  
  5.     siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp") // self.tmpdir是ambari-agent.ini裏面配置的agent.prefix參數, site-{taskId:int}.pp文件裏面主要是一組服務的配置參數  
  6.     generateManifest(command, siteppFileName, self.modulesdir, self.config)  //生成一個puppet配置文件  
  7.     result = self.run_manifest(command, siteppFileName, tmpoutfile, tmperrfile) // 會根據command命令裏repo_info參數值,執行相應的puppet命令  
  8.     return result  
相關文章
相關標籤/搜索