本文主要研究一下artemis的DelayedAddRedistributorjava
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javagit
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... public synchronized void addRedistributor(final long delay) { clearRedistributorFuture(); if (redistributor != null) { // Just prompt delivery deliverAsync(); } if (delay > 0) { if (consumers.isEmpty()) { DelayedAddRedistributor dar = new DelayedAddRedistributor(executor); redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS); } } else { internalAddRedistributor(executor); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javagithub
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private class DelayedAddRedistributor implements Runnable { private final ArtemisExecutor executor1; DelayedAddRedistributor(final ArtemisExecutor executor) { this.executor1 = executor; } @Override public void run() { synchronized (QueueImpl.this) { internalAddRedistributor(executor1); clearRedistributorFuture(); } } } private void internalAddRedistributor(final ArtemisExecutor executor) { // create the redistributor only once if there are no local consumers if (consumers.isEmpty() && redistributor == null) { if (logger.isTraceEnabled()) { logger.trace("QueueImpl::Adding redistributor on queue " + this.toString()); } redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE))); redistributor.consumer.start(); deliverAsync(); } } private void clearRedistributorFuture() { ScheduledFuture<?> future = redistributorFuture; redistributorFuture = null; if (future != null) { future.cancel(false); } } public void deliverAsync() { deliverAsync(false); } private void deliverAsync(boolean noWait) { if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) { scheduledRunners.incrementAndGet(); checkDepage(noWait); try { getExecutor().execute(deliverRunner); } catch (RejectedExecutionException ignored) { // no-op scheduledRunners.decrementAndGet(); } } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaredis
protected static class ConsumerHolder<T extends Consumer> implements PriorityAware { ConsumerHolder(final T consumer) { this.consumer = consumer; } final T consumer; LinkedListIterator<MessageReference> iter; private void resetIterator() { if (iter != null) { iter.close(); } iter = null; } private Consumer consumer() { return consumer; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ConsumerHolder<?> that = (ConsumerHolder<?>) o; return Objects.equals(consumer, that.consumer); } @Override public int hashCode() { return Objects.hash(consumer); } @Override public int getPriority() { return consumer.getPriority(); } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.javaapache
public class Redistributor implements Consumer { private boolean active; private final StorageManager storageManager; private final PostOffice postOffice; private final Executor executor; private final int batchSize; private final Queue queue; private int count; private final long sequentialID; // a Flush executor here is happening inside another executor. // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all. // So, instead of using a future we will use a plain ReusableLatch here private ReusableLatch pendingRuns = new ReusableLatch(); public Redistributor(final Queue queue, final StorageManager storageManager, final PostOffice postOffice, final Executor executor, final int batchSize) { this.queue = queue; this.sequentialID = storageManager.generateID(); this.storageManager = storageManager; this.postOffice = postOffice; this.executor = executor; this.batchSize = batchSize; } @Override public long sequentialID() { return sequentialID; } @Override public Filter getFilter() { return null; } @Override public String debug() { return toString(); } @Override public String toManagementString() { return "Redistributor[" + queue.getName() + "/" + queue.getID() + "]"; } @Override public void disconnect() { //noop } public synchronized void start() { active = true; } @Override public synchronized HandleStatus handle(final MessageReference reference) throws Exception { if (!active) { return HandleStatus.BUSY; } else if (reference.getMessage().getGroupID() != null) { //we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered return HandleStatus.NO_MATCH; } final Transaction tx = new TransactionImpl(storageManager); final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx); if (routingInfo == null) { tx.rollback(); return HandleStatus.BUSY; } if (!reference.getMessage().isLargeMessage()) { postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); ackRedistribution(reference, tx); } else { active = false; executor.execute(new Runnable() { @Override public void run() { try { postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false); ackRedistribution(reference, tx); synchronized (Redistributor.this) { active = true; count++; queue.deliverAsync(); } } catch (Exception e) { try { tx.rollback(); } catch (Exception e2) { // Nothing much we can do now ActiveMQServerLogger.LOGGER.failedToRollback(e2); } } } }); } return HandleStatus.HANDLED; } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaapp
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private final class DeliverRunner implements Runnable { @Override public void run() { try { // during the transition between paging and nonpaging, we could have this using a different executor // and at this short period we could have more than one delivery thread running in async mode // this will avoid that possibility // We will be using the deliverRunner instance as the guard object to avoid multiple threads executing // an asynchronous delivery enterCritical(CRITICAL_DELIVER); boolean needCheckDepage = false; try { deliverLock.lock(); try { needCheckDepage = deliver(); } finally { deliverLock.unlock(); } } finally { leaveCritical(CRITICAL_DELIVER); } if (needCheckDepage) { enterCritical(CRITICAL_CHECK_DEPAGE); try { checkDepage(true); } finally { leaveCritical(CRITICAL_CHECK_DEPAGE); } } } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorDelivering(e); } } } private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) { HandleStatus status; try { status = consumer.handle(reference); } catch (Throwable t) { ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); // If the consumer throws an exception we remove the consumer try { removeConsumer(consumer); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e); } return HandleStatus.BUSY; } if (status == null) { throw new IllegalStateException("ClientConsumer.handle() should never return null"); } return status; } //...... }
QueueImpl的addRedistributor在delay大於0的時候會建立並調度DelayedAddRedistributor;DelayedAddRedistributor實現了Runnable方法,其run方先執行internalAddRedistributor,後執行clearRedistributorFuture;internalAddRedistributor會建立Redistributor以及ConsumerHolder,而後執行redistributor.consumer.start(),最後執行deliverAsync方法調度執行DeliverRunner;DeliverRunner實現了Runnable接口,其run方法會執行deliver方法,該方法會執行handle方法,後者會執行consumer.handle(reference);而在redistributor不爲null時,其consumer爲redistributor.consumer;redistributor.consumer的handle方法對於非largeMessage的執行postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false)以及ackRedistribution(reference, tx),最後返回HandleStatus.HANDLEDasync