在第一節中,咱們簡單講了一下Ambari的系統架構。咱們這一節主要分析Ambari的源代碼,總覽Ambari的具體實現方式及其工做細節。node
1、Ambari-Server啓動
Ambari-Server是一個WEB Server,提供統一的REST API接口,同時向web和agent開放了兩個不一樣的端口(默認前者是8080, 後者是8440或者8441)。它是由Jetty Server容器構建起來的,經過Spring Framework構建出來的WEB服務器,其中大量採用了google提供的Guice註解完成spring框架所須要的注入功能(想想,以前spring框架須要加載一個applicationcontext.xml文件來把bean注入進來,如今能夠用Guice註解的方式就能夠輕鬆完成)。 REST框架由JAX-RS標準來構建。
Ambari-Server接受來自兩處的REST請求,Agent過來的請求處理邏輯由包org.apache.ambari.server.agent處理, 而API所的處理邏輯來自org.apache.ambari.server.api。詳見以下代碼:
- ServletHolder sh = new ServletHolder(ServletContainer.class);
-
- sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
-
- sh.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.api.rest;" +
- "org.apache.ambari.server.api.services;" +
- "org.apache.ambari.eventdb.webservice;" +
- "org.apache.ambari.server.api");
- sh.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- root.addServlet(sh, "/api/v1/*");
- sh.setInitOrder(2);
-
-
- ServletHolder agent = new ServletHolder(ServletContainer.class);
- agent.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
- agent.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.agent.rest;" + "org.apache.ambari.server.api");
- agent.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- agentroot.addServlet(agent, "/agent/v1/*");
- agent.setInitOrder(3);
-
-
- ServletHolder cert = new ServletHolder(ServletContainer.class);
- cert.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
- cert.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.security.unsecured.rest;" + "org.apache.ambari.server.api");
- cert.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- agentroot.addServlet(cert, "/*");
- cert.setInitOrder(4);
-
-
- ServletHolder resources = new ServletHolder(ServletContainer.class);
- resources.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
- "com.sun.jersey.api.core.PackagesResourceConfig");
- resources.setInitParameter("com.sun.jersey.config.property.packages",
- "org.apache.ambari.server.resources.api.rest;" + "org.apache.ambari.server.api");
- resources.setInitParameter("com.sun.jersey.api.json.POJOMappingFeature",
- "true");
- root.addServlet(resources, "/resources/*");
- resources.setInitOrder(6)
正如上一節所述,Ambari-Server有一個狀態機管理模塊,全部節點的狀態信息更改都最終提供給狀態機進行更改操做,所以狀態機是一個很忙的組件。在Ambari-Server裏面,把每一次更改操做都把它看成是一類事件,採用事件驅動機制完成對應的任務。這種思想有點借鑑已經運用在hadoop 2.x YARN裏面的事件驅動機制。事件驅動機制可以一種高效的異步RPC請求方式,直接調用須要執行相應的代碼邏輯,而事件驅動只須要產生事件統一提交給事件處理器,所以事件驅動須要一個更復雜的有限狀態機結合起來一同使用。
2、Ambari-Server處理Ambari-Agent請求
Agent發送過來的心跳請求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)來處理,執行完後,同時會返回org.apache.ambari.server.agent.HeartBeatResponse給agent。 org.apache.ambari.server.agent.HeartBeat裏面主要含了兩類信息:節點的狀態信息nodeStatus和服務狀態信息componentStatus。
- public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
- throws AmbariException {
- String hostname = heartbeat.getHostname();
- Long currentResponseId = hostResponseIds.get(hostname);
- HeartBeatResponse response;
- if (currentResponseId == null) {
-
- LOG.error("CurrentResponseId unknown - send register command");
- return createRegisterCommand();
- }
-
- LOG.info("Received heartbeat from host"
- + ", hostname=" + hostname
- + ", currentResponseId=" + currentResponseId
- + ", receivedResponseId=" + heartbeat.getResponseId());
-
- if (heartbeat.getResponseId() == currentResponseId - 1) {
- LOG.warn("Old responseId received - response was lost - returning cached response");
- return hostResponses.get(hostname);
- } else if (heartbeat.getResponseId() != currentResponseId) {
- LOG.error("Error in responseId sequence - sending agent restart command");
- return createRestartCommand(currentResponseId);
-
- }
-
- response = new HeartBeatResponse();
- response.setResponseId(++currentResponseId);
- Host hostObject = clusterFsm.getHost(hostname);
-
- if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
-
- LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
- return createRegisterCommand();
- }
-
- hostResponseIds.put(hostname, currentResponseId);
- hostResponses.put(hostname, response);
-
- long now = System.currentTimeMillis();
- HostState hostState = hostObject.getState();
-
- if (heartbeat.componentStatus.size() > 0
- && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
- try {
- LOG.debug("Got component status updates");
- hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
- } catch (InvalidStateTransitionException e) {
- LOG.warn("Failed to notify the host about component status updates", e);
- }
- }
-
- try {
- if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
- hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
- heartbeat.getAgentEnv()));
- } else {
- hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now,
- null));
- }
- if (hostState != hostObject.getState()) scanner.updateHBaseMaster(hostObject);
- } catch (InvalidStateTransitionException ex) {
- LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
- hostObject.setState(HostState.INIT);
- return createRegisterCommand();
- }
-
-
- processCommandReports(heartbeat, hostname, clusterFsm, now);
-
-
- processStatusReports(heartbeat, hostname, clusterFsm);
-
-
- if (hostObject.getState().equals(HostState.HEALTHY)) {
- sendCommands(hostname, response);
- }
- return response;
- }
下面咱們學習一下Ambari-Agent是如何處理heartbeat請求的。agent是由Python代碼所寫,每一個節點上都會有一個python的daemon進程與server進行交互。
3、Ambari-Agent執行流程
安裝ambari-agent 服務時會把相應在的python代碼置於python執行的環境上下文中,例如其入口代碼多是/usr/lib/python2.6/site-packages/ambari_agent/main.py,而且進行相關初始化工做(例如驗證參數,與server創建鏈接,初始化安全驗證證書),最後會產生一個新的控制器Controller子線程來統一管理節點的狀態。Controller線程裏面有一個動做隊列ActionQueue線程,而且開啓向Server註冊和發心跳服務。能夠看出來,ambari-agent主要由兩個線程組成,Controller線程向Server發送註冊或心跳請求,請求到的Action數據放到ActionQueue線程裏面,ActionQueue線程維護着兩個隊列:commandQueue和resultQueue。ActionQueue線程會監聽commandQueue的情況。
- class Controller(threading.Thread):
- def __init__(self, config, range=30): // 在初始化Controller以前,ambari-agent就會在main.py裏面進行判斷:ambari-server是否正常,正常纔會初始化Controller
- // 省略初始化代碼
- def run(self):
- self.actionQueue = ActionQueue(self.config) // 初始化隊列線程
- self.actionQueue.start()
- self.register = Register(self.config) // 初始化註冊類
- self.heartbeat = Heartbeat(self.actionQueue) // 初始化心跳類
-
- opener = urllib2.build_opener()
- urllib2.install_opener(opener)
-
- while True:
- self.repeatRegistration = False
- self.registerAndHeartbeat() //開始註冊 而且 定時發心跳
- if not self.repeatRegistration:
- break
-
- pass
CommandQueue隊列主要有3類command:
- REGISTER_COMMAND:該類命令主要通知agent從新向server發送註冊請求。
- STATUS_COMMAND:該類命令主要告訴agent須要向server發送某組件的狀態信息。
- EXECUTION_COMMAND:要求agent執行puppet或者軟件集升級任務
ActionQueue線程在執行STATUS_COMMAND時,會經過LiveStatus類構建一個StatusCheck檢測器,而且經過ps命令來檢測該組件是不是活着。
- def getIsLive(self, pidPath):
- // ....
- //檢測該組件pid文件是否存在...
- res = self.sh.run(['ps -p', str(pid), '-f']) //運行shell命令,檢測該進程是否存在
- lines = res['output'].strip().split(os.linesep)
- try:
- procInfo = lines[1]
- isLive = not procInfo == None
- except IndexError:
- logger.info('Process is dead')
- return isLive
ActionQueue線程在執行EXECUTION_COMMAND任務時,一般是用於執行相關Puppet任務,它會在[agent].prefix目錄下產生一個puppet文件,而後執行puppet apply命令執行一批puppet module文件完成配置更改和節點管理任務。
- def runCommand(self, command, tmpoutfile, tmperrfile):
- taskId = 0
- if command.has_key("taskId"):
- taskId = command['taskId']
- siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp") // self.tmpdir是ambari-agent.ini裏面配置的agent.prefix參數, site-{taskId:int}.pp文件裏面主要是一組服務的配置參數
- generateManifest(command, siteppFileName, self.modulesdir, self.config) //生成一個puppet配置文件
- result = self.run_manifest(command, siteppFileName, tmpoutfile, tmperrfile) // 會根據command命令裏repo_info參數值,執行相應的puppet命令
- return result