public class FederationManager implements ActiveMQComponent { private final ActiveMQServer server; private Map<String, Federation> federations = new HashMap<>(); private State state; enum State { STOPPED, STOPPING, /** * Deployed means {@link FederationManager#deploy()} was called but * {@link FederationManager#start()} was not called. * <p> * We need the distinction if {@link FederationManager#stop()} is called before 'start'. As * otherwise we would leak locators. */ DEPLOYED, STARTED, } public FederationManager(final ActiveMQServer server) { this.server = server; } @Override public synchronized void start() throws ActiveMQException { if (state == State.STARTED) return; deploy(); for (Federation federation : federations.values()) { federation.start(); } state = State.STARTED; } @Override public synchronized void stop() { if (state == State.STOPPED) return; state = State.STOPPING; for (Federation federation : federations.values()) { federation.stop(); } federations.clear(); state = State.STOPPED; } @Override public boolean isStarted() { return state == State.STARTED; } public synchronized void deploy() throws ActiveMQException { for (FederationConfiguration federationConfiguration : server.getConfiguration().getFederationConfigurations()) { deploy(federationConfiguration); } if (state != State.STARTED) { state = State.DEPLOYED; } } public synchronized boolean undeploy(String name) { Federation federation = federations.remove(name); if (federation != null) { federation.stop(); } return true; } public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws ActiveMQException { Federation federation = federations.get(federationConfiguration.getName()); if (federation == null) { federation = newFederation(federationConfiguration); } else if (!Objects.equals(federation.getConfig().getCredentials(), federationConfiguration.getCredentials())) { undeploy(federationConfiguration.getName()); federation = newFederation(federationConfiguration); } federation.deploy(); return true; } private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws ActiveMQException { Federation federation = new Federation(server, federationConfiguration); federations.put(federationConfiguration.getName(), federation); if (state == State.STARTED) { federation.start(); } return federation; } public Federation get(String name) { return federations.get(name); } public void register(FederatedAbstract federatedAbstract) { server.registerBrokerPlugin(federatedAbstract); } public void unregister(FederatedAbstract federatedAbstract) { server.unRegisterBrokerPlugin(federatedAbstract); } }
public class Federation { private final ActiveMQServer server; private final SimpleString name; private final Map<String, FederationUpstream> upstreams = new HashMap<>(); private final Map<String, FederationDownstream> downstreams = new HashMap<>(); private final FederationConfiguration config; private FederationManager.State state; //...... public synchronized void deploy() throws ActiveMQException { for (FederationUpstreamConfiguration upstreamConfiguration : config.getUpstreamConfigurations()) { deploy(upstreamConfiguration, config.getFederationPolicyMap()); } for (FederationDownstreamConfiguration downstreamConfiguration : config.getDownstreamConfigurations()) { deploy(downstreamConfiguration, config.getFederationPolicyMap()); } if (state != FederationManager.State.STARTED) { state = FederationManager.State.DEPLOYED; } } public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException { String name = upstreamConfiguration.getName(); FederationUpstream upstream = upstreams.get(name); //If connection has changed we will need to do a full undeploy and redeploy. if (upstream == null) { undeploy(name); upstream = deploy(name, upstreamConfiguration); } else if (!upstream.getConnection().getConfig().equals(upstreamConfiguration.getConnectionConfiguration())) { undeploy(name); upstream = deploy(name, upstreamConfiguration); } upstream.deploy(upstreamConfiguration.getPolicyRefs(), federationPolicyMap); return true; } public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException { String name = downstreamConfiguration.getName(); FederationDownstream downstream = downstreams.get(name); //If connection has changed we will need to do a full undeploy and redeploy. if (downstream == null) { undeploy(name); downstream = deploy(name, downstreamConfiguration); } else if (!downstream.getConnection().getConfig().equals(downstreamConfiguration.getConnectionConfiguration())) { undeploy(name); downstream = deploy(name, downstreamConfiguration); } downstream.deploy(config); return true; } private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) { FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration); upstreams.put(name, upstream); if (state == FederationManager.State.STARTED) { upstream.start(); } return upstream; } private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) { //If we have a matching upstream connection already configured then use it for the initiating downstream connection FederationConnection connection = null; if (downstreamConfiguration.getConnectionConfiguration().isShareConnection()) { for (FederationUpstream upstream : upstreams.values()) { if (upstream.getConfig().getConnectionConfiguration() .equals(downstreamConfiguration.getConnectionConfiguration())) { connection = upstream.getConnection(); connection.setSharedConnection(true); break; } } } FederationDownstream downstream = new FederationDownstream(server, this, name, downstreamConfiguration, connection); downstreams.put(name, downstream); if (state == FederationManager.State.STARTED) { downstream.start(); } return downstream; } //...... }