vertx模塊DeploymentManager部署管理器

DeploymentManager

public DeploymentManager(VertxInternal vertx) {
    this.vertx = vertx;
    loadVerticleFactories();
}

/**
  * ServiceHelper本質對jdk ServiceLoader的封裝,vertx大量使用 SPI 擴展功能實現
  */
private void loadVerticleFactories() {
    Collection<VerticleFactory> factories = ServiceHelper.loadFactories(VerticleFactory.class);
    factories.forEach(this::registerVerticleFactory);
    VerticleFactory defaultFactory = new JavaVerticleFactory();
    defaultFactory.init(vertx);
    defaultFactories.add(defaultFactory);
}

 

verticle部署

/**
  * 部署verticle
  * @params identifier 類全限定名稱
  * @params options 部署選項
  * @params completionHandler 回調處理邏輯
  */
public void deployVerticle(String identifier,
                         DeploymentOptions options,
                         Handler<AsyncResult<String>> completionHandler) {
    if (options.isMultiThreaded() && !options.isWorker()) {
      throw new IllegalArgumentException("If multi-threaded then must be worker too");
    }
    //獲取context
    ContextImpl callingContext = vertx.getOrCreateContext();
    //jdk9+不支持 isolationGroup,一般狀況都不會使用isolationGroup
    ClassLoader cl = getClassLoader(options, callingContext);
    // deployment Id 採用 UUID
    doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
}

/**
  * 部署verticle 邏輯
  */
private void doDeployVerticle(String identifier,
                            String deploymentID,
                            DeploymentOptions options,
                            ContextImpl parentContext,
                            ContextImpl callingContext,
                            ClassLoader cl,
                            Handler<AsyncResult<String>> completionHandler) {
    //根據identifier查找對應的 xxFactory,查找不到使用默認JavaVerticleFactory                        
    List<VerticleFactory> verticleFactories = resolveFactories(identifier);
    Iterator<VerticleFactory> iter = verticleFactories.iterator();
    doDeployVerticle(iter, null, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
}


private void doDeployVerticle(Iterator<VerticleFactory> iter,
                            Throwable prevErr,
                            String identifier,
                            String deploymentID,
                            DeploymentOptions options,
                            ContextImpl parentContext,
                            ContextImpl callingContext,
                            ClassLoader cl,
                            Handler<AsyncResult<String>> completionHandler) {
    if (iter.hasNext()) {
      //是否解析,default false
      ...
      //執行部署verticle邏輯
      fut.setHandler(ar -> {
        Throwable err;
        if (ar.succeeded()) {
            //判斷解析以後名稱是否相等
            ...
            if (verticleFactory.blockingCreate()) {//是不是阻塞建立
             /**根據instances數量執行deploy*/
              vertx.<Verticle[]>executeBlocking(createFut -> {
                try {
                  Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
                  createFut.complete(verticles);
                } catch (Exception e) {
                  createFut.fail(e);
                }
              }, res -> {
                if (res.succeeded()) {
                  doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, res.result());
                } else {
                  // 失敗繼續執行下一個
                  doDeployVerticle(iter, res.cause(), identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
                }
              });
              return;
            } else {
              try {
               /**根據instances數量執行deploy*/
                Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
                doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
                return;
              } catch (Exception e) {
                err = e;
              }
            }
          }
        }
        ...
}

/**
  * 根據instances數量初始化verticle實例
  */
private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
    Verticle[] verticles = new Verticle[instances];
    for (int i = 0; i < instances; i++) {
      verticles[i] = verticleFactory.createVerticle(identifier, cl);
      if (verticles[i] == null) {
        throw new NullPointerException("VerticleFactory::createVerticle returned null");
      }
    }
    return verticles;
}


private void doDeploy(String identifier, String deploymentID, DeploymentOptions options,
                    ContextImpl parentContext,
                    ContextImpl callingContext,
                    Handler<AsyncResult<String>> completionHandler,
                    ClassLoader tccl, Verticle... verticles) {
    // Copy一份副本
    JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy(); 
    String poolName = options.getWorkerPoolName();//獲取定義的worker Pool 名稱

    Deployment parent = parentContext.getDeployment();
    DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);

    AtomicInteger deployCount = new AtomicInteger();
    AtomicBoolean failureReported = new AtomicBoolean();
    /**根據instances verticle 數量執行多實例,利用CPU多核心*/
    for (Verticle verticle: verticles) {
      /** 
        * 三種context類型,默認Standard
        * Standard Verticles(eventloop)
        * Worker Verticles(DeploymentOptions_workerPoolName & DeploymentOptions_workerPoolSize & DeploymentOptions_worker)
        * Multi-threaded worker verticles(worker Verticles 基礎上 & DeploymentOptions_multiThreaded)
        */
      WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime()) : null;
      WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
      ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
        vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
        
      if (workerExec != null) {//使用worker verticle 添加Close Hook
        context.addCloseHook(workerExec);
      }
      //綁定當前context
      context.setDeployment(deployment);
      deployment.addVerticle(new VerticleHolder(verticle, context));
      
      //執行deploy
      context.runOnContext(v -> {
        try {
          //verticle初始化
          verticle.init(vertx, context);
          Future<Void> startFuture = Future.future();
          //調用deploy verticle的start 方法
          verticle.start(startFuture);
          //deployVerticle 調用complete方法的回調處理
          startFuture.setHandler(ar -> {
            if (ar.succeeded()) {
              if (parent != null) {//parentVericl不爲null,綁定父子映射關係
                if (parent.addChild(deployment)) {
                  deployment.child = true;
                } else {
                  // Orphan
                  deployment.undeploy(null);
                  return;
                }
              }
              VertxMetrics metrics = vertx.metricsSPI();//初始化verticle metrics
              if (metrics != null) {
                metrics.verticleDeployed(verticle);
              }
              deployments.put(deploymentID, deployment);//添加集合中
              //實例數量是否所有部署成功
              if (deployCount.incrementAndGet() == verticles.length) {
                reportSuccess(deploymentID, callingContext, completionHandler);
              }
            } else if (failureReported.compareAndSet(false, true)) {//部署失敗
              deployment.rollback(callingContext, completionHandler, context, ar.cause());
            }
          });
        } catch (Throwable t) {
          if (failureReported.compareAndSet(false, true))
            deployment.rollback(callingContext, completionHandler, context, t);
        }
      });
    }
}

 

