本文主要研究一下artemis的CriticalAnalyzerPolicy
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java
public enum CriticalAnalyzerPolicy { HALT, SHUTDOWN, LOG; static { // for URI support on ClusterConnection BeanSupport.registerConverter(new CriticalAnalyzerPolicyConverter(), CriticalAnalyzerPolicy.class); } static class CriticalAnalyzerPolicyConverter implements Converter { @Override public <T> T convert(Class<T> type, Object value) { return type.cast(CriticalAnalyzerPolicy.valueOf(value.toString())); } } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
public class ActiveMQServerImpl implements ActiveMQServer { //...... private void initializeCriticalAnalyzer() throws Exception { // Some tests will play crazy frequenceistop/start CriticalAnalyzer analyzer = this.getCriticalAnalyzer(); if (analyzer == null) { if (configuration.isCriticalAnalyzer()) { // this will have its own ScheduledPool analyzer = new CriticalAnalyzerImpl(); } else { analyzer = EmptyCriticalAnalyzer.getInstance(); } this.analyzer = analyzer; } /* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/ analyzer.clear(); analyzer.setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS); if (configuration.isCriticalAnalyzer()) { analyzer.start(); } CriticalAction criticalAction = null; final CriticalAnalyzerPolicy criticalAnalyzerPolicy = configuration.getCriticalAnalyzerPolicy(); switch (criticalAnalyzerPolicy) { case HALT: criticalAction = criticalComponent -> { ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent); threadDump(); sendCriticalNotification(criticalComponent); Runtime.getRuntime().halt(70); // Linux systems will have /usr/include/sysexits.h showing 70 as internal software error }; break; case SHUTDOWN: criticalAction = criticalComponent -> { ActiveMQServerLogger.LOGGER.criticalSystemShutdown(criticalComponent); threadDump(); // on the case of a critical failure, -1 cannot simply means forever. 
// in case graceful is -1, we will set it to 30 seconds sendCriticalNotification(criticalComponent); // you can't stop from the check thread, // nor can use an executor Thread stopThread = new Thread() { @Override public void run() { try { ActiveMQServerImpl.this.stop(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }; stopThread.start(); }; break; case LOG: criticalAction = criticalComponent -> { ActiveMQServerLogger.LOGGER.criticalSystemLog(criticalComponent); threadDump(); sendCriticalNotification(criticalComponent); }; break; } analyzer.addAction(criticalAction); } //...... }
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java
/**
 * Component that tracks {@link CriticalComponent}s and holds {@link CriticalAction}s,
 * with a configurable check period and timeout.
 */
public interface CriticalAnalyzer extends ActiveMQComponent {

   // ---- component registry ----

   /** Registers a component with this analyzer. */
   void add(CriticalComponent component);

   /** Unregisters a component from this analyzer. */
   void remove(CriticalComponent component);

   /** Default implementation does nothing. */
   default void clear() {
   }

   /**
    * @return the default implementation always reports {@code 0}
    */
   default int getNumberOfComponents() {
      return 0;
   }

   boolean isMeasuring();

   // ---- timing configuration ----

   /**
    * @param timeout the check period, expressed in {@code unit}
    * @param unit    the unit of {@code timeout}
    * @return a {@code CriticalAnalyzer}, allowing calls to be chained
    */
   CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);

   long getCheckTimeNanoSeconds();

   /**
    * @param timeout the timeout, expressed in {@code unit}
    * @param unit    the unit of {@code timeout}
    * @return a {@code CriticalAnalyzer}, allowing calls to be chained
    */
   CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);

   /**
    * @param unit the unit the result is expressed in
    */
   long getTimeout(TimeUnit unit);

   long getTimeoutNanoSeconds();

   // ---- actions and checking ----

   /**
    * @param action the action to register
    * @return a {@code CriticalAnalyzer}, allowing calls to be chained
    */
   CriticalAnalyzer addAction(CriticalAction action);

