聊聊artemis的DelayedAddRedistributor

本文主要研究一下artemis的DelayedAddRedistributorjava

addRedistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javagit

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   public synchronized void addRedistributor(final long delay) {
      clearRedistributorFuture();

      if (redistributor != null) {
         // Just prompt delivery
         deliverAsync();
      }

      if (delay > 0) {
         if (consumers.isEmpty()) {
            DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);

            redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
         }
      } else {
         internalAddRedistributor(executor);
      }
   }

   //......
}
  • QueueImpl的addRedistributor在delay大於0的時候會建立並調度DelayedAddRedistributor

DelayedAddRedistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javagithub

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private class DelayedAddRedistributor implements Runnable {

      private final ArtemisExecutor executor1;

      DelayedAddRedistributor(final ArtemisExecutor executor) {
         this.executor1 = executor;
      }

      @Override
      public void run() {
         synchronized (QueueImpl.this) {
            internalAddRedistributor(executor1);

            clearRedistributorFuture();
         }
      }
   }

   private void internalAddRedistributor(final ArtemisExecutor executor) {
      // create the redistributor only once if there are no local consumers
      if (consumers.isEmpty() && redistributor == null) {
         if (logger.isTraceEnabled()) {
            logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
         }

         redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));

         redistributor.consumer.start();

         deliverAsync();
      }
   }

   private void clearRedistributorFuture() {
      ScheduledFuture<?> future = redistributorFuture;
      redistributorFuture = null;
      if (future != null) {
         future.cancel(false);
      }
   }

   public void deliverAsync() {
      deliverAsync(false);
   }

   private void deliverAsync(boolean noWait) {
      if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
         scheduledRunners.incrementAndGet();
         checkDepage(noWait);
         try {
            getExecutor().execute(deliverRunner);
         } catch (RejectedExecutionException ignored) {
            // no-op
            scheduledRunners.decrementAndGet();
         }
      }
   }

   //......
}
  • DelayedAddRedistributor實現了Runnable方法,其run方先執行internalAddRedistributor,後執行clearRedistributorFuture;internalAddRedistributor會建立Redistributor以及ConsumerHolder,而後執行redistributor.consumer.start(),最後執行deliverAsync方法調度執行DeliverRunner

ConsumerHolder

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaredis

protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {

      ConsumerHolder(final T consumer) {
         this.consumer = consumer;
      }

      final T consumer;

      LinkedListIterator<MessageReference> iter;

      private void resetIterator() {
         if (iter != null) {
            iter.close();
         }
         iter = null;
      }

      private Consumer consumer() {
         return consumer;
      }

      @Override
      public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ConsumerHolder<?> that = (ConsumerHolder<?>) o;
         return Objects.equals(consumer, that.consumer);
      }

      @Override
      public int hashCode() {
         return Objects.hash(consumer);
      }

      @Override
      public int getPriority() {
         return consumer.getPriority();
      }
   }
  • ConsumerHolder實現了PriorityAware接口,其getPriority方法返回的是consumer.getPriority()

Redistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.javaapache

public class Redistributor implements Consumer {

   private boolean active;

   private final StorageManager storageManager;

   private final PostOffice postOffice;

   private final Executor executor;

   private final int batchSize;

   private final Queue queue;

   private int count;

   private final long sequentialID;

   // a Flush executor here is happening inside another executor.
   // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
   // So, instead of using a future we will use a plain ReusableLatch here
   private ReusableLatch pendingRuns = new ReusableLatch();

   public Redistributor(final Queue queue,
                        final StorageManager storageManager,
                        final PostOffice postOffice,
                        final Executor executor,
                        final int batchSize) {
      this.queue = queue;

      this.sequentialID = storageManager.generateID();

      this.storageManager = storageManager;

      this.postOffice = postOffice;

      this.executor = executor;

      this.batchSize = batchSize;
   }

   @Override
   public long sequentialID() {
      return sequentialID;
   }

   @Override
   public Filter getFilter() {
      return null;
   }

   @Override
   public String debug() {
      return toString();
   }

