public DeploymentManager(VertxInternal vertx) { this.vertx = vertx; loadVerticleFactories(); } /** * ServiceHelper本質對jdk ServiceLoader的封裝,vertx大量使用 SPI 擴展功能實現 */ private void loadVerticleFactories() { Collection<VerticleFactory> factories = ServiceHelper.loadFactories(VerticleFactory.class); factories.forEach(this::registerVerticleFactory); VerticleFactory defaultFactory = new JavaVerticleFactory(); defaultFactory.init(vertx); defaultFactories.add(defaultFactory); }
/**
 * Deploys a verticle.
 *
 * <p>Rejects the combination "multi-threaded but not worker", obtains (or creates)
 * the calling context, resolves the class loader for the deployment and delegates
 * to {@code doDeployVerticle} with a freshly generated deployment ID. The calling
 * context is passed as both the parent context and the calling context.
 *
 * @param identifier fully qualified class name of the verticle (per the original note)
 * @param options deployment options
 * @param completionHandler callback invoked with the deployment result
 * @throws IllegalArgumentException if the options are multi-threaded but not worker
 */
public void deployVerticle(String identifier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  if (options.isMultiThreaded() && !options.isWorker()) {
    throw new IllegalArgumentException("If multi-threaded then must be worker too");
  }
  // Obtain the current context, creating one if necessary.
  ContextImpl callingContext = vertx.getOrCreateContext();
  // Original author's note: isolation groups are not supported on JDK 9+ and are rarely used in practice.
  ClassLoader cl = getClassLoader(options, callingContext);
  // Original author's note: the deployment ID is a UUID (generateDeploymentID is not shown here).
  doDeployVerticle(identifier, generateDeploymentID(), options, callingContext, callingContext, cl, completionHandler);
}

/**
 * Deployment logic: resolves the candidate factories for the identifier and starts
 * trying them in order via the iterator-based overload.
 */
