【RocketMQ】消息的消費

RocketMQ支持兩種形式的消息消費者:java

  • PushConsumer:使用者向Consumer對象註冊一個Listener,用戶實現MessageListenerConcurrently或者MessageListenerOrderly,Consumer一旦收到消息,當即回調Listener接口方法。底層採用的是Pull長輪詢+Broker掛起方式拉取消息。該模式強調實時性。
  • PullConsumer:使用者主動調用Consumer的拉消息方法從Broker拉消息。

PushConsumer

實現類是DefaultMQPushConsumerImpl,看看它啓動時作了哪些事。算法

一、檢查配置&拷貝訂閱關係給負載均衡處理器多線程

二、利用MQClientManager建立本身的MQClientInstance實例對象,該MQClientInstance的做用在生產者章節描述過負載均衡

三、爲負載均衡處理器設置相關參數異步

四、初始化OffsetStore,目的是定時持久化消費進度到本地文件或者Broker,若是是廣播模式——>本地,若是是集羣——>Brokeride

五、設置consumeMessageService,分爲2種,若是使用在註冊的監聽器是MessageListenerOrderly,則consumeMessageService初始化爲ConsumeMessageOrderlyService,若是監聽器是MessageListenerConcurrently,則對應ConsumeMessageConcurrentlyService函數

六、利用MQClientInstance註冊消費者(並無向Broker註冊),MQClientInstance啓動相關服務。fetch

消費者的負載均衡

在集羣消費模式下,一條消息只會被同一個group裏一個消費端消費。不一樣group之間相互不影響。廣播消費模式下,一條消息會被同一個group裏每個消費端消費。可是廣播消息的代價較高,若是消費者集羣規模較大或訂閱的消費量較大,會影響集羣穩定性。在集羣模式下,當topic中的MessageQueue變動時,動態調整消費MessageQueue的數量,採用的策略以下:this

| AllocateMessageQueueConsistentHash | 一致性哈希算法分配 | | AllocateMessageQueueAveragely | 平均分配(默認) | | AllocateMessageQueueByMachineRoom | 機房分配 | | AllocateMessageQueueAveragelyByCircle | 環形分配 | | AllocateMessageQueueByConfig | 手動配置 |spa

經過增長consumer實例去分攤queue的消費,能夠起到水平擴展的消費能力的做用。而有實例下線的時候,會從新觸發負載均衡,這時候原來分配到的queue將分配到其餘實例上繼續消費。

可是若是consumer實例的數量比message queue的總數量還多的話,多出來的consumer實例將沒法分到queue,也就沒法消費到消息,也就沒法起到分攤負載的做用了。因此須要控制讓queue的總數量大於等於consumer的數量。

在啓動消費者時,會經過RebalanceService每隔20s執行一次負載均衡,能夠經過參數rocketmq.client.rebalance.waitInterval指定間隔時間。

【RebalanceService】
 @Override                                                                                                          
 public void run() {                                                                                                
     log.info(this.getServiceName() + " service started");                                                          

     while (!this.isStopped()) {                                                                                    
         this.waitForRunning(waitInterval);                                                                         
         this.mqClientFactory.doRebalance();                                                                        
     }                                                                                                              

     log.info(this.getServiceName() + " service end");                                                              
 }                                                                                                                  
 //直接利用CountDownLatch實現等待超時,不使用sleep的緣由是可能其餘操做會喚醒等待,要求當即rebalance
 protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait,重寫了CountDownLatch的Sync,reset實現:setState(startCount),至關於從新初始化
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

消費者處理消息的方式

並行處理:須要用戶註冊監聽器時實現 new MessageListenerConcurrently();實現以後對應的ConsumeMessageConcurrentlyService會啓動,將每次從Broker拉取的消息切割成多個consumeBatchSize(默認是1)的消息集合,每一個集合封裝成一個ConsumeRequest,提交到線程池處理。若是線程池拒絕了此次提交(當前線程沒法處理或線程池緩衝隊列滿了),則把這批消息放入到定時調度中,延遲5s再執行。使用並行處理的好處是多線程消費快,很差在於若是強調消息消費的順序性,例如必須(付款,發貨)這2條消息,並行處理可能致使(發貨——付款)的處理順序,不對!因此考慮順序消費的場景應該使用順序處理。

@Override
    public void submitConsumeRequest(……) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);     
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                try {
                    consumeExecutor.submit(new ConsumeRequest(msgThis, processQueue, messageQueue));
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                  this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }
    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
    ) {

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000, TimeUnit.MILLISECONDS);
    }

順序處理:須要用戶註冊監聽器時實現 new MessageListenerOrderly();實現以後對應的ConsumeMessageOrderlyService會啓動,該服務的特色是某一個隊列,在同一時間只能有一個線程訪問,只有等拿到鎖的線程消費成功後釋放鎖,其餘線程才能繼續消費該隊列。劣勢是下降了消息處理的吞吐量,當前一條消息消費出現問題時,會阻塞後續的流程。