   @Override
   public String toManagementString() {
      return "Redistributor[" + queue.getName() + "/" + queue.getID() + "]";
   }

   @Override
   public void disconnect() {
      //noop
   }

   public synchronized void start() {
      active = true;
   }

   @Override
   public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
      if (!active) {
         return HandleStatus.BUSY;
      } else if (reference.getMessage().getGroupID() != null) {
         //we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered
         return HandleStatus.NO_MATCH;
      }

      final Transaction tx = new TransactionImpl(storageManager);

      final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);

      if (routingInfo == null) {
         tx.rollback();
         return HandleStatus.BUSY;
      }

      if (!reference.getMessage().isLargeMessage()) {

         postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

         ackRedistribution(reference, tx);
      } else {
         active = false;
         executor.execute(new Runnable() {
            @Override
            public void run() {
               try {

                  postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

                  ackRedistribution(reference, tx);

                  synchronized (Redistributor.this) {
                     active = true;

                     count++;

                     queue.deliverAsync();
                  }
               } catch (Exception e) {
                  try {
                     tx.rollback();
                  } catch (Exception e2) {
                     // Nothing much we can do now
                     ActiveMQServerLogger.LOGGER.failedToRollback(e2);
                  }
               }
            }
         });
      }

      return HandleStatus.HANDLED;
   }

   //......
}
  • Redistributor實現了Consumer接口,其start方法標記active爲true;其handle方法在active爲false時返回HandleStatus.BUSY;以後執行postOffice.redistribute(reference.getMessage(), queue, tx)獲取routingInfo,而後對於非largeMessage的執行postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false)以及ackRedistribution(reference, tx),最後返回HandleStatus.HANDLED

DeliverRunner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaapp

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private final class DeliverRunner implements Runnable {

      @Override
      public void run() {
         try {
            // during the transition between paging and nonpaging, we could have this using a different executor
            // and at this short period we could have more than one delivery thread running in async mode
            // this will avoid that possibility
            // We will be using the deliverRunner instance as the guard object to avoid multiple threads executing
            // an asynchronous delivery
            enterCritical(CRITICAL_DELIVER);
            boolean needCheckDepage = false;
            try {
               deliverLock.lock();
               try {
                  needCheckDepage = deliver();
               } finally {
                  deliverLock.unlock();
               }
            } finally {
               leaveCritical(CRITICAL_DELIVER);
            }

            if (needCheckDepage) {
               enterCritical(CRITICAL_CHECK_DEPAGE);
               try {
                  checkDepage(true);
               } finally {
                  leaveCritical(CRITICAL_CHECK_DEPAGE);
               }
            }

         } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorDelivering(e);
         }
      }
   }

   private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
      HandleStatus status;
      try {
         status = consumer.handle(reference);
      } catch (Throwable t) {
         ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);

         // If the consumer throws an exception we remove the consumer
         try {
            removeConsumer(consumer);
         } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
         }
         return HandleStatus.BUSY;
      }

      if (status == null) {
         throw new IllegalStateException("ClientConsumer.handle() should never return null");
      }

      return status;
   }

   //......
}
  • DeliverRunner實現了Runnable接口,其run方法會執行deliver方法,該方法會執行handle方法,後者會執行consumer.handle(reference);而在redistributor不爲null時,其consumer爲redistributor.consumer

小結

QueueImpl的addRedistributor在delay大於0的時候會建立並調度DelayedAddRedistributor;DelayedAddRedistributor實現了Runnable方法,其run方先執行internalAddRedistributor,後執行clearRedistributorFuture;internalAddRedistributor會建立Redistributor以及ConsumerHolder,而後執行redistributor.consumer.start(),最後執行deliverAsync方法調度執行DeliverRunner;DeliverRunner實現了Runnable接口,其run方法會執行deliver方法,該方法會執行handle方法,後者會執行consumer.handle(reference);而在redistributor不爲null時,其consumer爲redistributor.consumer;redistributor.consumer的handle方法對於非largeMessage的執行postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false)以及ackRedistribution(reference, tx),最後返回HandleStatus.HANDLEDasync

doc

相關文章
相關標籤/搜索