分佈式消息通訊ActiveMQ原理-消費消息策略-筆記

消息消費流程圖java

消費端消費消息的原理緩存

  • 咱們經過上一節課的講解,知道有兩種方法能夠接收消息,
    • 一種是使用同步阻塞的MessageConsumer#receive方法。
    • 另外一種是使用消息監聽器MessageListener。
  • 這裏須要注意的是,在同一個session下,這二者不能同時工做,
    • 也就是說不能針對不一樣消息採用不一樣的接收方式。
    • 不然會拋出異常。
  • 至於爲何這麼作,最大的緣由仍是在事務性會話中,兩種消費模式的事務很差管控

ActiveMQMessageConsumer.receivesession

  • 消費端同步接收消息的源碼入口
public Message receive() throws JMSException {
        checkClosed();
        checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中
        sendPullCommand(0); //若是PrefetchSizeSize爲0而且unconsumerMessage爲空,則發起pull命令
        MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息
        if (md == null) {
            return null;
        }
        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false); //發送ack給到broker
        return createActiveMQMessage(md);//獲取消息並返回
    }

sendPullCommand異步

  • 發送pull命令從broker上獲取消息,前提是prefetchSize=0而且unconsumedMessages爲空。
  • unconsumedMessage表示未消費的消息,這裏面預讀取的消息大小爲prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {
        clearDeliveredList();
        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(info);
            messagePull.setTimeout(timeout);
            session.asyncSendPacket(messagePull); //向服務端異步發送messagePull指令
        }
    }

clearDeliveredListasync

  • 在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,
    • 主要用來清理已經分發的消息鏈表deliveredMessages
      • deliveredMessages,存儲分發給消費者但還爲應答的消息鏈表
      • Ø 若是session是事務的,則會遍歷deliveredMessage中的消息放入到previouslyDeliveredMessage中來作重發
      • Ø 若是session是非事務的,根據ACK的模式來選擇不一樣的應答操做
private void clearDeliveredList() {
        if (clearDeliveredList) {
            synchronized (deliveredMessages) {
                if (clearDeliveredList) {
                    if (!deliveredMessages.isEmpty()) {
                        if (session.isTransacted()) {
                            if (previouslyDeliveredMessages == null) {
                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,Boolean>(session.getTransactionContext().getTransactionId());
                            }
                            for (MessageDispatch delivered : deliveredMessages) {
                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
                            }
                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
                                    getConsumerId(), previouslyDeliveredMessages.transactionId,
                                    deliveredMessages.size());
                        } else {
                            if (session.isClientAcknowledge()) {
                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                                // allow redelivery
                                if (!this.info.isBrowser()) {
                                    for (MessageDispatch md : deliveredMessages) {
                                        this.session.connection.rollbackDuplicate(this,
                                                md.getMessage());
                                    }
                                }
                            }
                            LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());
                            deliveredMessages.clear();
                            pendingAck = null;
                        }
                    }
                    clearDeliveredList = false;
                }
            }
        }
    }

dequeue性能

  • 從unconsumedMessage中取出一個消息,
  • 在建立一個消費者時,就會未這個消費者建立一個爲消費的消息通道,這個通道分爲兩種,
    • 一種是簡單優先級隊列分發通道SimplePriorityMessageDispatchChannel ;
    • 另外一種是先進先出的分發通道FifoMessageDispatchChannel.
  • 至於爲何要存在這樣一個消息分發通道,你們能夠想象一下,
    • 若是消費者每次去消費完一個消息之後再去broker拿一個消息,效率是比較低的。
    • 因此經過這樣的設計能夠容許session可以一次性將多條消息分發給一個消費者。
    • 默認狀況下對於queue來講,prefetchSize的值是1000

beforeMessageIsConsumedfetch

  • 這裏面主要是作消息消費以前的一些準備工做,
  • 若是ACK類型不是DUPS_OK_ACKNOWLEDGE或者隊列模式(簡單來講就是除了Topic和DupAck這兩種狀況),
    • 全部的消息先放到deliveredMessages鏈表的開頭。
  • 而且若是當前是事務類型的會話,
    • 則判斷transactedIndividualAck,若是爲true,表示單條消息直接返回ack。
    • 不然,調用ackLater,批量應答,
      • client端在消費消息後暫且不發送ACK,而是把它緩存下來(pendingACK),
        • 等到這些消息的條數達到必定閥值時,只須要經過一個ACK指令把它們所有確認;
        • 這比對每條消息都逐個確認,在性能上要提升不少
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(session.getNextDeliveryId());
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!isAutoAcknowledgeBatch()) {
            synchronized(deliveredMessages) {
                deliveredMessages.addFirst(md);
            }
            if (session.getTransacted()) {
                if (transactedIndividualAck) {
                    immediateIndividualTransactedAck(md);
                } else {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        }
    }

afterMessageIsConsumed優化

  • 這個方法的主要做用是執行應答操做,這裏面作如下幾個操做
    • Ø 若是消息過時,則返回消息過時的ack
    • Ø 若是是事務類型的會話,則不作任何處理
    • Ø 若是是AUTOACK或者(DUPS_OK_ACK且是隊列),而且是優化ack操做,則走批量確認ack
    • Ø 若是是DUPS_OK_ACK,則走ackLater邏輯
    • Ø 若是是CLIENT_ACK,則執行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws
            JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;
                                // AMQ-3956 evaluate both expired and normal msgs as
                                // otherwise consumer may get stalled
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)
                                        || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +
                                        optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack =
                                            makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                    // AMQ-3956 - as further optimization send
                                    // ack for expired msgs when there are any.
                                    // This resets the deliveredCounter to 0 so that
                                    // we won't sent standard acks with every msg just
                                    // because the deliveredCounter just below
                                    // 0.5 * prefetch as used in ackLater()
                                    if (pendingAck != null && deliveredCounter > 0) {
                                        session.sendAck(pendingAck);
                                        pendingAck = null;
                                        deliveredCounter = 0;
                                    }
                                }
                            } else {
                                MessageAck ack =
                                        makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            } else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }
相關文章
相關標籤/搜索