   /** Runs one check pass. */
   void check();
}
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java
public class EmptyCriticalAnalyzer implements CriticalAnalyzer { private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer(); public static EmptyCriticalAnalyzer getInstance() { return instance; } private EmptyCriticalAnalyzer() { } @Override public void add(CriticalComponent component) { } @Override public void remove(CriticalComponent component) { } @Override public boolean isMeasuring() { return false; } @Override public void start() throws Exception { } @Override public void stop() throws Exception { } @Override public long getTimeoutNanoSeconds() { return 0; } @Override public boolean isStarted() { return false; } @Override public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) { return this; } @Override public long getCheckTimeNanoSeconds() { return 0; } @Override public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) { return this; } @Override public long getTimeout(TimeUnit unit) { return 0; } @Override public CriticalAnalyzer addAction(CriticalAction action) { return this; } @Override public void check() { } }
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
public class CriticalAnalyzerImpl implements CriticalAnalyzer { private final Logger logger = Logger.getLogger(CriticalAnalyzer.class); private volatile long timeoutNanoSeconds; // one minute by default.. the server will change it for sure private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60); private final ActiveMQScheduledComponent scheduledComponent; private final AtomicBoolean running = new AtomicBoolean(false); public CriticalAnalyzerImpl() { // this will make the scheduled component to start its own pool /* Important: The scheduled component should have its own thread pool... * otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any * issues and won't be able to shutdown the server or halt the VM */ this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) { @Override public void run() { logger.trace("Checking critical analyzer"); check(); } }; } @Override public void clear() { actions.clear(); components.clear(); } private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>(); private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>(); @Override public int getNumberOfComponents() { return components.size(); } @Override public boolean isMeasuring() { return true; } @Override public void add(CriticalComponent component) { components.add(component); } @Override public void remove(CriticalComponent component) { components.remove(component); } @Override public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) { this.checkTimeNanoSeconds = unit.toNanos(timeout); this.scheduledComponent.setPeriod(timeout, unit); return this; } @Override public long getCheckTimeNanoSeconds() { if (checkTimeNanoSeconds == 0) { checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2; } return checkTimeNanoSeconds; } @Override public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) { if 
(checkTimeNanoSeconds <= 0) { this.setCheckTime(timeout / 2, unit); } this.timeoutNanoSeconds = unit.toNanos(timeout); return this; } @Override public long getTimeout(TimeUnit unit) { if (timeoutNanoSeconds == 0) { timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2); } return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS); } @Override public long getTimeoutNanoSeconds() { return timeoutNanoSeconds; } @Override public CriticalAnalyzer addAction(CriticalAction action) { this.actions.add(action); return this; } @Override public void check() { boolean retry = true; while (retry) { try { for (CriticalComponent component : components) { if (component.isExpired(timeoutNanoSeconds)) { fireAction(component); // no need to keep running if there's already a component failed return; } } retry = false; // got to the end of the list, no need to retry } catch (ConcurrentModificationException dontCare) { // lets retry on the loop } } } private void fireAction(CriticalComponent component) { for (CriticalAction action : actions) { try { action.run(component); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } actions.clear(); } @Override public void start() { scheduledComponent.start(); } @Override public void stop() { scheduledComponent.stop(); } @Override public boolean isStarted() { return scheduledComponent.isStarted(); } }
CriticalAnalyzerPolicy定義了HALT、SHUTDOWN、LOG三個枚舉值;ActiveMQServerImpl的initializeCriticalAnalyzer方法先獲取CriticalAnalyzer,若爲null則創建一個,其中configuration.isCriticalAnalyzer()爲true時創建的是CriticalAnalyzerImpl,否則使用的是EmptyCriticalAnalyzer.getInstance();然後執行clear、設置checkTime和timeout,並在configuration.isCriticalAnalyzer()爲true時啟動analyzer,之後根據不同的criticalAnalyzerPolicy創建不同的criticalAction添加到analyzer;不同的criticalAnalyzerPolicy均會執行threadDump()以及sendCriticalNotification(criticalComponent),但記錄日誌的方法各不相同:HALT調用ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent),SHUTDOWN調用criticalSystemShutdown(criticalComponent),LOG調用criticalSystemLog(criticalComponent);此外HALT還執行Runtime.getRuntime().halt(70),SHUTDOWN還會新起一個線程執行ActiveMQServerImpl.this.stop(),而LOG沒有其他額外操作