本文主要研究一下artemis的confirmationWindowEnabled
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
public class ClientProducerImpl implements ClientProducerInternal { //...... public void send(SimpleString address1, Message message, SendAcknowledgementHandler handler) throws ActiveMQException { checkClosed(); boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled(); if (confirmationWindowEnabled) { doSend(address1, message, handler); } else { doSend(address1, message, null); if (handler != null) { if (logger.isDebugEnabled()) { logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled"); } if (!confirmationNotSetLogged) { // will log thisonly once ActiveMQClientLogger.LOGGER.confirmationNotSet(); } // if there is no confirmation enabled, we will at least call the handler after the sent is done session.scheduleConfirmation(handler, message); } } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
public class ClientProducerImpl implements ClientProducerInternal { //...... private void doSend(SimpleString sendingAddress, final Message msgToSend, final SendAcknowledgementHandler handler) throws ActiveMQException { if (sendingAddress == null) { sendingAddress = this.address; } session.startCall(); try { // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core ICoreMessage msg = msgToSend.toCore(); ClientProducerCredits theCredits; boolean isLarge; // a note about the second check on the writerIndexSize, // If it's a server's message, it means this is being done through the bridge or some special consumer on the // server's on which case we can't' convert the message into large at the servers if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msg) != null || msg.isLargeMessage() || msg.getBodyBuffer().writerIndex() > minLargeMessageSize)) { isLarge = true; } else { isLarge = false; } if (!isLarge) { session.setAddress(msg, sendingAddress); } else { msg.setAddress(sendingAddress); } // Anonymous theCredits = session.getCredits(sendingAddress, true); if (rateLimiter != null) { // Rate flow control rateLimiter.limit(); } if (groupID != null) { msg.putStringProperty(Message.HDR_GROUP_ID, groupID); } final boolean sendBlockingConfig = msg.isDurable() ? 
blockOnDurableSend : blockOnNonDurableSend; // if Handler != null, we will send non blocking final boolean sendBlocking = sendBlockingConfig && handler == null; session.workDone(); if (isLarge) { largeMessageSend(sendBlocking, msg, theCredits, handler); } else { sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler); } } finally { session.endCall(); } } private void sendRegularMessage(final SimpleString sendingAddress, final ICoreMessage msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws ActiveMQException { // This will block if credits are not available // Note, that for a large message, the encode size only includes the properties + headers // Not the continuations, but this is ok since we are only interested in limiting the amount of // data in *memory* and continuations go straight to the disk logger.tracef("sendRegularMessage::%s, Blocking=%s", msgI, sendBlocking); int creditSize = sessionContext.getCreditsOnSendingFull(msgI); theCredits.acquireCredits(creditSize); sessionContext.sendFullMessage(msgI, sendBlocking, handler, address); } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
public class ActiveMQSessionContext extends SessionContext { //...... public void sendFullMessage(ICoreMessage msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { final SessionSendMessage packet; if (sessionChannel.getConnection().isVersionBeforeAddressChange()) { packet = new SessionSendMessage_1X(msgI, sendBlocking, handler); } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) { packet = new SessionSendMessage(msgI, sendBlocking, handler); } else { boolean responseRequired = confirmationWindow != -1 || sendBlocking; packet = new SessionSendMessage_V2(msgI, responseRequired, handler); } if (sendBlocking) { sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE); } else { sessionChannel.sendBatched(packet); } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
public class SessionSendMessage extends MessagePacket { protected boolean requiresResponse; private final transient SendAcknowledgementHandler handler; /** This will be using the CoreMessage because it is meant for the core-protocol */ protected SessionSendMessage(final byte id, final ICoreMessage message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { super(id, message); this.handler = handler; this.requiresResponse = requiresResponse; } protected SessionSendMessage(final byte id, final CoreMessage message) { super(id, message); this.handler = null; } /** This will be using the CoreMessage because it is meant for the core-protocol */ public SessionSendMessage(final ICoreMessage message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { super(SESS_SEND, message); this.handler = handler; this.requiresResponse = requiresResponse; } public SessionSendMessage(final CoreMessage message) { super(SESS_SEND, message); this.handler = null; } // Public -------------------------------------------------------- @Override public boolean isRequiresResponse() { return requiresResponse; } public SendAcknowledgementHandler getHandler() { return handler; } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener { //...... public void scheduleConfirmation(final SendAcknowledgementHandler handler, final Message message) { executor.execute(new Runnable() { @Override public void run() { handler.sendAcknowledged(message); } }); } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java
/**
 * Callback used to report the outcome of a message that was sent asynchronously.
 */
public interface SendAcknowledgementHandler {

   /**
    * Notifies the client that a message sent asynchronously has been received by the server.
    *
    * @param message message sent asynchronously
    */
   void sendAcknowledged(Message message);

   /**
    * Notifies the client that sending {@code message} failed with {@code e}.
    * <p>
    * The default implementation does nothing.
    *
    * @param message message whose send failed
    * @param e       the failure
    */
   default void sendFailed(Message message, Exception e) {
      /**
       * By default ignore failures to preserve compatibility with existing implementations.
       * If the message makes it to the broker and a failure occurs, sendAcknowledged() will
       * still be invoked just like it always was.
       */
   }
}
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
public class ServerSessionPacketHandler implements ChannelHandler { //...... private void sendResponse(final Packet confirmPacket, final Packet response, final boolean flush, final boolean closeChannel) { if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::scheduling response::" + response); } storageManager.afterCompleteOperations(new IOCallback() { @Override public void onError(final int errorCode, final String errorMessage) { ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage); Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage)); doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel); if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket); } } @Override public void done() { if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::regular response sent::" + response); } doConfirmAndResponse(confirmPacket, response, flush, closeChannel); } }); } private void doConfirmAndResponse(final Packet confirmPacket, final Packet response, final boolean flush, final boolean closeChannel) { // don't confirm if the response is an exception if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) { channel.confirm(confirmPacket); if (flush) { channel.flushConfirmations(); } } if (response != null) { channel.send(response); } if (closeChannel) { channel.close(); } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
public final class ChannelImpl implements Channel { //...... public void confirm(final Packet packet) { if (resendCache != null && packet.isRequiresConfirmations()) { lastConfirmedCommandID.incrementAndGet(); if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID); } receivedBytes += packet.getPacketSize(); if (receivedBytes >= confWindowSize) { receivedBytes = 0; final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get()); confirmed.setChannelID(id); doWrite(confirmed); } } } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/PacketsConfirmedMessage.java
public class PacketsConfirmedMessage extends PacketImpl { private int commandID; public PacketsConfirmedMessage(final int commandID) { super(PACKETS_CONFIRMED); this.commandID = commandID; } public PacketsConfirmedMessage() { super(PACKETS_CONFIRMED); } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
public final class ChannelImpl implements Channel { //...... public void handlePacket(final Packet packet) { if (packet.getType() == PacketImpl.PACKETS_CONFIRMED) { if (resendCache != null) { final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet; clearUpTo(msg.getCommandID()); } if (!connection.isClient() && handler != null) { handler.handlePacket(packet); } return; } else { if (packet.isResponse()) { confirm(packet); handleAsyncResponse(packet); lock.lock(); try { response = packet; sendCondition.signal(); } finally { lock.unlock(); } } else if (handler != null) { handler.handlePacket(packet); } } } private void clearUpTo(final int lastReceivedCommandID) { final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID; if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + " first commandID=" + firstStoredCommandID + " number to clear " + numberToClear); } for (int i = 0; i < numberToClear; i++) { final Packet packet = resendCache.poll(); if (packet == null) { ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID); firstStoredCommandID = lastReceivedCommandID + 1; return; } if (logger.isTraceEnabled()) { logger.trace("RemotingConnectionID=" + connection.getID() + " ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler); } if (commandConfirmationHandler != null) { commandConfirmationHandler.commandConfirmed(packet); } if (responseAsyncCache != null) { responseAsyncCache.handleResponse(packet); } } firstStoredCommandID += numberToClear; } //...... }
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
public class ActiveMQSessionContext extends SessionContext { //...... private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() { @Override public void commandConfirmed(Packet packet) { responseHandler.handleResponse(packet, null); } }; private final ResponseHandler responseHandler = new ResponseHandler() { @Override public void handleResponse(Packet packet, Packet response) { final ActiveMQException activeMQException; if (response != null && response.getType() == PacketImpl.EXCEPTION) { ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response; activeMQException = exceptionResponseMessage.getException(); } else { activeMQException = null; } if (packet.getType() == PacketImpl.SESS_SEND) { SessionSendMessage ssm = (SessionSendMessage) packet; callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException); } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) { SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet; if (!scm.isContinues()) { callSendAck(scm.getHandler(), scm.getMessage(), activeMQException); } } } private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) { if (handler != null) { if (exception == null) { handler.sendAcknowledged(message); } else { handler.sendFailed(message, exception); } } else if (sendAckHandler != null) { if (exception == null) { sendAckHandler.sendAcknowledged(message); } else { sendAckHandler.sendFailed(message, exception); } } } }; //...... }
ChannelImpl的handlePacket方法在packet的type爲PacketImpl.PACKETS_CONFIRMED,且resendCache不爲null時會執行clearUpTo方法;最後都會執行responseHandler.handleResponse(packet, response)方法,handleResponse方法會執行callSendAck,而callSendAck執行的是SendAcknowledgementHandler(ssm.getHandler())的sendAcknowledged或者sendFailed方法