public HAManager(VertxInternal vertx, DeploymentManager deploymentManager, ClusterManager clusterManager, int quorumSize, String group, boolean enabled) { this.vertx = vertx; this.deploymentManager = deploymentManager; this.clusterManager = clusterManager; //仲裁結點數量,默認1 this.quorumSize = enabled ? quorumSize : 0; //定義邏輯組名,默認__DEFAULT__ this.group = enabled ? group : "__DISABLED__"; this.enabled = enabled; this.haInfo = new JsonObject(); haInfo.put("verticles", new JsonArray()); haInfo.put("group", this.group); //獲取集羣管理 __vertx.haInfo,添加結點信息 this.clusterMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME); this.nodeID = clusterManager.getNodeID(); synchronized (haInfo) { clusterMap.put(nodeID, haInfo.encode()); } /**添加一個節點偵聽器,偵聽節點join or leaves */ clusterManager.nodeListener(new NodeListener() { @Override public void nodeAdded(String nodeID) { HAManager.this.nodeAdded(nodeID); } @Override public void nodeLeft(String leftNodeID) { HAManager.this.nodeLeft(leftNodeID); } }); //定義週期器,每隔 1s 檢查 HADeployments quorumTimerID = vertx.setPeriodic(QUORUM_CHECK_PERIOD, tid -> checkHADeployments()); // 調用檢查仲裁來計算是否有初始仲裁 synchronized (this) { checkQuorum(); } } private void checkQuorum() { if (quorumSize == 0) {//判斷仲裁數量 this.attainedQuorum = true; } else { /**獲取group node數量*/ List<String> nodes = clusterManager.getNodes(); int count = 0; for (String node : nodes) { String json = clusterMap.get(node); if (json != null) { JsonObject clusterInfo = new JsonObject(json); String group = clusterInfo.getString("group"); if (group.equals(this.group)) { count++; } } } /**計算是否到達仲裁數量*/ boolean attained = count >= quorumSize; if (!attainedQuorum && attained) { log.info("A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed"); this.attainedQuorum = true; } else if (attainedQuorum && !attained) { log.info("There is no longer a quorum. Any HA deploymentIDs will be undeployed until a quorum is re-attained"); this.attainedQuorum = false; } } }
public void deployVerticle(final String verticleName, DeploymentOptions deploymentOptions, final Handler<AsyncResult<String>> doneHandler) { if (attainedQuorum) {//根據是否達到仲裁數量,不然添加到delay Queue doDeployVerticle(verticleName, deploymentOptions, doneHandler); } else { log.info("Quorum not attained. Deployment of verticle will be delayed until there's a quorum."); addToHADeployList(verticleName, deploymentOptions, doneHandler); } } /** * 部署verticle */ private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions, final Handler<AsyncResult<String>> doneHandler) { /**添加deploy verticle 後的回調 handler*/ final Handler<AsyncResult<String>> wrappedHandler = asyncResult -> { if (asyncResult.succeeded()) { // 添加當前 node 的 HA 相關信息,以便 other node瞭解 addToHA(asyncResult.result(), verticleName, deploymentOptions); } /**觸發已添加添加回調 hander*/ if (doneHandler != null) { doneHandler.handle(asyncResult); } else if (asyncResult.failed()) { log.error("Failed to deploy verticle", asyncResult.cause()); } }; //部署verticle deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler); } /** * 添加deploy 任務到delay Queue */ private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions, final Handler<AsyncResult<String>> doneHandler) { toDeployOnQuorum.add(() -> { ContextImpl ctx = vertx.getContext(); try { ContextImpl.setContext(null); //部署verticle deployVerticle(verticleName, deploymentOptions, doneHandler); } finally { ContextImpl.setContext(ctx); } }); }
private void checkHADeployments() { try { if (attainedQuorum) {//判斷仲裁數量是否達到 deployHADeployments(); } else { undeployHADeployments(); } } catch (Throwable t) { log.error("Failed when checking HA deploymentIDs", t); } } private void deployHADeployments() { //獲取delay Queue 任務數 int size = toDeployOnQuorum.size(); if (size != 0) { log.info("There are " + size + " HA deploymentIDs waiting on a quorum. These will now be deployed"); Runnable task; /**處理全部 delay 部署任務*/ while ((task = toDeployOnQuorum.poll()) != null) { try { task.run(); } catch (Throwable t) { log.error("Failed to run redeployment task", t); } } } } private void undeployHADeployments() { /** 遍歷全部deploy verticle */ for (String deploymentID: deploymentManager.deployments()) { Deployment dep = deploymentManager.getDeployment(deploymentID); if (dep != null) { if (dep.deploymentOptions().isHa()) { ContextImpl ctx = vertx.getContext(); try { ContextImpl.setContext(null); //卸載 deploymentManager.undeployVerticle(deploymentID, result -> { if (result.succeeded()) { log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum"); /**添加HA verticle 到 delay Queue 從新部署*/ addToHADeployList(dep.verticleIdentifier(), dep.deploymentOptions(), result1 -> { if (result1.succeeded()) { log.info("Successfully redeployed verticle " + dep.verticleIdentifier() + " after quorum was re-attained"); } else { log.error("Failed to redeploy verticle " + dep.verticleIdentifier() + " after quorum was re-attained", result1.cause()); } }); } else { log.error("Failed to undeploy deployment on lost quorum", result.cause()); } }); } finally { ContextImpl.setContext(ctx); } } } } }
note:不建議使用HA模塊,仍是利用Health Check做爲verticle服務檢查,出錯時自動重啓服務,
"啓用高可用性(HA)的狀況下部署Verticle。在該上下文中,當Verticle部署在忽然死亡的vert.x實例上時,
Verticle將從集羣中的另外一個vert.x實例上從新部署".
分析:沒有verticle持久化,上傳集羣中心或序列化分發other node,其它集羣結點local根本沒有
verticle(compile class)如何從新部署,畢竟不是共享內存node