本文主要研究一下artemis的groupRebalancejava
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javagit
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private volatile boolean groupRebalance; private volatile int groupBuckets; private MessageGroups<Consumer> groups; //...... public void addConsumer(final Consumer consumer) throws Exception { if (logger.isDebugEnabled()) { logger.debug(this + " adding consumer " + consumer); } enterCritical(CRITICAL_CONSUMER); try { synchronized (this) { if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); } if (consumers.isEmpty()) { this.supportsDirectDeliver = consumer.supportsDirectDelivery(); } else { if (!consumer.supportsDirectDelivery()) { this.supportsDirectDeliver = false; } } cancelRedistributor(); ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer); if (consumers.add(newConsumerHolder)) { int currentConsumerCount = consumers.size(); if (delayBeforeDispatch >= 0) { dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); } if (currentConsumerCount >= consumersBeforeDispatch) { if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) { dispatchStartTimeUpdater.set(this, System.currentTimeMillis()); } } } if (groupRebalance) { groups.removeAll(); } if (refCountForConsumers != null) { refCountForConsumers.increment(); } } } finally { leaveCritical(CRITICAL_CONSUMER); } } public static MessageGroups<Consumer> groupMap(int groupBuckets) { if (groupBuckets == -1) { return new SimpleMessageGroups<>(); } else if (groupBuckets == 0) { return DisabledMessageGroups.instance(); } else { return new BucketMessageGroups<>(groupBuckets); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageGroups.javagithub
public interface MessageGroups<C> { void put(SimpleString key, C consumer); C get(SimpleString key); C remove(SimpleString key); boolean removeIf(Predicate<? super C> filter); void removeAll(); int size(); Map<SimpleString, C> toMap(); }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SimpleMessageGroups.javaredis
public class SimpleMessageGroups<C> extends MapMessageGroups<C> { public SimpleMessageGroups() { super(new HashMap<>()); } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MapMessageGroups.javaapache
abstract class MapMessageGroups<C> implements MessageGroups<C> { private final Map<SimpleString, C> groups; protected MapMessageGroups(Map<SimpleString, C> groups) { this.groups = groups; } @Override public void put(SimpleString key, C consumer) { groups.put(key, consumer); } @Override public C get(SimpleString key) { return groups.get(key); } @Override public C remove(SimpleString key) { return groups.remove(key); } @Override public boolean removeIf(Predicate<? super C> filter) { return groups.values().removeIf(filter); } @Override public void removeAll() { groups.clear(); } @Override public int size() { return groups.size(); } @Override public Map<SimpleString, C> toMap() { return new HashMap<>(groups); } }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.javaide
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private volatile boolean exclusive; private final QueueConsumers<ConsumerHolder<? extends Consumer>> consumers = new QueueConsumersImpl<>(); private ConsumerHolder<Redistributor> redistributor; //...... private boolean deliver(final MessageReference ref) { synchronized (this) { if (!supportsDirectDeliver) { return false; } if (isPaused() || !canDispatch() && redistributor == null) { return false; } if (checkExpired(ref)) { return true; } consumers.reset(); while (consumers.hasNext() || redistributor != null) { ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor; Consumer consumer = holder.consumer; final SimpleString groupID = extractGroupID(ref); Consumer groupConsumer = getGroupConsumer(groupID); if (groupConsumer != null) { consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { final MessageReference reference; if (redistributor == null) { reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); } else { reference = ref; } incrementMesssagesAdded(); deliveriesInTransit.countUp(); reference.setInDelivery(true); proceedDeliver(consumer, reference); consumers.reset(); return true; } if (redistributor != null || groupConsumer != null) { break; } } if (logger.isTraceEnabled()) { logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery"); } return false; } } private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { if (exclusive) { if (groupConsumer == null) { exclusiveConsumer = consumer; if (groupFirstKey != null) { return new GroupFirstMessageReference(groupFirstKey, ref); } } consumers.repeat(); } else if (groupID != null) { if (extractGroupSequence(ref) == -1) { groups.remove(groupID); consumers.repeat(); } else if (groupConsumer == null) { groups.put(groupID, consumer); if (groupFirstKey != null) { return new GroupFirstMessageReference(groupFirstKey, ref); } } else { consumers.repeat(); } } return ref; } private SimpleString extractGroupID(MessageReference ref) { if (internalQueue || exclusive || groupBuckets == 0) { return null; } else { try { return ref.getMessage().getGroupID(); } catch (Throwable e) { ActiveMQServerLogger.LOGGER.unableToExtractGroupID(e); return null; } } } private Consumer getGroupConsumer(SimpleString groupID) { Consumer groupConsumer = null; if (exclusive) { // If exclusive is set, then this overrides the consumer chosen round-robin groupConsumer = exclusiveConsumer; } else { // If a group id is set, then this overrides the consumer chosen round-robin if (groupID != null) { groupConsumer = groups.get(groupID); } } return groupConsumer; } private int extractGroupSequence(MessageReference ref) { if (internalQueue) { return 0; } else { try { // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever return ref.getMessage().getGroupSequence(); } catch (Throwable e) { ActiveMQServerLogger.LOGGER.unableToExtractGroupSequence(e); return 0; } } } //...... }
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.javathis
public class ArrayResettableIterator<T> implements ResettableIterator<T> { private final Object[] array; private int cursor = 0; private int endPos = -1; private boolean hasNext; public ArrayResettableIterator(Object[] array) { this.array = array; reset(); } public static <T> ResettableIterator<T> iterator(Collection<T> collection) { return new ArrayResettableIterator<>(collection.toArray()); } @Override public void reset() { endPos = cursor; hasNext = array.length > 0; } @Override public boolean hasNext() { return hasNext; } @Override public T next() { if (!hasNext) { throw new IllegalStateException(); } @SuppressWarnings("unchecked") T result = (T) array[cursor]; cursor++; if (cursor == array.length) { cursor = 0; } if (cursor == endPos) { hasNext = false; } return result; } }
array.length > 0
QueueImpl定義了groupRebalance屬性,默認爲false;addConsumer方法在groupRebalance爲true是會執行groups.removeAll();它還定義了groupBuckets屬性,默認爲-1,建立的是SimpleMessageGroupsspa