聊聊artemis的FederationManager

本文主要研究一下artemis的FederationManagerjava

FederationManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.javagit

public class FederationManager implements ActiveMQComponent {

   private final ActiveMQServer server;

   private Map<String, Federation> federations = new HashMap<>();
   private State state;

   enum State {
      STOPPED,
      STOPPING,
      /**
       * Deployed means {@link FederationManager#deploy()} was called but
       * {@link FederationManager#start()} was not called.
       * <p>
       * We need the distinction if {@link FederationManager#stop()} is called before 'start'. As
       * otherwise we would leak locators.
       */
      DEPLOYED, STARTED,
   }


   public FederationManager(final ActiveMQServer server) {
      this.server = server;
   }

   @Override
   public synchronized void start() throws ActiveMQException {
      if (state == State.STARTED) return;
      deploy();
      for (Federation federation : federations.values()) {
         federation.start();
      }
      state = State.STARTED;
   }

   @Override
   public synchronized void stop() {
      if (state == State.STOPPED) return;
      state = State.STOPPING;


      for (Federation federation : federations.values()) {
         federation.stop();
      }
      federations.clear();
      state = State.STOPPED;
   }

   @Override
   public boolean isStarted() {
      return state == State.STARTED;
   }

   public synchronized void deploy() throws ActiveMQException {
      for (FederationConfiguration federationConfiguration : server.getConfiguration().getFederationConfigurations()) {
         deploy(federationConfiguration);
      }
      if (state != State.STARTED) {
         state = State.DEPLOYED;
      }
   }

   public synchronized boolean undeploy(String name) {
      Federation federation = federations.remove(name);
      if (federation != null) {
         federation.stop();
      }
      return true;
   }



   public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws ActiveMQException {
      Federation federation = federations.get(federationConfiguration.getName());
      if (federation == null) {
         federation = newFederation(federationConfiguration);
      } else if (!Objects.equals(federation.getConfig().getCredentials(), federationConfiguration.getCredentials())) {
         undeploy(federationConfiguration.getName());
         federation = newFederation(federationConfiguration);
      }
      federation.deploy();
      return true;
   }

   private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws ActiveMQException {
      Federation federation = new Federation(server, federationConfiguration);
      federations.put(federationConfiguration.getName(), federation);
      if (state == State.STARTED) {
         federation.start();
      }
      return federation;
   }

   public Federation get(String name) {
      return federations.get(name);
   }

   public void register(FederatedAbstract federatedAbstract) {
      server.registerBrokerPlugin(federatedAbstract);
   }

   public void unregister(FederatedAbstract federatedAbstract) {
      server.unRegisterBrokerPlugin(federatedAbstract);
   }

}
  • FederationManager實現了ActiveMQComponent接口,它提供了start()、stop()、deploy、undeploy等方法;其中start方法會先執行deploy方法,而後遍歷federations.values()執行federation.start();stop方法則是遍歷federations.values()執行federation.stop(),而後清空federations;deploy方法在federation爲null時執行newFederation,而後執行federation.deploy(),若不爲null且credentials與配置不一致則執行undeploy,從新newFederation;undeploy方法則是將federation從federations中移除而後執行federation.stop()

Federation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.javagithub

public class Federation {


   private final ActiveMQServer server;
   private final SimpleString name;

   private final Map<String, FederationUpstream> upstreams = new HashMap<>();
   private final Map<String, FederationDownstream> downstreams = new HashMap<>();
   private final FederationConfiguration config;
   private FederationManager.State state;

   //......

   public synchronized void deploy() throws ActiveMQException {
      for (FederationUpstreamConfiguration upstreamConfiguration : config.getUpstreamConfigurations()) {
         deploy(upstreamConfiguration, config.getFederationPolicyMap());
      }
      for (FederationDownstreamConfiguration downstreamConfiguration : config.getDownstreamConfigurations()) {
         deploy(downstreamConfiguration, config.getFederationPolicyMap());
      }
      if (state != FederationManager.State.STARTED) {
         state = FederationManager.State.DEPLOYED;
      }
   }

   public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {
      String name = upstreamConfiguration.getName();
      FederationUpstream upstream = upstreams.get(name);

      //If connection has changed we will need to do a full undeploy and redeploy.
      if (upstream == null) {
         undeploy(name);
         upstream = deploy(name, upstreamConfiguration);
      } else if (!upstream.getConnection().getConfig().equals(upstreamConfiguration.getConnectionConfiguration())) {
         undeploy(name);
         upstream = deploy(name, upstreamConfiguration);
      }

      upstream.deploy(upstreamConfiguration.getPolicyRefs(), federationPolicyMap);
      return true;
   }

   public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {
      String name = downstreamConfiguration.getName();
      FederationDownstream downstream = downstreams.get(name);

      //If connection has changed we will need to do a full undeploy and redeploy.
      if (downstream == null) {
         undeploy(name);
         downstream = deploy(name, downstreamConfiguration);
      } else if (!downstream.getConnection().getConfig().equals(downstreamConfiguration.getConnectionConfiguration())) {
         undeploy(name);
         downstream = deploy(name, downstreamConfiguration);
      }

      downstream.deploy(config);
      return true;
   }

   private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) {
      FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration);
      upstreams.put(name, upstream);
      if (state == FederationManager.State.STARTED) {
         upstream.start();
      }
      return upstream;
   }

   private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) {
      //If we have a matching upstream connection already configured then use it for the initiating downstream connection
      FederationConnection connection = null;
      if (downstreamConfiguration.getConnectionConfiguration().isShareConnection()) {
         for (FederationUpstream upstream : upstreams.values()) {
            if (upstream.getConfig().getConnectionConfiguration()
                .equals(downstreamConfiguration.getConnectionConfiguration())) {
               connection = upstream.getConnection();
               connection.setSharedConnection(true);
               break;
            }
         }
      }

      FederationDownstream downstream = new FederationDownstream(server, this, name, downstreamConfiguration, connection);
      downstreams.put(name, downstream);
      if (state == FederationManager.State.STARTED) {
         downstream.start();
      }
      return downstream;
   }

   //......

}
  • Federation的deploy方法先遍歷config.getUpstreamConfigurations(),進行upstream的deploy,再遍歷config.getDownstreamConfigurations(),進行downstream的deploy

小結

FederationManager實現了ActiveMQComponent接口,它提供了start()、stop()、deploy、undeploy等方法;其中start方法會先執行deploy方法,而後遍歷federations.values()執行federation.start();stop方法則是遍歷federations.values()執行federation.stop(),而後清空federations;deploy方法在federation爲null時執行newFederation,而後執行federation.deploy(),若不爲null且credentials與配置不一致則執行undeploy,從新newFederation;undeploy方法則是將federation從federations中移除而後執行federation.stop()apache

doc

相關文章
相關標籤/搜索