private void doDeployVerticle(String identifier, String deploymentID, DeploymentOptions options, ContextImpl parentContext, ContextImpl callingContext, ClassLoader cl, Handler<AsyncResult<String>> completionHandler) {
  // Look up the factories matching the identifier. Original author's note: when none
  // matches, the default JavaVerticleFactory is used (resolveFactories is not shown here).
  List<VerticleFactory> verticleFactories = resolveFactories(identifier);
  Iterator<VerticleFactory> iter = verticleFactories.iterator();
  // No previous error yet, hence the null second argument.
  doDeployVerticle(iter, null, identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
}

/**
 * Tries the remaining factories of {@code iter} one after another.
 *
 * <p>NOTE: parts of this method are elided ("...") in this excerpt; only the
 * verticle-creation step is visible. In the visible part, the verticle instances are
 * created either on a blocking executor or inline, depending on
 * {@code verticleFactory.blockingCreate()}, and then handed to {@code doDeploy}.
 * A failed blocking creation re-enters this method with the same iterator and the
 * failure cause as {@code prevErr}.
 */
private void doDeployVerticle(Iterator<VerticleFactory> iter, Throwable prevErr, String identifier, String deploymentID, DeploymentOptions options, ContextImpl parentContext, ContextImpl callingContext, ClassLoader cl, Handler<AsyncResult<String>> completionHandler) {
  if (iter.hasNext()) {
    // Whether the identifier needs resolving; default false (original author's note).
    // The code for this step is elided in the excerpt.
    ...
    // Run the actual deployment once the (elided) resolution future completes.
    fut.setHandler(ar -> {
      Throwable err;
      if (ar.succeeded()) {
        // Check whether the resolved name equals the identifier (code elided in the excerpt).
        ...
        if (verticleFactory.blockingCreate()) { // does the factory create verticles in a blocking way?
          // Create options.getInstances() verticles off the calling thread, then deploy them.
          vertx.<Verticle[]>executeBlocking(createFut -> {
            try {
              Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
              createFut.complete(verticles);
            } catch (Exception e) {
              createFut.fail(e);
            }
          }, res -> {
            if (res.succeeded()) {
              doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, res.result());
            } else {
              // Creation failed: carry on with the next factory, passing the cause along.
              doDeployVerticle(iter, res.cause(), identifier, deploymentID, options, parentContext, callingContext, cl, completionHandler);
            }
          });
          return;
        } else {
          try {
            // Create options.getInstances() verticles inline, then deploy them.
            Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
            doDeploy(identifier, deploymentID, options, parentContext, callingContext, completionHandler, cl, verticles);
            return;
          } catch (Exception e) {
            // Remember the failure; what happens to err afterwards is elided in the excerpt.
            err = e;
          }
        }
      }
    }
    // Remainder of the method is elided in the excerpt.
    ...
}

/**
 * Creates {@code instances} verticle instances through the given factory.
 *
 * @param verticleFactory factory used to create each instance
 * @param identifier verticle identifier passed to the factory
 * @param instances number of instances to create
 * @param cl class loader passed to the factory
 * @return an array holding the created verticles
 * @throws NullPointerException if the factory returns {@code null} for an instance
 * @throws Exception declared by this method; presumably propagated from the factory
 */
private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
  Verticle[] verticles = new Verticle[instances];
  for (int i = 0; i < instances; i++) {
    verticles[i] = verticleFactory.createVerticle(identifier, cl);
    if (verticles[i] == null) {
      throw new NullPointerException("VerticleFactory::createVerticle returned null");
    }
  }
  return verticles;
}

/**
 * Deploys the already-created verticle instances as one deployment.
 *
 * <p>For each verticle a context is created (worker or event loop, depending on the
 * options), the verticle is attached to the deployment, and {@code init}/{@code start}
 * are run on that context. Success is reported once every instance has started;
 * the first failure triggers a single rollback.
 */
private void doDeploy(String identifier, String deploymentID, DeploymentOptions options, ContextImpl parentContext, ContextImpl callingContext, Handler<AsyncResult<String>> completionHandler, ClassLoader tccl, Verticle... verticles) {
  // Work on a copy of the configuration (or an empty object when none is set).
  JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy();
  String poolName = options.getWorkerPoolName(); // name of the user-defined worker pool, if any
  Deployment parent = parentContext.getDeployment();
  DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
  AtomicInteger deployCount = new AtomicInteger();
  AtomicBoolean failureReported = new AtomicBoolean();
  // One iteration per verticle instance. Original author's note: multiple instances
  // are how a deployment makes use of multiple CPU cores.
  for (Verticle verticle: verticles) {
    /*
     * Three context types, Standard being the default (original author's note):
     *  - Standard verticles (event loop)
     *  - Worker verticles (DeploymentOptions workerPoolName, workerPoolSize, worker)
     *  - Multi-threaded worker verticles (worker verticle plus DeploymentOptions multiThreaded)
     */
    WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime()) : null;
    WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
    ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) : vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
    if (workerExec != null) { // a named worker pool is in use: register the executor as a close hook
      context.addCloseHook(workerExec);
    }
    // Bind the deployment to this context.
    context.setDeployment(deployment);
    deployment.addVerticle(new VerticleHolder(verticle, context));
    // Run the deployment steps on the verticle's own context.
    context.runOnContext(v -> {
      try {
        // Initialise the verticle.
        verticle.init(vertx, context);
        Future<Void> startFuture = Future.future();
        // Invoke the verticle's start method.
        verticle.start(startFuture);
        // Handler run when the verticle completes (or fails) its start future.
        startFuture.setHandler(ar -> {
          if (ar.succeeded()) {
            if (parent != null) { // a parent deployment exists: record the parent/child relationship
              if (parent.addChild(deployment)) {
                deployment.child = true;
              } else {
                // Orphan: the parent did not accept the child, so undeploy and stop here.
                // NOTE(review): completionHandler is not invoked on this path in the visible code.
                deployment.undeploy(null);
                return;
              }
            }
            VertxMetrics metrics = vertx.metricsSPI(); // metrics SPI, may be null
            if (metrics != null) {
              metrics.verticleDeployed(verticle);
            }
            deployments.put(deploymentID, deployment); // register the deployment by ID
            // Report success only once every instance has been deployed.
            if (deployCount.incrementAndGet() == verticles.length) {
              reportSuccess(deploymentID, callingContext, completionHandler);
            }
          } else if (failureReported.compareAndSet(false, true)) { // start failed: roll back once only
            deployment.rollback(callingContext, completionHandler, context, ar.cause());
          }
        });
      } catch (Throwable t) {
        // init/start threw synchronously: roll back unless a failure was already reported.
        if (failureReported.compareAndSet(false, true)) deployment.rollback(callingContext, completionHandler, context, t);
      }
    });
  }
}
Verticle 類型共三種:1. Standard Verticles;2. Worker Verticles;3. Multi-threaded worker verticles。本質上仍然是由 eventloop 處理 connection、read/write、encode/decode;事件處理(eventHandler)則交由所配置的 verticle 類型對應的 context 執行。調度邏輯由 VertxHandler 類中配置的 context 完成,效果與 vertx.executeBlocking(fut -> {...}, res -> {...}) 相同。
Context 類圖(class diagram)如下圖所示:
//添加配置,當前部署verticle共享配置 private JsonObject config; //是否使用WorkerContext private boolean worker; /** * 是否使用mutil workerContext,不推薦,一般使用eventloop或worker, * 運行是在worker pool不一樣線程執行,需考慮線程安全,若是使用將沒法使用其餘多數模塊 */ private boolean multiThreaded; //隔離組配置,部署隔離相關配置,不推薦,而且JDK9+不支持 private String isolationGroup; //定義的worker pool名稱前綴 private String workerPoolName; // 定義打worker pool線程數量 private int workerPoolSize; //定義最大worker poll 執行時間,Thread超過期間閥值輸出level:warn報警 ,不配置默認 60s private long maxWorkerExecuteTime; //是否啓用HA(高可用),默認false private boolean ha; //配置額外的Classpath路徑,若是未設置isolationGroup 則忽略 private List<String> extraClasspath; //配置verticle 實例數量 private int instances; //配置隔離的類,路徑爲徹底限定名,可以使用通配符 * private List<String> isolatedClasses;