聊聊artemis的ConnectionLoadBalancingPolicy

本文主要研究一下artemis的ConnectionLoadBalancingPolicyjava

ServerLocatorImpl.selectConnector

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.javagit

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

   //......

   private TransportConfiguration selectConnector() {
      Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;

      flushTopology();

      synchronized (topologyArrayGuard) {
         usedTopology = topologyArray;
      }

      synchronized (this) {
         if (usedTopology != null && useTopologyForLoadBalancing) {
            if (logger.isTraceEnabled()) {
               logger.trace("Selecting connector from topology.");
            }
            int pos = loadBalancingPolicy.select(usedTopology.length);
            Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];

            return pair.getA();
         } else {
            if (logger.isTraceEnabled()) {
               logger.trace("Selecting connector from initial connectors.");
            }

            int pos = loadBalancingPolicy.select(initialConnectors.length);

            return initialConnectors[pos];
         }
      }
   }

   //......
}
  • ServerLocatorImpl的selectConnector方法會對於useTopologyForLoadBalancing的會經過loadBalancingPolicy.select(usedTopology.length)來獲取pos,以後返回usedTopology[pos].getA();不然經過loadBalancingPolicy.select(initialConnectors.length)獲取pos,以後返回initialConnectors[pos]

ConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/ConnectionLoadBalancingPolicy.javagithub

public interface ConnectionLoadBalancingPolicy {

   /**
    * Returns the selected index according to the policy implementation.
    *
    * @param max maximum position index that can be selected
    */
   int select(int max);
}
  • ConnectionLoadBalancingPolicy定義了select接口,返回選中的index;它有四個實現類分別是FirstElementConnectionLoadBalancingPolicy、RandomConnectionLoadBalancingPolicy、RoundRobinConnectionLoadBalancingPolicy、RandomStickyConnectionLoadBalancingPolicy

FirstElementConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/FirstElementConnectionLoadBalancingPolicy.javaapache

public final class FirstElementConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   /**
    * @param max param is ignored
    * @return 0
    */
   @Override
   public int select(final int max) {
      return 0;
   }
}
  • FirstElementConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy接口,其select方法始終返回0

RandomConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomConnectionLoadBalancingPolicy.javaapi

public final class RandomConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   /**
    * Returns a pseudo random number between {@code 0} (inclusive) and {@code max} exclusive.
    *
    * @param max the upper limit of the random number selection
    * @see java.util.Random#nextInt(int)
    */
   @Override
   public int select(final int max) {
      return RandomUtil.randomInterval(0, max);
   }
}
  • RandomConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy接口,其select方法使用RandomUtil.randomInterval(0, max)隨機返回一個index

RoundRobinConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RoundRobinConnectionLoadBalancingPolicy.javadom

public final class RoundRobinConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy, Serializable {

   private static final long serialVersionUID = 7511196010141439559L;

   private boolean first = true;

   private int pos;

   @Override
   public int select(final int max) {
      if (first) {
         // We start on a random one
         pos = RandomUtil.randomInterval(0, max);

         first = false;
      } else {
         pos++;

         if (pos >= max) {
            pos = 0;
         }
      }

      return pos;
   }
}
  • RoundRobinConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy接口,其select在第一次執行的時候隨機選擇一個pos,以後對pos遞增,對於遞增以後大於等於max的重置pos爲0

RandomStickyConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.javaide

public final class RandomStickyConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   private int pos = -1;

   /**
    * @see java.util.Random#nextInt(int)
    */
   @Override
   public int select(final int max) {
      if (pos == -1) {
         pos = RandomUtil.randomInterval(0, max);
      }

      return pos;
   }
}
  • RandomStickyConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy接口,其select方法第一次隨機選擇一個pos,以後都返回該pos

小結

ConnectionLoadBalancingPolicy定義了select接口,返回選中的index;它有四個實現類分別是FirstElementConnectionLoadBalancingPolicy、RandomConnectionLoadBalancingPolicy、RoundRobinConnectionLoadBalancingPolicy、RandomStickyConnectionLoadBalancingPolicythis

doc

相關文章
相關標籤/搜索