This article takes a look at Artemis's connectionTtlCheckInterval.
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
    public class ConfigurationImpl implements Configuration, Serializable {

       private long connectionTtlCheckInterval = ActiveMQDefaultConfiguration.getDefaultConnectionTtlCheckInterval();

       //......

       @Override
       public long getConnectionTtlCheckInterval() {
          return connectionTtlCheckInterval;
       }

       @Override
       public ConfigurationImpl setConnectionTtlCheckInterval(long connectionTtlCheckInterval) {
          this.connectionTtlCheckInterval = connectionTtlCheckInterval;
          return this;
       }

       //......
    }
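The setter above returns the configuration object itself, which allows calls to be chained. The following is a minimal, self-contained sketch of that pattern. `SketchConfiguration`, its `name` field, and the hard-coded 2000 ms default are illustrative assumptions, not the Artemis classes; the real default comes from `ActiveMQDefaultConfiguration.getDefaultConnectionTtlCheckInterval()`.

```java
// Hypothetical stand-in for ConfigurationImpl; it only mirrors the fluent-setter shape.
final class SketchConfiguration {

    // Assumed default in milliseconds, hard-coded here for illustration only.
    static final long DEFAULT_CONNECTION_TTL_CHECK_INTERVAL = 2000L;

    private long connectionTtlCheckInterval = DEFAULT_CONNECTION_TTL_CHECK_INTERVAL;
    private String name = "default"; // hypothetical second property, used to show chaining

    long getConnectionTtlCheckInterval() {
        return connectionTtlCheckInterval;
    }

    // Returning `this` lets callers chain several setters in one expression.
    SketchConfiguration setConnectionTtlCheckInterval(long connectionTtlCheckInterval) {
        this.connectionTtlCheckInterval = connectionTtlCheckInterval;
        return this;
    }

    String getName() {
        return name;
    }

    SketchConfiguration setName(String name) {
        this.name = name;
        return this;
    }

    public static void main(String[] args) {
        SketchConfiguration config = new SketchConfiguration()
            .setName("broker-a")
            .setConnectionTtlCheckInterval(500L);
        System.out.println(config.getName() + " checks every "
            + config.getConnectionTtlCheckInterval() + " ms");
    }
}
```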
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
    public class RemotingServiceImpl implements RemotingService, ServerConnectionLifeCycleListener {

       //......

       private FailureCheckAndFlushThread failureCheckAndFlushThread;

       private long connectionTtlCheckInterval;

       //......

       public synchronized void start() throws Exception {
          if (started) {
             return;
          }

          logger.tracef("Starting remoting service %s", this);

          paused = false;

          // The remoting service maintains it's own thread pool for handling remoting traffic
          // If OIO each connection will have it's own thread
          // If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
          // This needs to be a different thread pool to the main thread pool especially for OIO where we may need
          // to support many hundreds of connections, but the main thread pool must be kept small for better performance

          ThreadFactory tFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
             @Override
             public ThreadFactory run() {
                return new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
                                                    "-" + System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
             }
          });

          threadPool = Executors.newCachedThreadPool(tFactory);

          for (TransportConfiguration info : acceptorsConfig) {
             createAcceptor(info);
          }

          /**
           * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid
           * race conditions. See {@link #startAcceptors()}.
           */

          // This thread checks connections that need to be closed, and also flushes confirmations
          failureCheckAndFlushThread = new FailureCheckAndFlushThread(connectionTtlCheckInterval);

          failureCheckAndFlushThread.start();

          started = true;
       }

       //......
    }
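The start method above is idempotent (it returns early when `started` is already true) and hands the check interval to a dedicated thread. A rough, self-contained model of that lifecycle might look like the sketch below. `PeriodicCheckService`, `sweepCount`, and the `stop` method are hypothetical names introduced for illustration; the pause is modeled with a `CountDownLatch`, the same JDK primitive that `FailureCheckAndFlushThread` uses, so that stopping does not have to wait out a full interval.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a service that owns one interval-paced checker thread.
final class PeriodicCheckService {

    private final long checkIntervalMillis;
    private final AtomicInteger sweeps = new AtomicInteger();
    private CountDownLatch stopSignal;
    private Thread checker;
    private boolean started;

    PeriodicCheckService(long checkIntervalMillis) {
        this.checkIntervalMillis = checkIntervalMillis;
    }

    // Idempotent: a second call while the service is running does nothing.
    synchronized void start() {
        if (started) {
            return;
        }
        final CountDownLatch signal = new CountDownLatch(1);
        stopSignal = signal;
        checker = new Thread(() -> {
            try {
                do {
                    sweeps.incrementAndGet(); // placeholder for the real connection sweep
                    // await returns true as soon as stop() counts the latch down,
                    // and false when the interval elapses without a stop request.
                } while (!signal.await(checkIntervalMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-failure-check-thread");
        checker.start();
        started = true;
    }

    // Wakes the checker out of its pause and waits for it to finish.
    synchronized void stop() {
        if (!started) {
            return;
        }
        stopSignal.countDown();
        try {
            checker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        started = false;
    }

    int sweepCount() {
        return sweeps.get();
    }

    public static void main(String[] args) {
        PeriodicCheckService service = new PeriodicCheckService(2000L);
        service.start();
        service.start(); // no-op: already started
        service.stop();  // returns promptly instead of waiting 2000 ms
        System.out.println("sweeps run: " + service.sweepCount());
    }
}
```

Using a latch rather than `Thread.sleep` is a design choice worth noting: the same call both paces the loop and acts as the shutdown signal.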
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
    private final class FailureCheckAndFlushThread extends Thread {

       private final long pauseInterval;

       private volatile boolean closed;
       private final CountDownLatch latch = new CountDownLatch(1);

       FailureCheckAndFlushThread(final long pauseInterval) {
          super("activemq-failure-check-thread");

          this.pauseInterval = pauseInterval;
       }

       public void close(final boolean criticalError) {
          closed = true;

          latch.countDown();

          if (!criticalError) {
             try {
                join();
             } catch (InterruptedException e) {
                throw new ActiveMQInterruptedException(e);
             }
          }
       }

       @Override
       public void run() {
          while (!closed) {
             try {
                long now = System.currentTimeMillis();

                Set<Pair<Object, Long>> toRemove = new HashSet<>();

                for (ConnectionEntry entry : connections.values()) {
                   final RemotingConnection conn = entry.connection;

                   boolean flush = true;

                   if (entry.ttl != -1) {
                      if (!conn.checkDataReceived()) {
                         if (now >= entry.lastCheck + entry.ttl) {
                            toRemove.add(new Pair<>(conn.getID(), entry.ttl));

                            flush = false;
                         }
                      } else {
                         entry.lastCheck = now;
                      }
                   }

                   if (flush) {
                      flushExecutor.execute(new Runnable() {
                         @Override
                         public void run() {
                            try {
                               // this is using a different thread
                               // as if anything wrong happens on flush
                               // failure detection could be affected
                               conn.scheduledFlush();
                            } catch (Throwable e) {
                               ActiveMQServerLogger.LOGGER.failedToFlushOutstandingDataFromTheConnection(e);
                            }

                         }
                      });
                   }
                }

                for (final Pair<Object, Long> pair : toRemove) {
                   final RemotingConnection conn = getConnection(pair.getA());
                   if (conn != null) {
                      // In certain cases (replicationManager for instance) calling fail could take some time
                      // We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for
                      // missing pings
                      flushExecutor.execute(new Runnable() {
                         @Override
                         public void run() {
                            conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress(), pair.getB()));
                         }
                      });
                      removeConnection(pair.getA());
                   }
                }

                if (latch.await(pauseInterval, TimeUnit.MILLISECONDS))
                   return;
             } catch (Throwable e) {
                ActiveMQServerLogger.LOGGER.errorOnFailureCheck(e);
             }
          }
       }
    }
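The per-connection decision inside run() can be isolated as a small pure function, which makes the three cases easier to see: a TTL of -1 never expires, received data refreshes lastCheck, and silence for at least ttl milliseconds marks the connection for removal. The sketch below is a simplified model under stated assumptions: `TtlSweepSketch`, `ConnEntry`, and `sweep` are hypothetical names, connections are keyed by a plain string, and `checkDataReceived` is assumed to read and then reset a "data arrived" flag. It also omits the asynchronous flush and fail calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of one pass of the failure-check loop.
final class TtlSweepSketch {

    static final class ConnEntry {
        final long ttl;       // -1 means the connection never expires
        long lastCheck;       // time of the last sweep that saw incoming data
        boolean dataReceived; // set when traffic arrived since the previous sweep

        ConnEntry(long ttl, long lastCheck) {
            this.ttl = ttl;
            this.lastCheck = lastCheck;
        }

        // Assumption: reading the flag also resets it for the next sweep.
        boolean checkDataReceived() {
            boolean result = dataReceived;
            dataReceived = false;
            return result;
        }
    }

    // Runs one sweep at time `now`, removes expired entries, and returns their ids.
    static List<String> sweep(Map<String, ConnEntry> connections, long now) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, ConnEntry> e : connections.entrySet()) {
            ConnEntry entry = e.getValue();
            if (entry.ttl == -1) {
                continue; // TTL checking disabled for this connection
            }
            if (entry.checkDataReceived()) {
                entry.lastCheck = now; // traffic seen: restart the TTL window
            } else if (now >= entry.lastCheck + entry.ttl) {
                expired.add(e.getKey()); // silent for at least ttl milliseconds
            }
        }
        // Remove after iterating to avoid modifying the map mid-loop.
        connections.keySet().removeAll(expired);
        return expired;
    }

    public static void main(String[] args) {
        Map<String, ConnEntry> connections = new LinkedHashMap<>();
        connections.put("idle", new ConnEntry(1000L, 0L));
        connections.put("active", new ConnEntry(1000L, 0L));
        connections.put("forever", new ConnEntry(-1L, 0L));
        connections.get("active").dataReceived = true;

        System.out.println("expired at t=1000: " + sweep(connections, 1000L));
    }
}
```

Because a sweep only happens once per check interval, a silent connection in this model is detected somewhere between ttl and roughly ttl plus one interval after its last recorded activity.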
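ConfigurationImpl defines the connectionTtlCheckInterval property, which defaults to 2000; the value is in milliseconds, since it ends up as the timeout passed to latch.await with TimeUnit.MILLISECONDS. The start method of RemotingServiceImpl uses connectionTtlCheckInterval to create a FailureCheckAndFlushThread and then calls failureCheckAndFlushThread.start().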