本文主要研究一下artemis的individualAcknowledge
activemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
public class ActiveMQMessage implements javax.jms.Message { //...... public void acknowledge() throws JMSException { if (session != null) { try { if (session.isClosed()) { throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed(); } if (individualAck) { message.individualAcknowledge(); } if (clientAck || individualAck) { session.commit(session.isBlockOnAcknowledge()); } } catch (ActiveMQException e) { throw JMSExceptionHelper.convertFromActiveMQException(e); } } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
public class ClientMessageImpl extends CoreMessage implements ClientMessageInternal { //...... public ClientMessageImpl individualAcknowledge() throws ActiveMQException { if (consumer != null) { consumer.individualAcknowledge(this); } return this; } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
public final class ClientConsumerImpl implements ClientConsumerInternal { //...... public void individualAcknowledge(ClientMessage message) throws ActiveMQException { if (lastAckedMessage != null) { flushAcks(); } session.individualAcknowledge(this, message); } public void flushAcks() throws ActiveMQException { if (lastAckedMessage != null) { if (logger.isTraceEnabled()) { logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage); } doAck(lastAckedMessage); } } private void doAck(final ClientMessageInternal message) throws ActiveMQException { ackBytes = 0; lastAckedMessage = null; if (logger.isTraceEnabled()) { logger.trace(this + "::Acking message " + message); } session.acknowledge(this, message); } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener { //...... public void individualAcknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { // if we're pre-acknowledging then we don't need to do anything if (preAcknowledge) { return; } checkClosed(); startCall(); try { sessionContext.sendACK(true, blockOnAcknowledge, consumer, message); } finally { endCall(); } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
public class ActiveMQSessionContext extends SessionContext { //...... public void sendACK(boolean individual, boolean block, final ClientConsumer consumer, final Message message) throws ActiveMQException { PacketImpl messagePacket; if (individual) { messagePacket = new SessionIndividualAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); } else { messagePacket = new SessionAcknowledgeMessage(getConsumerID(consumer), message.getMessageID(), block); } if (block) { sessionChannel.sendBlocking(messagePacket, PacketImpl.NULL_RESPONSE); } else { sessionChannel.sendBatched(messagePacket); } } //...... }
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
public class ServerConsumerImpl implements ServerConsumer, ReadyListener { //...... public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception { if (browseOnly) { return; } // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly // acknowledged // We use a transaction here as if the message is not found, we should rollback anything done // This could eventually happen on retries during transactions, and we need to make sure we don't ACK things we are not supposed to acknowledge boolean startedTransaction = false; if (tx == null) { startedTransaction = true; tx = new TransactionImpl(storageManager); } try { MessageReference ref; do { synchronized (lock) { ref = deliveringRefs.poll(); } if (logger.isTraceEnabled()) { logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this); } if (ref == null) { ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); tx.markAsRollbackOnly(ils); throw ils; } ref.acknowledge(tx, this); acks++; } while (ref.getMessageID() != messageID); if (startedTransaction) { tx.commit(); } } catch (ActiveMQException e) { if (startedTransaction) { tx.rollback(); } else { tx.markAsRollbackOnly(e); } throw e; } catch (Throwable e) { ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e); ActiveMQException activeMQIllegalStateException = new ActiveMQIllegalStateException(e.getMessage()); if (startedTransaction) { tx.rollback(); } else { tx.markAsRollbackOnly(activeMQIllegalStateException); } throw activeMQIllegalStateException; } } public synchronized void individualAcknowledge(Transaction tx, final long messageID) throws Exception { if (browseOnly) { return; } boolean startedTransaction = false; if (logger.isTraceEnabled()) { logger.trace("individualACK messageID=" + messageID); } if (tx == null) { if (logger.isTraceEnabled()) { logger.trace("individualACK starting new TX"); } 
startedTransaction = true; tx = new TransactionImpl(storageManager); } try { MessageReference ref; ref = removeReferenceByID(messageID); if (logger.isTraceEnabled()) { logger.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this); } if (ref == null) { ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID); tx.markAsRollbackOnly(ils); throw ils; } ref.acknowledge(tx, this); acks++; if (startedTransaction) { tx.commit(); } } catch (ActiveMQException e) { if (startedTransaction) { tx.rollback(); } else if (tx != null) { tx.markAsRollbackOnly(e); } throw e; } catch (Throwable e) { ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e); ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage()); if (startedTransaction) { tx.rollback(); } else if (tx != null) { tx.markAsRollbackOnly(hqex); } throw hqex; } } //...... }
ActiveMQMessage的acknowledge方法對於individualAck爲true的會單獨執行message.individualAcknowledge();ClientMessageImpl的individualAcknowledge方法會執行consumer.individualAcknowledge(this);ClientConsumerImpl的individualAcknowledge,對於lastAckedMessage不爲null的先執行flushAcks,最後執行session.individualAcknowledge;ClientSessionImpl的individualAcknowledge方法通過sessionContext.sendACK來發送ack;ActiveMQSessionContext的sendACK方法對於individual爲true的創建的是SessionIndividualAcknowledgeMessage,最後通過sessionChannel.sendBlocking或者sessionChannel.sendBatched方法發送消息。