聊聊artemis的lastValueProperty

本文主要研究一下artemis的lastValuePropertyjava

CoreMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.javagit

public class CoreMessage extends RefCountMessage implements ICoreMessage {

   //......

   public SimpleString getLastValueProperty() {
      return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
   }

   @Override
   public Message setLastValueProperty(SimpleString lastValueName) {
      return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName);
   }

   //......
}
  • CoreMessage提供了getLastValueProperty、setLastValueProperty方法,設置的是message的Message.HDR_LAST_VALUE_NAME(_AMQ_LVQ_NAME)屬性

MessageReferenceImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.javagithub

public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {

   //......

   public SimpleString getLastValueProperty() {
      SimpleString lastValue = message.getSimpleStringProperty(queue.getLastValueKey());
      if (lastValue == null) {
         lastValue = message.getLastValueProperty();
      }
      return lastValue;
   }

   //......
}
  • MessageReferenceImpl提供了getLastValueProperty方法,它先從message獲取queue.getLastValueKey()的屬性值,若是爲null再讀取message.getLastValueProperty()

LastValueQueue

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.javaapache

public class LastValueQueue extends QueueImpl {

   private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
   private final SimpleString lastValueKey;

   //......

   @Override
   public synchronized void addTail(final MessageReference ref, final boolean direct) {
      if (scheduleIfPossible(ref)) {
         return;
      }
      final SimpleString prop = ref.getLastValueProperty();

      if (prop != null) {
         HolderReference hr = map.get(prop);

         if (hr != null) {
            // We need to overwrite the old ref with the new one and ack the old one

            replaceLVQMessage(ref, hr);

         } else {
            hr = new HolderReference(prop, ref);

            map.put(prop, hr);

            super.addTail(hr, direct);
         }
      } else {
         super.addTail(ref, direct);
      }
   }

   @Override
   public synchronized void addHead(final MessageReference ref, boolean scheduling) {
      // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
      if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
         return;
      }

      SimpleString lastValueProp = ref.getLastValueProperty();

      if (lastValueProp != null) {
         HolderReference hr = map.get(lastValueProp);

         if (hr != null) {
            if (scheduling) {
               // We need to overwrite the old ref with the new one and ack the old one

               replaceLVQMessage(ref, hr);
            } else {
               // We keep the current ref and ack the one we are returning

               super.referenceHandled(ref);

               try {
                  super.acknowledge(ref);
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
               }
            }
         } else {
            hr = new HolderReference(lastValueProp, ref);

            map.put(lastValueProp, hr);

            super.addHead(hr, scheduling);
         }
      } else {
         super.addHead(ref, scheduling);
      }
   }

   private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
      MessageReference oldRef = hr.getReference();

      referenceHandled(oldRef);
      super.refRemoved(oldRef);

      try {
         oldRef.acknowledge(null, AckReason.REPLACED, null);
      } catch (Exception e) {
         ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
      }

      hr.setReference(ref);
      addRefSize(ref);
      refAdded(ref);
   }

   //......   
}
  • LastValueQueue繼承了QueueImpl,其addTail或addHead方法先經過ref.getLastValueProperty()找到lastValueProp,在從map獲取HolderReference,對於HolderReference不爲null的,執行replaceLVQMessage方法;該方法會替換到oldRef並對oldRef進行ack

小結

CoreMessage提供了getLastValueProperty、setLastValueProperty方法,設置的是message的Message.HDR_LAST_VALUE_NAME(_AMQ_LVQ_NAME)屬性;MessageReferenceImpl提供了getLastValueProperty方法,它先從message獲取queue.getLastValueKey()的屬性值,若是爲null再讀取message.getLastValueProperty();LastValueQueue繼承了QueueImpl,其addTail或addHead方法先經過ref.getLastValueProperty()找到lastValueProp,在從map獲取HolderReference,對於HolderReference不爲null的,執行replaceLVQMessage方法;該方法會替換到oldRef並對oldRef進行ackide

doc

相關文章
相關標籤/搜索