本文主要研究一下artemis的duplicatePropertyjava
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.javagit
public interface Message { //...... default Object getDuplicateProperty() { return null; } default byte[] getDuplicateIDBytes() { Object duplicateID = getDuplicateProperty(); if (duplicateID == null) { return null; } else { if (duplicateID instanceof SimpleString) { return ((SimpleString) duplicateID).getData(); } else if (duplicateID instanceof String) { return new SimpleString(duplicateID.toString()).getData(); } else { return (byte[]) duplicateID; } } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.javagithub
public class CoreMessage extends RefCountMessage implements ICoreMessage { //...... public Object getDuplicateProperty() { return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID); } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.javaredis
public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory { //...... private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<>(); //...... public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct, boolean rejectDuplicates, final Binding bindingMove) throws Exception { RoutingStatus result; // Sanity check if (message.getRefCount() > 0) { throw new IllegalStateException("Message cannot be routed more than once"); } final SimpleString address = context.getAddress(message); setPagingStore(address, message); AtomicBoolean startedTX = new AtomicBoolean(false); applyExpiryDelay(message, address); if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) { return RoutingStatus.DUPLICATED_ID; } message.clearInternalProperties(); Bindings bindings = addressManager.getBindingsForRoutingAddress(address); AddressInfo addressInfo = addressManager.getAddressInfo(address); //...... if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result)); } return result; } private boolean checkDuplicateID(final Message message, final RoutingContext context, boolean rejectDuplicates, AtomicBoolean startedTX) throws Exception { // Check the DuplicateCache for the Bridge first Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID); if (bridgeDup != null) { // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one byte[] bridgeDupBytes = (byte[]) bridgeDup; DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString())); if (context.getTransaction() == null) { context.setTransaction(new TransactionImpl(storageManager)); startedTX.set(true); } if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) { context.getTransaction().rollback(); startedTX.set(false); message.decrementRefCount(); return false; } } else { // if used BridgeDuplicate, it's not going to use the regular duplicate // since this will would break redistribution (re-setting the duplicateId) byte[] duplicateIDBytes = message.getDuplicateIDBytes(); DuplicateIDCache cache = null; boolean isDuplicate = false; if (duplicateIDBytes != null) { cache = getDuplicateIDCache(context.getAddress(message)); isDuplicate = cache.contains(duplicateIDBytes); if (rejectDuplicates && isDuplicate) { ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message); String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString(); if (context.getTransaction() != null) { context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage)); } message.decrementRefCount(); return false; } } if (cache != null && !isDuplicate) { if (context.getTransaction() == null) { // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this context.setTransaction(new TransactionImpl(storageManager)); startedTX.set(true); } cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()); } } return true; } public DuplicateIDCache getDuplicateIDCache(final SimpleString address) { DuplicateIDCache cache = duplicateIDCaches.get(address); if (cache == null) { cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache); DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache); if (oldCache != null) { cache = oldCache; } } return cache; } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.javaapache
public class PostOfficeJournalLoader implements JournalLoader { //...... public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception { for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) { SimpleString address = entry.getKey(); DuplicateIDCache cache = postOffice.getDuplicateIDCache(address); if (configuration.isPersistIDCache()) { cache.load(entry.getValue()); } } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.javaapi
public interface DuplicateIDCache { boolean contains(byte[] duplicateID); boolean atomicVerify(byte[] duplID, Transaction tx) throws Exception; void addToCache(byte[] duplicateID) throws Exception; void addToCache(byte[] duplicateID, Transaction tx) throws Exception; /** * it will add the data to the cache. * If TX == null it won't use a transaction. * if instantAdd=true, it won't wait a transaction to add on the cache which is needed on the case of the Bridges */ void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) throws Exception; void deleteFromCache(byte[] duplicateID) throws Exception; void load(List<Pair<byte[], Long>> theIds) throws Exception; void load(Transaction tx, byte[] duplID); void clear() throws Exception; List<Pair<byte[], Long>> getMap(); }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java數組
public class DuplicateIDCacheImpl implements DuplicateIDCache { private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class); // ByteHolder, position private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>(); private final SimpleString address; // Note - deliberately typed as ArrayList since we want to ensure fast indexed // based array access private final ArrayList<Pair<ByteArrayHolder, Long>> ids; private int pos; private final int cacheSize; private final StorageManager storageManager; private final boolean persist; public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager, final boolean persist) { this.address = address; cacheSize = size; ids = new ArrayList<>(size); this.storageManager = storageManager; this.persist = persist; } @Override public void load(final List<Pair<byte[], Long>> theIds) throws Exception { long txID = -1; // If we have more IDs than cache size, we shrink the first ones int deleteCount = theIds.size() - cacheSize; if (deleteCount < 0) { deleteCount = 0; } for (Pair<byte[], Long> id : theIds) { if (deleteCount > 0) { if (txID == -1) { txID = storageManager.generateID(); } if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB())); } storageManager.deleteDuplicateIDTransactional(txID, id.getB()); deleteCount--; } else { ByteArrayHolder bah = new ByteArrayHolder(id.getA()); Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB()); cache.put(bah, ids.size()); ids.add(pair); if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB())); } } } if (txID != -1) { storageManager.commit(txID); } pos = ids.size(); if (pos == cacheSize) { pos = 0; } } public boolean contains(final byte[] duplID) { boolean contains = cache.get(new ByteArrayHolder(duplID)) != null; if (contains) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0)); } return contains; } public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { long recordID = -1; if (tx == null) { if (persist) { recordID = storageManager.generateID(); storageManager.storeDuplicateID(address, duplID, recordID); } addToCacheInMemory(duplID, recordID); } else { if (persist) { recordID = storageManager.generateID(); storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID); tx.setContainsPersistent(); } if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx); } if (instantAdd) { tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false)); } else { // For a tx, it's important that the entry is not added to the cache until commit // since if the client fails then resends them tx we don't want it to get rejected tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true)); } } } private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) { if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID)); } ByteArrayHolder holder = new ByteArrayHolder(duplID); cache.put(holder, pos); Pair<ByteArrayHolder, Long> id; if (pos < ids.size()) { // Need fast array style access here -hence ArrayList typing id = ids.get(pos); // The id here might be null if it was explicit deleted if (id.getA() != null) { if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB())); } cache.remove(id.getA()); // Record already exists - we delete the old one and add the new one // Note we can't use update since journal update doesn't let older records get // reclaimed if (id.getB() != null) { try { storageManager.deleteDuplicateID(id.getB()); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e); } } } id.setA(holder); // The recordID could be negative if the duplicateCache is configured to not persist, // -1 would mean null on this case id.setB(recordID >= 0 ? recordID : null); if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB())); } holder.pos = pos; } else { id = new Pair<>(holder, recordID >= 0 ? recordID : null); if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB())); } ids.add(id); holder.pos = pos; } if (pos++ == cacheSize - 1) { pos = 0; } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.javaapp
private final class AddDuplicateIDOperation extends TransactionOperationAbstract { final byte[] duplID; final long recordID; volatile boolean done; private final boolean afterCommit; AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) { this.duplID = duplID; this.recordID = recordID; this.afterCommit = afterCommit; } private void process() { if (!done) { addToCacheInMemory(duplID, recordID); done = true; } } @Override public void afterCommit(final Transaction tx) { if (afterCommit) { process(); } } @Override public void beforeCommit(Transaction tx) throws Exception { if (!afterCommit) { process(); } } @Override public List<MessageReference> getRelatedMessageReferences() { return null; } }