Ambari架構源碼解析前端
Ambari是hadoop分佈式集羣配置管理工具,是由hortonworks主導的開源項目。它已經成爲apache基金會的孵化器項目,已經成爲hadoop運維繫統中的得力助手。node
Ambari充分利用了一些已有的優秀開源軟件,巧妙地把它們結合起來,使其在分佈式環境中作到了集羣式服務管理能力、監控能力、展現能力,這些優秀的開源軟件有: python
(1)agent端,採用了puppet管理節點;ios
(2)在web端,採用ember.js做爲前端MVC框架和NodeJS相關工具,用handlebars.js做爲頁面渲染引擎,在CSS/HTML方面還用了Bootstrap框架。web
(3)在Server端,採用了Jetty、Spring、JAX-RS等。spring
(4)同時利用了Ganglia、Nagios的分佈式監控能力。數據庫
Ambari架構採用的是Server/Client的模式,主要由兩部分組成:ambari-agent和ambari-server。ambari依賴其它已經成熟的工具,例如其ambari-server 就依賴python,而ambari-agent還同時依賴ruby, puppet,facter等工具,還有它也依賴一些監控工具nagios和ganglia用於監控集羣情況。其中: apache
1. puppet是分佈式集羣配置管理工具,也是典型的Server/Client模式,可以集中式管理分佈式集羣的安裝配置部署,主要語言是ruby。 json
2. facter是用python寫的一個節點資源採集庫,用於採集節點的系統信息,例如OS信息,主機信息等。因爲ambari-agent主要是用python寫的,所以用facter能夠很好地採集到節點信息。api
Nagios是一款開源的免費網絡監視工具,能有效監控Windows、Linux和Unix的主機狀態,交換機路由器等網絡設備,打印機等。在系統或服務狀態異常時發出郵件或短信報警第一時間通知網站運維人員,在狀態恢復後發出正常的郵件或短信通知。
Ganglia是UC Berkeley發起的一個開源集羣監視項目,設計用於測量數以千計的節點。Ganglia的核心包含gmond、gmetad以及一個Web前端。主要是用來監控系統性能,如:cpu 、mem、硬盤利用率, I/O負載、網絡流量狀況等,經過曲線很容易見到每一個節點的工做狀態,對合理調整、分配系統資源,提升系統總體性能起到重要做用。
ambari-server是一個有狀態的,它維護着本身的一個有限狀態機FSM。同時這些狀態機存儲在數據庫中,前期數據庫主要採用postgres。以下圖所示,server端主要維護三類狀態:
1. Live Cluster State:集羣現有狀態,各個節點彙報上來的狀態信息會更改該狀態;
2. Desired State:用戶但願該節點所處狀態,是用戶在頁面進行了一系列的操做,須要更改某些服務的狀態,這些狀態尚未在節點上產生做用;
3. Action State:操做狀態,是狀態改變時的請求狀態,也能夠看做是一種中間狀態,這種狀態能夠輔助Live Cluster State向Desired State狀態轉變。
Ambari-server的Heartbeat Handler模塊用於接收各個agent的心跳請求(心跳請求裏面主要包含兩類信息:節點狀態信息和返回的操做結果),把節點狀態信息傳遞給FSM狀態機去維護着該節點的狀態,而且把返回的操做結果信息返回給Action Manager去作進一步的處理。
Coordinator模塊又能夠稱爲API handler,主要在接收WEB端操做請求後,會檢查它是否符合要求,stage planner分解成一組操做,最後提供給Action Manager去完成執行操做。
所以,從上圖就能夠看出,Ambari-Server的全部狀態信息的維護和變動都會記錄在數據庫中,用戶作一些更改服務的操做都會在數據庫上作一些相應的記錄,同時,agent經過心跳來得到數據庫的變動歷史。
ambari-agent是一個無狀態的。其功能主要分兩部分:
- 採集所在節點的信息而且彙總發心跳彙報給ambari-server;
- 處理ambari-server的執行請求。
所以它有兩種隊列:
- 消息隊列MessageQueue,或爲ResultQueue。包括節點狀態信息(包括註冊信息)和執行結果信息,而且彙總後經過心跳發送給ambari-server;
- 操做隊列ActionQueue。用於接收ambari-server返回過來的狀態操做,而後能過執行器按序調用puppet或python腳本等模塊完成任務。
Ambari-Server是一個WEB Server,提供統一的REST API接口,同時向web和agent開放了兩個不一樣的端口(默認前者是8080, 後者是8440或者8441)。它是由Jetty Server容器構建起來的,經過Spring Framework構建出來的WEB服務器,其中大量採用了google提供的Guice註解完成spring框架所須要的注入功能(想想,以前spring框架須要加載一個applicationcontext.xml文件來把bean注入進來,如今能夠用Guice註解的方式就能夠輕鬆完成)。 REST框架由JAX-RS標準來構建。
Ambari-Server接受來自兩處的REST請求,Agent過來的請求處理邏輯由包org.apache.ambari.server.agent處理, 而API所的處理邏輯來自org.apache.ambari.server.api。
Ambari-Server有一個狀態機管理模塊,全部節點的狀態信息更改都最終提供給狀態機進行更改操做,所以狀態機是一個很忙的組件。在Ambari-Server裏面,把每一次更改操做都把它看成是一類事件,採用事件驅動機制完成對應的任務。這種思想有點借鑑已經運用在hadoop 2.x YARN裏面的事件驅動機制。事件驅動機制可以一種高效的異步RPC請求方式,直接調用須要執行相應的代碼邏輯,而事件驅動只須要產生事件統一提交給事件處理器,所以事件驅動須要一個更復雜的有限狀態機結合起來一同使用。
Agent發送過來的心跳請求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)來處理,執行完後,同時會返回org.apache.ambari.server.agent.HeartBeatResponse給agent。 org.apache.ambari.server.agent.HeartBeat裏面主要含了兩類信息:節點的狀態信息nodeStatus和服務狀態信息componentStatus。
public class HeartBeatHandler {
...
public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
throws AmbariException {
long now = System.currentTimeMillis();
if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
}
String hostname = heartbeat.getHostname();
Long currentResponseId = hostResponseIds.get(hostname);
HeartBeatResponse response;
if (currentResponseId == null) {
//Server restarted, or unknown host.
LOG.error("CurrentResponseId unknown for " + hostname + " - send register command");
// 無responseId, 新請求,就進行註冊, responseId =0
return createRegisterCommand();
}
LOG.debug("Received heartbeat from host"
+ ", hostname=" + hostname
+ ", currentResponseId=" + currentResponseId
+ ", receivedResponseId=" + heartbeat.getResponseId());
if (heartbeat.getResponseId() == currentResponseId - 1) {
LOG.warn("Old responseId received - response was lost - returning cached response");
return hostResponses.get(hostname);
} else if (heartbeat.getResponseId() != currentResponseId) {
LOG.error("Error in responseId sequence - sending agent restart command");
// 心跳是歷史記錄,那麼就要求其重啓,從新註冊,responseId 不變
return createRestartCommand(currentResponseId);
}
response = new HeartBeatResponse();
//responseId 加 1 , 返回一個新的responseId,下次心跳又要把這個responseId帶回來。
response.setResponseId(++currentResponseId);
Host hostObject;
try {
hostObject = clusterFsm.getHost(hostname);
} catch (HostNotFoundException e) {
LOG.error("Host: {} not found. Agent is still heartbeating.", hostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Host associated with the agent heratbeat might have been " +
"deleted", e);
}
// For now return empty response with only response id.
return response;
}
//失去心跳,要求從新註冊, responseId=0
if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
// After loosing heartbeat agent should reregister
LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
return createRegisterCommand();
}
hostResponseIds.put(hostname, currentResponseId);
hostResponses.put(hostname, response);
// If the host is waiting for component status updates, notify it
//若是主機正在等待組件狀態更新,請通知它
//節點已經進行了註冊,可是該節點尚未彙報相關狀態信息,等待服務狀態更新
if (heartbeat.componentStatus.size() > 0
&& hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
try {
LOG.debug("Got component status updates");
//更新服務狀態機
hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
} catch (InvalidStateTransitionException e) {
LOG.warn("Failed to notify the host about component status updates", e);
}
}
if (heartbeat.getRecoveryReport() != null) {
RecoveryReport rr = heartbeat.getRecoveryReport();
processRecoveryReport(rr, hostname);
}
try {
if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
//向狀態機發送更新事件,更新節點至正常狀態
hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
heartbeat.getAgentEnv(), heartbeat.getMounts()));
} else { // 把節點列入不健康
hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
}
} catch (InvalidStateTransitionException ex) {
LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
hostObject.setState(HostState.INIT);
return createRegisterCommand();
}
/** * A host can belong to only one cluster. Though getClustersForHost(hostname) * returns a set of clusters, it will have only one entry. *主機只能屬於一個集羣。 經過getClustersForHost(hostname)返回一組集羣,它只有一個條目。 * * TODO: Handle the case when a host is a part of multiple clusters. * 處理 主機是多個集羣的一部分時的 狀況。 */
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
if (clusters.size() > 0) {
String clusterName = clusters.iterator().next().getClusterName();
if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
response.setRecoveryConfig(rc);
if (response.getRecoveryConfig() != null) {
LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
}
}
}
heartbeatProcessor.addHeartbeat(heartbeat);
// Send commands if node is active
if (hostObject.getState().equals(HostState.HEALTHY)) {
sendCommands(hostname, response);
annotateResponse(hostname, response);
}
return response;
}
...
}
安裝ambari-agent 服務時會把相應在的python代碼置於python執行的環境上下文中,例如其入口代碼多是/usr/lib/python2.6/site-packages/ambari_agent/main.py,而且進行相關初始化工做(例如驗證參數,與server創建鏈接,初始化安全驗證證書),最後會產生一個新的控制器Controller子線程來統一管理節點的狀態。Controller線程裏面有一個動做隊列ActionQueue線程,而且開啓向Server註冊和發心跳服務。能夠看出來,ambari-agent主要由兩個線程組成,Controller線程向Server發送註冊或心跳請求,請求到的Action數據放到ActionQueue線程裏面,ActionQueue線程維護着兩個隊列:commandQueue和resultQueue。ActionQueue線程會監聽commandQueue的情況。
class Controller(threading.Thread):
def __init__(self, config, range=30):
// 在初始化Controller以前,ambari-agent就會在main.py裏面進行判斷:ambari-server是否正常,正常纔會初始化Controller
// 省略初始化代碼
def run(self):
try:
// 初始化隊列線程
self.actionQueue = ActionQueue(self.config, controller=self)
self.actionQueue.start()
// 初始化註冊類
self.register = Register(self.config)
// 初始化心跳類
self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
opener = urllib2.build_opener()
urllib2.install_opener(opener)
while True:
self.repeatRegistration = False
//開始註冊 而且 定時發心跳
self.registerAndHeartbeat()
if not self.repeatRegistration:
logger.info("Finished heartbeating and registering cycle")
break
except:
logger.exception("Controller thread failed with exception:")
raise
logger.info("Controller thread has successfully finished")
CommandQueue隊列主要有3類command:
1. REGISTER_COMMAND:該類命令主要通知agent從新向server發送註冊請求;
2. STATUS_COMMAND:該類命令主要告訴agent須要向server發送某組件的狀態信息;
3. EXECUTION_COMMAND:要求agent執行puppet或者軟件集升級任務;
ambari-agent是一個無狀態的,主要功能以下所示:
於是,它有兩種隊列:MessageQueue和ActionQueue。
架構圖以下所示:
而對於ambari-server來講,其是一個有狀態的,它維護着本身的一個有限狀態FSM。同時這些狀態存儲與數據庫當中(DB目前能夠支持多種,可按序自選),Server端主要維持三類狀態:
其架構圖以下所示:
ambari-server的Heartbeat Handler模塊用於接收各個Agent的心跳請求(其中包含節點狀態信息和返回的操做結果),把節點狀態信息傳遞給圖中的FSM模塊去維護該節點的狀態,並把響應以後的操做結果信息返回給Action Manager去作更加詳細的處理。Coordinator模塊能夠看做API Handler,主要在接收Web端操做請求後,校驗其合法性,Stage Planner分解成一組操做,最後提供給Action 過 Manager去完成執行操做。
於是,從上圖中,咱們能夠看出,ambari-server的全部狀態信息的維護和變化都會被記錄在數據庫當中,使用者作一些更改服務的操做都會在數據庫商作對應的記錄,同時,Agent經過心跳來獲取數據庫的變更歷史信息。