ConsumeMessageOrderlyService在啓動的時候,若是是集羣模式下會啓動一個單線程的定時調度任務,延遲一秒,時間間隔爲20秒,執行rebalanceImpl的lockAll()方法。向主Master節點的Broker發送LOCK_BATCH_MQ請求,內容是鎖住該ConsumerId下的全部MessageQueue,當Broker返回鎖住成功的MQs後,Client端會把MQ對應的ProcessQueue也鎖住。這裏說明下兩個Queue的區別:

//消息隊列的基本屬性
public class MessageQueue {
    //消息主題
    private String topic;
    //所在Broker
    private String brokerName;
    //惟一標識
    private int queueId;
}

//消息隊列中的消息數據
public class ProcessQueue {
   //存放消息,key=MsgId,value=Message
   private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

   private final Lock lockConsume = new ReentrantLock();
   // ……還有其餘屬性
}

當Consumer拉取到消息後,調用ConsumeMessageOrderlyService#submitConsumeRequest處理這批消息。

@Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            //ConsumeRequest繼承了Runnable,封裝了一個MessageQueue && ProcessQueue
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            //提交到線程池
            this.consumeExecutor.submit(consumeRequest);
        }
}

在執行ConsumerRequest時,它會獲取該MessageQueue的鎖,使用synchronized (objLock) 保證多線程串行處理該隊列。關鍵處理以下:

@Override
 public void run() {
     final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
     synchronized (objLock) {
         //廣播模式 || (PQ 成功鎖住&& 鎖沒過時)
         if (MessageModel.BROADCASTING.equals(defaultMQPushConsumerImpl.messageModel())
             || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                //依次消費,直到continueConsume=false,若是使用者處理消息沒有錯誤,通常會返回SUCCESS,這時continueConsume始終是true,若是沒有返回SUCCESS,RocketMQ會根據返回狀態把continueConsume=false
                for (boolean continueConsume = true; continueConsume; ) {  
                 if (MessageModel.CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                       tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                  }

                 if(CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {
                       tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                  }
                 // 在線程數小於隊列數狀況下,防止個別隊列被餓死
                 long interval = System.currentTimeMillis() - beginTime;
                 if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                      submitConsumeRequestLater(processQueue, messageQueue, 10);
                      break;
                 }
                 try {
                   //處理隊列加鎖
                   this.processQueue.getLockConsume().lock();
                   status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                  } catch (Throwable e) {
                                
                  } finally {
                       this.processQueue.getLockConsume().unlock();
                  }
                  continueConsume = processConsumeResult(msgs, status, context, this);
                  } else {
                            continueConsume = false;
                        }
                    }
                } else { 
                  // 沒有拿到當前隊列的鎖,稍後再消費
                    tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

順序消費總的來講,首先,須要保證順序的消息要發送到同一個messagequeue中;其次,一個messagequeue只能被一個消費者消費,這點是由消息隊列的分配機制來保證的;最後,一個消費者內部對一個mq的消費要保證是有序的。

Broker如何實時推送

Broker的實時推送依靠2個服務:PullRequestHoldService和ReputMessageService。二者都繼承了ServiceThread,能夠丟給專門的線程去執行。前者是掛起每次請求,否則客戶端不停輪詢扛不住。後者用來向ConsumeQueue寫消息,並觸發前者處理最新的消息。

PullRequestHoldService的做用是掛起來自客戶端的拉取消息請求,直到有新消息知足拉取條件纔會釋放請求。

@Override
    public void run() {
       
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    //間隔5s檢查是否有新消息
                    this.waitForRunning(5 * 1000);
                } else {             this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
                //實際是調用notifyMessageArriving
                this.checkHoldRequest();
               
            } catch (Throwable e) {
             
            }
        }
    }

PullConsumer

從官方給出的示例,能夠看出該模式須要使用者本身維護隊列的offset,比PushConsumer由服務端維護略顯麻煩。

public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();


    public static void main(String[] args) throws MQClientException {
        MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }

}

從示例代碼中看出,使用時須要先獲取該消費者擁有的全部隊列,告訴客戶端須要從該隊列的哪一個offset位置取消息,獲得消息後本身更新offset。

拉取消息時分爲同步和異步,異步無非是客戶端自身(不是消費者)添加個回調函數new PullCallback() ,在MQClientAPIImpl接受消息以後調用pullCallback.onSuccess(pullResult);在Pull模式下,沒看到負載均衡,應該是給消費者自行控制,不像Push模式時由客戶端統一處理和控制,根據消費者的消費能力作流控。

相關文章
相關標籤/搜索