Verticle Types

Verticle Types : 1、Standard Verticles、2、Worker Verticles、3、Multi-threaded worker verticles. 本質仍是eventloop處理connection、read/write、encode/decode , 事件處理(eventHandler)在配置 何種類型的verticle相應的context處理 , 調度邏輯在VertxHandler類配置的context調度, 和vertx.executeBlocking->{...} 相同效果.

Context Class diagram 如圖:安全

image

DeploymentOptions

//添加配置,當前部署verticle共享配置
private JsonObject config;

//是否使用WorkerContext
private boolean worker;   

/**
  * 是否使用mutil workerContext,不推薦,一般使用eventloop或worker,
  * 運行是在worker pool不一樣線程執行,需考慮線程安全,若是使用將沒法使用其餘多數模塊
  */
private boolean multiThreaded;

//隔離組配置,部署隔離相關配置,不推薦,而且JDK9+不支持
private String isolationGroup;

//定義的worker pool名稱前綴
private String workerPoolName;

// 定義打worker pool線程數量
private int workerPoolSize;

//定義最大worker poll 執行時間,Thread超過期間閥值輸出level:warn報警 ,不配置默認 60s
private long maxWorkerExecuteTime;

//是否啓用HA(高可用),默認false
private boolean ha;

//配置額外的Classpath路徑,若是未設置isolationGroup 則忽略
private List<String> extraClasspath;

//配置verticle 實例數量
private int instances;

//配置隔離的類,路徑爲徹底限定名,可以使用通配符 *
private List<String> isolatedClasses;
相關文章
相關標籤/搜索