本文主要研究一下 Artemis ClientConsumer 的 handleRegularMessage
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
/**
 * Excerpt of the client-side consumer: the part that accepts a regular (non-large) message,
 * places it in a priority buffer and then either schedules asynchronous delivery to the
 * registered {@code MessageHandler} or wakes a thread waiting on this consumer.
 */
public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   // Received messages awaiting consumption, bucketed by priority.
   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);

   // Single reusable task handed to the session executor for every handler delivery.
   private final Runner runner = new Runner();

   // Asynchronous listener; volatile because it is read here without holding a lock.
   private volatile MessageHandler handler;

   //......

   /**
    * Buffers an incoming message and triggers its delivery.
    * <p>
    * Steps, in order: default the message address to the queue's address when it is missing,
    * notify the message that it was received by this consumer, switch to individual
    * acknowledgement if needed, append the message to {@code buffer} under its priority,
    * then either queue the {@code Runner} (a handler is set and the consumer is not stopped)
    * or call {@code notify()} (no handler is set).
    *
    * @param message the message to buffer
    */
   private void handleRegularMessage(ClientMessageInternal message) {
      if (message.getAddress() == null) {
         message.setAddress(queueInfo.getAddress());
      }

      message.onReceipt(this);

      // NOTE(review): 4 is presumably the default message priority - confirm against the Message API.
      if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         // We have messages of different priorities so we need to ack them individually since the order
         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
         // consumed in, which means that acking all up to won't work
         ackIndividually = true;
      }

      // Add it to the buffer
      buffer.addTail(message, message.getPriority());
      if (handler != null) {
         // Execute using executor
         if (!stopped) {
            queueExecutor();
         }
      } else {
         // No handler registered: wake one thread waiting on this consumer's monitor.
         // NOTE(review): notify() requires the caller to own this monitor; this method is not
         // synchronized in the excerpt, so presumably its caller is - confirm.
         notify();
      }
   }

   /**
    * Submits the shared {@code runner} to {@code sessionExecutor}, logging at trace level first
    * when trace logging is enabled.
    */
   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }

      sessionExecutor.execute(runner);
   }

   //......
}
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
/**
 * Excerpt of the client-side consumer: the executor task that delivers buffered messages to
 * the registered {@code MessageHandler}, and the credit-based flow control that runs before
 * each message is consumed.
 */
public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   /**
    * Task that delivers one buffered message by calling {@link #callOnMessage()}.
    * Any exception is logged and stored in {@code lastException} instead of being rethrown.
    */
   private class Runner implements Runnable {

      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);

            lastException = e;
         }
      }
   }

   /**
    * Polls at most one message from {@code buffer} and hands it to the current handler.
    * <p>
    * Does nothing when the consumer is closing or stopped, when no handler is set, or when the
    * buffer is empty. A message carrying the forced-delivery property is dropped without being
    * delivered. An expired message is passed to {@code session.expire} rather than to the
    * handler. Flow-control credits are processed for the message in both the expired and the
    * non-expired case.
    *
    * @throws Exception propagated from the rate limiter, flow control, the handler or the session
    */
   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }

      session.workDone();

      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that

      ClientMessageInternal message;

      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;

      if (theHandler != null) {
         if (rateLimiter != null) {
            rateLimiter.limit();
         }

         failedOver = false;

         // The buffer is polled while holding this consumer's monitor.
         synchronized (this) {
            message = buffer.poll();
         }

         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }

            // Expiry is sampled before flow control runs; the result decides deliver vs. expire below.
            boolean expired = message.isExpired();

            flowControlBeforeConsumption(message);

            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               // Install the consumer's contextClassLoader on the delivering thread and remember
               // the previous loader so it can be restored after the handler returns.
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

                     Thread.currentThread().setContextClassLoader(contextClassLoader);

                     return originalLoader;
                  }
               });

               // Record which thread is inside onMessage; cleared in the finally block.
               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     // A failure to restore the class loader is logged, not propagated.
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }

                  onMessageThread = null;
               }

               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }

               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               session.expire(this, message);
            }

            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }

   /**
    * Runs flow control for a message that is about to be consumed, unless its flow-control
    * size is zero.
    *
    * @param message the message about to be consumed
    * @throws ActiveMQException propagated from {@link #flowControl(int, boolean)}
    */
   private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
      // Chunk messages will execute the flow control while receiving the chunks
      if (message.getFlowControlSize() != 0) {
         // on large messages we should discount 1 on the first packets as we need continuity until the last packet
         flowControl(message.getFlowControlSize(), !message.isLargeMessage());
      }
   }

   /**
    * Accumulates consumed bytes as credits and sends them once the accumulated amount reaches
    * {@code clientWindowSize}. Does nothing when {@code clientWindowSize} is negative.
    * <p>
    * When {@code clientWindowSize} is 0 and {@code discountSlowConsumer} is true, one credit
    * fewer than the accumulated amount is sent. In every sending branch {@code creditsToSend}
    * is reset to 0, and nothing is sent when the amount to send is not positive.
    *
    * @param messageBytes         number of bytes to add to the pending credits
    * @param discountSlowConsumer whether to hold back one credit when the window size is 0
    * @throws ActiveMQException declared by this method; NOTE(review): presumably raised by
    *                           {@code sendCredits} - confirm
    */
   public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
      if (clientWindowSize >= 0) {
         creditsToSend += messageBytes;

         if (creditsToSend >= clientWindowSize) {
            if (clientWindowSize == 0 && discountSlowConsumer) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
               }

               // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
               // always buffering one after received the first message
               final int credits = creditsToSend - 1;

               creditsToSend = 0;

               if (credits > 0) {
                  sendCredits(credits);
               }
            } else {
               // NOTE(review): this logs messageBytes, but the amount actually sent below is the
               // accumulated creditsToSend, which can be larger.
               if (logger.isDebugEnabled()) {
                  logger.debug("Sending " + messageBytes + " from flow-control");
               }

               final int credits = creditsToSend;

               creditsToSend = 0;

               if (credits > 0) {
                  sendCredits(credits);
               }
            }
         }
      }
   }

   //......
}
ClientConsumerImpl 的 handleRegularMessage 方法先执行 buffer.addTail(message, message.getPriority()),之后对于 handler 不为 null 的会执行 queueExecutor(),否则执行 notify();queueExecutor 方法是通过 sessionExecutor 执行 runner;Runner 实现了 Runnable 接口,其 run 方法执行 callOnMessage();该方法对于 rateLimiter 不为 null 的会执行 rateLimiter.limit(),之后执行 buffer.poll() 获取 ClientMessageInternal 进行处理