本文主要研究一下artemis的ExpiryScannerjava
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.javagit
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory { //...... private ExpiryReaper expiryReaperRunnable; //...... public synchronized void startExpiryScanner() { if (expiryReaperPeriod > 0) { if (expiryReaperRunnable != null) expiryReaperRunnable.stop(); expiryReaperRunnable = new ExpiryReaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), expiryReaperPeriod, TimeUnit.MILLISECONDS, false); expiryReaperRunnable.start(); } } public synchronized void stop() throws Exception { started = false; managementService.removeNotificationListener(this); if (expiryReaperRunnable != null) expiryReaperRunnable.stop(); if (addressQueueReaperRunnable != null) addressQueueReaperRunnable.stop(); addressManager.clear(); queueInfos.clear(); } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.javagithub
private final class ExpiryReaper extends ActiveMQScheduledComponent { ExpiryReaper(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod, TimeUnit timeUnit, boolean onDemand) { super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); } @Override public void run() { // The reaper thread should be finished case the PostOffice is gone // This is to avoid leaks on PostOffice between stops and starts for (Queue queue : getLocalQueues()) { try { queue.expireReferences(); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorExpiringMessages(e); } } } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaapache
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private final ExpiryScanner expiryScanner = new ExpiryScanner(); //...... public void expireReferences() { if (isExpirationRedundant()) { return; } if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) { expiryScanner.scannerRunning.incrementAndGet(); getExecutor().execute(expiryScanner); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaapi
class ExpiryScanner implements Runnable { public AtomicInteger scannerRunning = new AtomicInteger(0); @Override public void run() { boolean expired = false; boolean hasElements = false; int elementsExpired = 0; LinkedList<MessageReference> expiredMessages = new LinkedList<>(); synchronized (QueueImpl.this) { if (queueDestroyed) { return; } if (logger.isDebugEnabled()) { logger.debug("Scanning for expires on " + QueueImpl.this.getName()); } LinkedListIterator<MessageReference> iter = iterator(); try { while (postOffice.isStarted() && iter.hasNext()) { hasElements = true; MessageReference ref = iter.next(); if (ref.getMessage().isExpired()) { incDelivering(ref); expired = true; expiredMessages.add(ref); iter.remove(); if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) { logger.debug("Breaking loop of expiring"); scannerRunning.incrementAndGet(); getExecutor().execute(this); break; } } } } finally { try { iter.close(); } catch (Throwable ignored) { } scannerRunning.decrementAndGet(); logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done"); } } if (!expiredMessages.isEmpty()) { Transaction tx = new TransactionImpl(storageManager); for (MessageReference ref : expiredMessages) { if (tx == null) { tx = new TransactionImpl(storageManager); } try { expire(tx, ref); refRemoved(ref); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref); } } try { tx.commit(); } catch (Exception e) { ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e); } logger.debug("Expired " + elementsExpired + " references"); } // If empty we need to schedule depaging to make sure we would depage expired messages as well if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) { scheduleDepage(true); } } }
1000
),若爲true則提交到executor執行並跳出循環;以後遍歷expiredMessages,挨個執行expire(tx, ref)以及refRemoved(ref),最後執行tx.commit()activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.javaide
public interface Message { //...... default boolean isExpired() { if (getExpiration() == 0) { return false; } return System.currentTimeMillis() - getExpiration() >= 0; } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaoop
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private void expire(final Transaction tx, final MessageReference ref) throws Exception { SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress(); if (expiryAddress != null) { Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress); if (bindingList == null || bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); acknowledge(tx, ref, AckReason.EXPIRED, null); } else { move(expiryAddress, tx, ref, true, true); } } else { if (!printErrorExpiring) { printErrorExpiring = true; // print this only once ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name); } acknowledge(tx, ref, AckReason.EXPIRED, null); } if (server != null && server.hasBrokerMessagePlugins()) { ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER); if (expiryLogger == null) { expiryLogger = new ExpiryLogger(); tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger); tx.addOperation(expiryLogger); } expiryLogger.addExpiry(address, ref); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javapost
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private void move(final SimpleString toAddress, final Transaction tx, final MessageReference ref, final boolean expiry, final boolean rejectDuplicate, final long... queueIDs) throws Exception { Message copyMessage = makeCopy(ref, expiry); copyMessage.setAddress(toAddress); if (queueIDs != null && queueIDs.length > 0) { ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length); for (long id : queueIDs) { buffer.putLong(id); } copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array()); } postOffice.route(copyMessage, tx, false, rejectDuplicate); if (expiry) { acknowledge(tx, ref, AckReason.EXPIRED, null); } else { acknowledge(tx, ref); } } //...... }