vertx模塊HAManager高可用

HAManager

public HAManager(VertxInternal vertx, DeploymentManager deploymentManager,
               ClusterManager clusterManager, int quorumSize, String group, boolean enabled) {
    this.vertx = vertx;
    this.deploymentManager = deploymentManager;
    this.clusterManager = clusterManager;
    //仲裁結點數量,默認1
    this.quorumSize = enabled ? quorumSize : 0;
    //定義邏輯組名,默認__DEFAULT__
    this.group = enabled ? group : "__DISABLED__";
    this.enabled = enabled;
    this.haInfo = new JsonObject();
    haInfo.put("verticles", new JsonArray());
    haInfo.put("group", this.group);
    //獲取集羣管理 __vertx.haInfo,添加結點信息
    this.clusterMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME);
    this.nodeID = clusterManager.getNodeID();
    synchronized (haInfo) {
      clusterMap.put(nodeID, haInfo.encode());
    }
    /**添加一個節點偵聽器,偵聽節點join or leaves */
    clusterManager.nodeListener(new NodeListener() {
      @Override
      public void nodeAdded(String nodeID) {
        HAManager.this.nodeAdded(nodeID);
      }

      @Override
      public void nodeLeft(String leftNodeID) {
        HAManager.this.nodeLeft(leftNodeID);
      }
    });
    //定義週期器,每隔 1s 檢查 HADeployments
    quorumTimerID = vertx.setPeriodic(QUORUM_CHECK_PERIOD, tid -> checkHADeployments());
    // 調用檢查仲裁來計算是否有初始仲裁
    synchronized (this) {
      checkQuorum();
    }
}

private void checkQuorum() {
    if (quorumSize == 0) {//判斷仲裁數量
      this.attainedQuorum = true;
    } else {
     /**獲取group node數量*/
      List<String> nodes = clusterManager.getNodes();
      int count = 0;
      for (String node : nodes) {
        String json = clusterMap.get(node);
        if (json != null) {
          JsonObject clusterInfo = new JsonObject(json);
          String group = clusterInfo.getString("group");
          if (group.equals(this.group)) {
            count++;
          }
        }
      }
      /**計算是否到達仲裁數量*/
      boolean attained = count >= quorumSize;
      if (!attainedQuorum && attained) {
        log.info("A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed");
        this.attainedQuorum = true;
      } else if (attainedQuorum && !attained) {
        log.info("There is no longer a quorum. Any HA deploymentIDs will be undeployed until a quorum is re-attained");
        this.attainedQuorum = false;
      }
    }
}

 

deploy an HA verticle

public void deployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
                         final Handler<AsyncResult<String>> doneHandler) {
    if (attainedQuorum) {//根據是否達到仲裁數量,不然添加到delay Queue
      doDeployVerticle(verticleName, deploymentOptions, doneHandler);
    } else {
      log.info("Quorum not attained. Deployment of verticle will be delayed until there's a quorum.");
      addToHADeployList(verticleName, deploymentOptions, doneHandler);
    }
}

/**
  * 部署verticle
  */
private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
                            final Handler<AsyncResult<String>> doneHandler) {
    /**添加deploy verticle 後的回調 handler*/
    final Handler<AsyncResult<String>> wrappedHandler = asyncResult -> {
      if (asyncResult.succeeded()) {
        // 添加當前 node 的 HA 相關信息,以便 other node瞭解
        addToHA(asyncResult.result(), verticleName, deploymentOptions);
      }
      /**觸發已添加添加回調 hander*/
      if (doneHandler != null) {
        doneHandler.handle(asyncResult);
      } else if (asyncResult.failed()) {
        log.error("Failed to deploy verticle", asyncResult.cause());
      }
    };
    //部署verticle
    deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);
}

/**
  * 添加deploy 任務到delay Queue
  */
private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions,
                             final Handler<AsyncResult<String>> doneHandler) {
    toDeployOnQuorum.add(() -> {
      ContextImpl ctx = vertx.getContext();
      try {
        ContextImpl.setContext(null);
         //部署verticle
        deployVerticle(verticleName, deploymentOptions, doneHandler);
      } finally {
        ContextImpl.setContext(ctx);
      }
    });
}

 

週期每秒檢測

private void checkHADeployments() {
    try {
      if (attainedQuorum) {//判斷仲裁數量是否達到
        deployHADeployments();
      } else {
        undeployHADeployments();
      }
    } catch (Throwable t) {
      log.error("Failed when checking HA deploymentIDs", t);
    }
}

 private void deployHADeployments() {
   //獲取delay Queue 任務數
    int size = toDeployOnQuorum.size();
    if (size != 0) {
      log.info("There are " + size + " HA deploymentIDs waiting on a quorum. These will now be deployed");
      Runnable task;
      /**處理全部 delay 部署任務*/
      while ((task = toDeployOnQuorum.poll()) != null) {
        try {
          task.run();
        } catch (Throwable t) {
          log.error("Failed to run redeployment task", t);
        }
      }
    }
}

private void undeployHADeployments() {
  /** 遍歷全部deploy verticle */
    for (String deploymentID: deploymentManager.deployments()) {
      Deployment dep = deploymentManager.getDeployment(deploymentID);
      if (dep != null) {
        if (dep.deploymentOptions().isHa()) {
          ContextImpl ctx = vertx.getContext();
          try {
            ContextImpl.setContext(null);
            //卸載
            deploymentManager.undeployVerticle(deploymentID, result -> {
              if (result.succeeded()) {
                log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum");
                /**添加HA verticle 到 delay Queue 從新部署*/
                addToHADeployList(dep.verticleIdentifier(), dep.deploymentOptions(), result1 -> {
                  if (result1.succeeded()) {
                    log.info("Successfully redeployed verticle " + dep.verticleIdentifier() + " after quorum was re-attained");
                  } else {
                    log.error("Failed to redeploy verticle " + dep.verticleIdentifier() + " after quorum was re-attained", result1.cause());
                  }
                });
              } else {
                log.error("Failed to undeploy deployment on lost quorum", result.cause());
              }
            });
          } finally {
            ContextImpl.setContext(ctx);
          }
        }
      }
    }
}

 

note:不建議使用HA模塊,仍是利用Health Check做爲verticle服務檢查,出錯時自動重啓服務,
"啓用高可用性(HA)的狀況下部署Verticle。在該上下文中,當Verticle部署在忽然死亡的vert.x實例上時,
Verticle將從集羣中的另外一個vert.x實例上從新部署".
分析:沒有verticle持久化,上傳集羣中心或序列化分發other node,其它集羣結點local根本沒有
verticle(compile class)如何從新部署,畢竟不是共享內存node

相關文章
相關標籤/搜索