This article takes a look at Artemis's lastValueProperty.
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
public class CoreMessage extends RefCountMessage implements ICoreMessage {

   //......

   public SimpleString getLastValueProperty() {
      return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
   }

   @Override
   public Message setLastValueProperty(SimpleString lastValueName) {
      return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName);
   }

   //......
}
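CoreMessage provides the getLastValueProperty and setLastValueProperty methods, which read and set the message's Message.HDR_LAST_VALUE_NAME (_AMQ_LVQ_NAME) property.
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java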
public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {

   //......

   public SimpleString getLastValueProperty() {
      SimpleString lastValue = message.getSimpleStringProperty(queue.getLastValueKey());
      if (lastValue == null) {
         lastValue = message.getLastValueProperty();
      }
      return lastValue;
   }

   //......
}
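The lookup order in getLastValueProperty can be illustrated without any Artemis classes. The following is a minimal sketch, not Artemis code: the class LastValueLookupSketch, the method lastValueOf, and the use of a plain Map for message properties are all hypothetical stand-ins. Only the order (queue-specific key first, then the default _AMQ_LVQ_NAME property) comes from the source above; the explicit null check on the queue key is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not an Artemis class: models only the lookup order.
class LastValueLookupSketch {

    // The value of Message.HDR_LAST_VALUE_NAME as quoted in this article.
    static final String DEFAULT_KEY = "_AMQ_LVQ_NAME";

    // Try the queue-configured key first, then fall back to the default header.
    static String lastValueOf(Map<String, String> props, String queueLastValueKey) {
        // Assumption of this sketch: a null queue key simply skips the first lookup.
        String value = (queueLastValueKey == null) ? null : props.get(queueLastValueKey);
        if (value == null) {
            value = props.get(DEFAULT_KEY);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(DEFAULT_KEY, "from-default-header");
        // The queue key "symbol" is not set on the message, so the default header is used.
        System.out.println(lastValueOf(props, "symbol")); // prints "from-default-header"

        props.put("symbol", "from-queue-key");
        // Now the queue-specific key is present and takes precedence.
        System.out.println(lastValueOf(props, "symbol")); // prints "from-queue-key"
    }
}
```

The queue-specific key wins whenever the message carries it, so a message only needs the default header when the queue has no usable key of its own.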
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
public class LastValueQueue extends QueueImpl {

   private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
   private final SimpleString lastValueKey;

   //......

   @Override
   public synchronized void addTail(final MessageReference ref, final boolean direct) {
      if (scheduleIfPossible(ref)) {
         return;
      }
      final SimpleString prop = ref.getLastValueProperty();
      if (prop != null) {
         HolderReference hr = map.get(prop);
         if (hr != null) {
            // We need to overwrite the old ref with the new one and ack the old one
            replaceLVQMessage(ref, hr);
         } else {
            hr = new HolderReference(prop, ref);
            map.put(prop, hr);
            super.addTail(hr, direct);
         }
      } else {
         super.addTail(ref, direct);
      }
   }

   @Override
   public synchronized void addHead(final MessageReference ref, boolean scheduling) {
      // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
      if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
         return;
      }
      SimpleString lastValueProp = ref.getLastValueProperty();
      if (lastValueProp != null) {
         HolderReference hr = map.get(lastValueProp);
         if (hr != null) {
            if (scheduling) {
               // We need to overwrite the old ref with the new one and ack the old one
               replaceLVQMessage(ref, hr);
            } else {
               // We keep the current ref and ack the one we are returning
               super.referenceHandled(ref);
               try {
                  super.acknowledge(ref);
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
               }
            }
         } else {
            hr = new HolderReference(lastValueProp, ref);
            map.put(lastValueProp, hr);
            super.addHead(hr, scheduling);
         }
      } else {
         super.addHead(ref, scheduling);
      }
   }

   private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
      MessageReference oldRef = hr.getReference();
      referenceHandled(oldRef);
      super.refRemoved(oldRef);
      try {
         oldRef.acknowledge(null, AckReason.REPLACED, null);
      } catch (Exception e) {
         ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
      }
      hr.setReference(ref);
      addRefSize(ref);
      refAdded(ref);
   }

   //......
}
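CoreMessage provides the getLastValueProperty and setLastValueProperty methods, which read and set the message's Message.HDR_LAST_VALUE_NAME (_AMQ_LVQ_NAME) property. MessageReferenceImpl provides getLastValueProperty, which first reads the property named by queue.getLastValueKey() from the message and, if that is null, falls back to message.getLastValueProperty(). LastValueQueue extends QueueImpl; its addTail and addHead methods obtain the last-value property via ref.getLastValueProperty() and then look up the matching HolderReference in the map. When no HolderReference exists, a new one is created, stored in the map, and added to the queue. When one already exists, addTail calls replaceLVQMessage, which puts the new ref into the holder and acknowledges oldRef with AckReason.REPLACED; addHead does the same only when scheduling is true, and otherwise keeps the current ref and acknowledges the one being returned.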
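The replace-on-add behaviour of addTail can be modelled in a few lines of plain Java. This is a simplified sketch under stated assumptions, not the Artemis implementation: LastValueQueueSketch, Holder, contents and replaced are hypothetical names, messages are reduced to strings, and scheduling, acknowledgement and size accounting are left out. The one idea it keeps is that the holder stays at its position in the queue while the message inside it is swapped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of last-value semantics; not the Artemis LastValueQueue.
class LastValueQueueSketch {

    // Mutable slot playing the role of HolderReference.
    static final class Holder {
        String body;
        Holder(String body) { this.body = body; }
    }

    private final List<Holder> queue = new ArrayList<>();
    private final Map<String, Holder> byLastValue = new HashMap<>();
    // Bodies that were overwritten; stands in for acking the old reference.
    private final List<String> replaced = new ArrayList<>();

    synchronized void addTail(String lastValue, String body) {
        if (lastValue == null) {
            // No last-value property: behaves like an ordinary queue.
            queue.add(new Holder(body));
            return;
        }
        Holder holder = byLastValue.get(lastValue);
        if (holder != null) {
            // Same last-value key seen before: swap the message inside the holder.
            replaced.add(holder.body);
            holder.body = body;
        } else {
            holder = new Holder(body);
            byLastValue.put(lastValue, holder);
            queue.add(holder);
        }
    }

    synchronized List<String> contents() {
        List<String> out = new ArrayList<>();
        for (Holder h : queue) {
            out.add(h.body);
        }
        return out;
    }

    synchronized List<String> replaced() {
        return new ArrayList<>(replaced);
    }

    public static void main(String[] args) {
        LastValueQueueSketch q = new LastValueQueueSketch();
        q.addTail("IBM", "IBM=100");
        q.addTail(null, "plain");
        q.addTail("IBM", "IBM=101");
        System.out.println(q.contents()); // prints [IBM=101, plain]
        System.out.println(q.replaced()); // prints [IBM=100]
    }
}
```

Note that the updated message keeps the queue position of the one it replaced rather than moving to the tail, which mirrors how the source code reuses the existing HolderReference instead of calling super.addTail again.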