消息消費流程圖
消費端消費消息的原理
ActiveMQMessageConsumer.receive
public Message receive() throws JMSException { checkClosed(); checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中 sendPullCommand(0); //若是PrefetchSizeSize爲0而且unconsumerMessage爲空,則發起pull命令 MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); //發送ack給到broker return createActiveMQMessage(md);//獲取消息並返回 }
sendPullCommand
protected void sendPullCommand(long timeout) throws JMSException { clearDeliveredList(); if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); //向服務端異步發送messagePull指令 } }
clearDeliveredList
private void clearDeliveredList() { if (clearDeliveredList) { synchronized (deliveredMessages) { if (clearDeliveredList) { if (!deliveredMessages.isEmpty()) { if (session.isTransacted()) { if (previouslyDeliveredMessages == null) { previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,Boolean>(session.getTransactionContext().getTransactionId()); } for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); } LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size()); } else { if (session.isClientAcknowledge()) { LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size()); // allow redelivery if (!this.info.isBrowser()) { for (MessageDispatch md : deliveredMessages) { this.session.connection.rollbackDuplicate(this, md.getMessage()); } } } LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size()); deliveredMessages.clear(); pendingAck = null; } } clearDeliveredList = false; } } } }
dequeue
beforeMessageIsConsumed
/**
 * Bookkeeping performed on a dispatch just before it is handed to the application.
 *
 * <p>Stamps the dispatch with the session's next delivery id and records the
 * broker sequence id of its message. Unless the consumer is in
 * auto-acknowledge-batch mode, the dispatch is pushed onto the front of
 * {@code deliveredMessages}; for transacted sessions it is then either
 * acknowledged individually right away or queued for a later delivered-ack.
 *
 * @param md the dispatch about to be consumed
 * @throws JMSException propagated from the acknowledgement helpers
 */
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
    md.setDeliverySequenceId(session.getNextDeliveryId());
    lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();

    if (isAutoAcknowledgeBatch()) {
        return;
    }

    synchronized (deliveredMessages) {
        deliveredMessages.addFirst(md);
    }

    if (!session.getTransacted()) {
        return;
    }

    if (transactedIndividualAck) {
        immediateIndividualTransactedAck(md);
        return;
    }
    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
afterMessageIsConsumed
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; } if (messageExpired) { acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); if (session.getTransacted()) { // Do nothing. } else if (isAutoAcknowledgeEach()) { if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++; // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack); optimizeAckTimestamp = System.currentTimeMillis(); } // AMQ-3956 - as further optimization send // ack for expired msgs when there are any. 
// This resets the deliveredCounter to 0 so that // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater() if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } } } else { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); session.sendAck(ack); } } } } deliveryingAcknowledgements.set(false); } } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) { boolean messageUnackedByConsumer = false; synchronized (deliveredMessages) { messageUnackedByConsumer = deliveredMessages.contains(md); } if (messageUnackedByConsumer) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } else { throw new IllegalStateException("Invalid session state."); } } }