A look at Artemis's NetworkHealthCheck

This article takes a look at Artemis's NetworkHealthCheck.

NetworkHealthCheck

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java

public class NetworkHealthCheck extends ActiveMQScheduledComponent {

   private static final Logger logger = Logger.getLogger(NetworkHealthCheck.class);

   private final Set<ActiveMQComponent> componentList = new ConcurrentHashSet<>();
   private final Set<InetAddress> addresses = new ConcurrentHashSet<>();
   private final Set<URL> urls = new ConcurrentHashSet<>();
   private NetworkInterface networkInterface;

   public static final String IPV6_DEFAULT_COMMAND = "ping6 -c 1 %2$s";

   public static final String IPV4_DEFAULT_COMMAND = "ping -c 1 -t %d %s";

   private String ipv4Command = IPV4_DEFAULT_COMMAND;

   private String ipv6Command = IPV6_DEFAULT_COMMAND;

   // To be used on tests. As we use the loopback as a valid address on tests.
   private boolean ignoreLoopback = false;

   private boolean ownShutdown = false;

   /**
    * The timeout to be used on isReachable
    */
   private int networkTimeout;

   //......

   public void run() {
      boolean healthy = check();

      if (healthy) {
         for (ActiveMQComponent component : componentList) {
            if (!component.isStarted() && ownShutdown) {
               try {
                  ActiveMQUtilLogger.LOGGER.startingService(component.toString());
                  component.start();
               } catch (Exception e) {
                  ActiveMQUtilLogger.LOGGER.errorStartingComponent(e, component.toString());
               }
            }
            ownShutdown = false;
         }
      } else {
         for (ActiveMQComponent component : componentList) {
            if (component.isStarted()) {
               ownShutdown = true;
               try {
                  ActiveMQUtilLogger.LOGGER.stoppingService(component.toString());
                  component.stop();
               } catch (Exception e) {
                  ActiveMQUtilLogger.LOGGER.errorStoppingComponent(e, component.toString());
               }
            }
         }
      }

   }

   //......   
}
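  • NetworkHealthCheck extends ActiveMQScheduledComponent. Its run method first calls check to determine whether the network is healthy, then iterates over componentList. When the check fails, every component whose isStarted() returns true has ownShutdown set to true and is then stopped with component.stop(). When the check passes, every component whose isStarted() returns false is started with component.start(), but only if ownShutdown is true. ownShutdown is reset to false inside that loop, so it is already false by the time a second component is examined.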
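The role of ownShutdown is easier to see in isolation. The following is a minimal sketch, not the Artemis implementation: SimpleComponent, FlagComponent and HealthCheckSketch are hypothetical names introduced here for illustration, and the health result is passed in as a parameter instead of being computed by check().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ActiveMQComponent; not the real Artemis interface.
interface SimpleComponent {
   void start();
   void stop();
   boolean isStarted();
}

// Trivial component that only tracks whether it has been started.
class FlagComponent implements SimpleComponent {
   private boolean started;
   public void start() { started = true; }
   public void stop() { started = false; }
   public boolean isStarted() { return started; }
}

// Simplified model of the run() logic quoted above.
class HealthCheckSketch {
   private final List<SimpleComponent> components = new ArrayList<>();
   private boolean ownShutdown = false;

   void add(SimpleComponent component) {
      components.add(component);
   }

   void run(boolean healthy) {
      if (healthy) {
         for (SimpleComponent component : components) {
            // Restart only what this checker stopped itself.
            if (!component.isStarted() && ownShutdown) {
               component.start();
            }
            ownShutdown = false; // reset inside the loop, as in the quoted source
         }
      } else {
         for (SimpleComponent component : components) {
            if (component.isStarted()) {
               ownShutdown = true;
               component.stop();
            }
         }
      }
   }

   public static void main(String[] args) {
      FlagComponent broker = new FlagComponent();
      broker.start();
      HealthCheckSketch check = new HealthCheckSketch();
      check.add(broker);
      check.run(false); // network down: the component is stopped
      check.run(true);  // network back: the component is restarted
      System.out.println("started after recovery: " + broker.isStarted());
   }
}
```

In this sketch, a component that someone else stops while the network is healthy stays stopped on the next healthy run, because ownShutdown is false at that point.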

check

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/NetworkHealthCheck.java

public class NetworkHealthCheck extends ActiveMQScheduledComponent {

   //......

   public boolean check() {
      if (isEmpty()) {
         return true;
      }

      for (InetAddress address : addresses) {
         if (check(address)) {
            return true;
         }
      }

      for (URL url : urls) {
         if (check(url)) {
            return true;
         }
      }

      return false;
   }

   public boolean isEmpty() {
      return addresses.isEmpty() && urls.isEmpty();
   }

   public boolean check(InetAddress address) {
      if (address == null) {
         return false;
      }

      try {
         if (!hasCustomPingCommand() && isReachable(address)) {
            if (logger.isTraceEnabled()) {
               logger.tracef(address + " OK");
            }
            return true;
         } else {
            return purePing(address);
         }
      } catch (Exception e) {
         ActiveMQUtilLogger.LOGGER.failedToCheckAddress(e, address.toString());
         return false;
      }
   }

   public boolean hasCustomPingCommand() {
      return !getIpv4Command().equals(IPV4_DEFAULT_COMMAND) || !getIpv6Command().equals(IPV6_DEFAULT_COMMAND);
   }

   protected boolean isReachable(InetAddress address) throws IOException {
      return address.isReachable(networkInterface, 0, networkTimeout);
   }

   public boolean purePing(InetAddress address) throws IOException, InterruptedException {
      long timeout = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(networkTimeout));
      // it did not work with a simple isReachable, it could be because there's no root access, so we will try ping executable

      if (logger.isTraceEnabled()) {
         logger.trace("purePing on canonical address " + address.getCanonicalHostName());
      }
      ProcessBuilder processBuilder;
      if (address instanceof Inet6Address) {
         processBuilder = buildProcess(ipv6Command, timeout, address.getCanonicalHostName());
      } else {
         processBuilder = buildProcess(ipv4Command, timeout, address.getCanonicalHostName());
      }

      Process pingProcess = processBuilder.start();

      readStream(pingProcess.getInputStream(), false);
      readStream(pingProcess.getErrorStream(), true);

      return pingProcess.waitFor() == 0;
   }

   public boolean check(URL url) {
      if (url == null) {
         return false;
      }

      try {
         URLConnection connection = url.openConnection();
         connection.setReadTimeout(networkTimeout);
         InputStream is = connection.getInputStream();
         is.close();
         return true;
      } catch (Exception e) {
         ActiveMQUtilLogger.LOGGER.failedToCheckURL(e, url.toString());
         return false;
      }
   }

   //......
}
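  • The check method returns true when both addresses and urls are empty. Otherwise it checks each entry in addresses and then each entry in urls, returning true as soon as one check succeeds and false if none does. For an InetAddress, check returns true when hasCustomPingCommand returns false and isReachable returns true; in every other case (a custom ping command is configured, or isReachable returns false) it falls back to purePing. purePing runs the ping command through a Process and treats exit code 0 as success. For a URL, check opens a JDK URLConnection with the read timeout set to networkTimeout and returns true if the input stream can be obtained; a timeout or any other exception returns false.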
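The two default command templates and the timeout conversion in purePing can be tried out separately. The sketch below assumes that the elided buildProcess method fills the template with String.format(template, timeout, host); the quoted code does not show this, although the positional %2$s in the IPv6 template suggests it. PingCommandSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.TimeUnit;

class PingCommandSketch {

   // Copied from the constants in the quoted source.
   static final String IPV6_DEFAULT_COMMAND = "ping6 -c 1 %2$s";
   static final String IPV4_DEFAULT_COMMAND = "ping -c 1 -t %d %s";

   // Same conversion as in purePing: milliseconds to whole seconds, at least 1.
   static long timeoutSeconds(int networkTimeoutMillis) {
      return Math.max(1, TimeUnit.MILLISECONDS.toSeconds(networkTimeoutMillis));
   }

   // Assumption: the template receives (timeout, host) in this order.
   static String format(String template, long timeoutSeconds, String host) {
      return String.format(template, timeoutSeconds, host);
   }

   public static void main(String[] args) {
      // 500 ms rounds down to 0 s, so the minimum of 1 s applies.
      System.out.println(format(IPV4_DEFAULT_COMMAND, timeoutSeconds(500), "10.0.0.1"));
      // The IPv6 template uses only the second argument, so the timeout is ignored.
      System.out.println(format(IPV6_DEFAULT_COMMAND, timeoutSeconds(2500), "::1"));
   }
}
```

Under that assumption, the IPv4 template consumes both arguments in order, while the IPv6 template skips the timeout and takes only the host through the explicit argument index.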

Summary

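NetworkHealthCheck extends ActiveMQScheduledComponent. Its run method first calls check to determine whether the network is healthy, then iterates over componentList. When the check fails, each started component has ownShutdown set to true and is stopped with component.stop(). When the check passes, each component that is not started is started with component.start() if ownShutdown is true, after which ownShutdown is reset to false.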
