RocketMQ支持兩種形式的消息消費者:java
實現類是DefaultMQPushConsumerImpl,看看它啓動時作了哪些事。算法
一、檢查配置&拷貝訂閱關係給負載均衡處理器多線程
二、利用MQClientManager建立本身的MQClientInstance實例對象,該MQClientInstance的做用在生產者章節描述過負載均衡
三、爲負載均衡處理器設置相關參數異步
四、初始化OffsetStore,目的是定時持久化消費進度到本地文件或者Broker,若是是廣播模式——>本地,若是是集羣——>Brokeride
五、設置consumeMessageService,分爲2種,若是使用在註冊的監聽器是MessageListenerOrderly,則consumeMessageService初始化爲ConsumeMessageOrderlyService,若是監聽器是MessageListenerConcurrently,則對應ConsumeMessageConcurrentlyService函數
六、利用MQClientInstance註冊消費者(並無向Broker註冊),MQClientInstance啓動相關服務。fetch
在集羣消費模式下,一條消息只會被同一個group裏一個消費端消費。不一樣group之間相互不影響。廣播消費模式下,一條消息會被同一個group裏每個消費端消費。可是廣播消息的代價較高,若是消費者集羣規模較大或訂閱的消費量較大,會影響集羣穩定性。在集羣模式下,當topic中的MessageQueue變動時,動態調整消費MessageQueue的數量,採用的策略以下:this
| AllocateMessageQueueConsistentHash | 一致性哈希算法分配 | | AllocateMessageQueueAveragely | 平均分配(默認) | | AllocateMessageQueueByMachineRoom | 機房分配 | | AllocateMessageQueueAveragelyByCircle | 環形分配 | | AllocateMessageQueueByConfig | 手動配置 |spa
經過增長consumer實例去分攤queue的消費,能夠起到水平擴展的消費能力的做用。而有實例下線的時候,會從新觸發負載均衡,這時候原來分配到的queue將分配到其餘實例上繼續消費。
可是若是consumer實例的數量比message queue的總數量還多的話,多出來的consumer實例將沒法分到queue,也就沒法消費到消息,也就沒法起到分攤負載的做用了。因此須要控制讓queue的總數量大於等於consumer的數量。
在啓動消費者時,會經過RebalanceService每隔20s執行一次負載均衡,能夠經過參數rocketmq.client.rebalance.waitInterval指定間隔時間。
【RebalanceService】 @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } //直接利用CountDownLatch實現等待超時,不使用sleep的緣由是可能其餘操做會喚醒等待,要求當即rebalance protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { this.onWaitEnd(); return; } //entry to wait,重寫了CountDownLatch的Sync,reset實現:setState(startCount),至關於從新初始化 waitPoint.reset(); try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { hasNotified.set(false); this.onWaitEnd(); } }
並行處理:須要用戶註冊監聽器時實現 new MessageListenerConcurrently();實現以後對應的ConsumeMessageConcurrentlyService會啓動,將每次從Broker拉取的消息切割成多個consumeBatchSize(默認是1)的消息集合,每一個集合封裝成一個ConsumeRequest,提交到線程池處理。若是線程池拒絕了此次提交(當前線程沒法處理或線程池緩衝隊列滿了),則把這批消息放入到定時調度中,延遲5s再執行。使用並行處理的好處是多線程消費快,很差在於若是強調消息消費的順序性,例如必須(付款,發貨)這2條消息,並行處理可能致使(發貨——付款)的處理順序,不對!因此考慮順序消費的場景應該使用順序處理。
@Override public void submitConsumeRequest(……) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } try { consumeExecutor.submit(new ConsumeRequest(msgThis, processQueue, messageQueue)); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } } private void submitConsumeRequestLater(final ConsumeRequest consumeRequest ) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); } }, 5000, TimeUnit.MILLISECONDS); }
順序處理:須要用戶註冊監聽器時實現 new MessageListenerOrderly();實現以後對應的ConsumeMessageOrderlyService會啓動,該服務的特色是某一個隊列,在同一時間只能有一個線程訪問,只有等拿到鎖的線程消費成功後釋放鎖,其餘線程才能繼續消費該隊列。劣勢是下降了消息處理的吞吐量,當前一條消息消費出現問題時,會阻塞後續的流程。
ConsumeMessageOrderlyService在啓動的時候,若是是集羣模式下會啓動一個單線程的定時調度任務,延遲一秒,時間間隔爲20秒,執行rebalanceImpl的lockAll()方法。向主Master節點的Broker發送LOCK_BATCH_MQ請求,內容是鎖住該ConsumerId下的全部MessageQueue,當Broker返回鎖住成功的MQs後,Client端會把MQ對應的ProcessQueue也鎖住。這裏說明下兩個Queue的區別:
//消息隊列的基本屬性 public class MessageQueue { //消息主題 private String topic; //所在Broker private String brokerName; //惟一標識 private int queueId; } //消息隊列中的消息數據 public class ProcessQueue { //存放消息,key=MsgId,value=Message private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); private final Lock lockConsume = new ReentrantLock(); // ……還有其餘屬性 }
當Consumer拉取到消息後,調用ConsumeMessageOrderlyService#submitConsumeRequest處理這批消息。
@Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { //ConsumeRequest繼承了Runnable,封裝了一個MessageQueue && ProcessQueue ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); //提交到線程池 this.consumeExecutor.submit(consumeRequest); } }
在執行ConsumerRequest時,它會獲取該MessageQueue的鎖,使用synchronized (objLock) 保證多線程串行處理該隊列。關鍵處理以下:
@Override public void run() { final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { //廣播模式 || (PQ 成功鎖住&& 鎖沒過時) if (MessageModel.BROADCASTING.equals(defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { //依次消費,直到continueConsume=false,若是使用者處理消息沒有錯誤,通常會返回SUCCESS,這時continueConsume始終是true,若是沒有返回SUCCESS,RocketMQ會根據返回狀態把continueConsume=false for (boolean continueConsume = true; continueConsume; ) { if (MessageModel.CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } if(CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) { tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } // 在線程數小於隊列數狀況下,防止個別隊列被餓死 long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { submitConsumeRequestLater(processQueue, messageQueue, 10); break; } try { //處理隊列加鎖 this.processQueue.getLockConsume().lock(); status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } finally { this.processQueue.getLockConsume().unlock(); } continueConsume = processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { // 沒有拿到當前隊列的鎖,稍後再消費 tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
順序消費總的來講,首先,須要保證順序的消息要發送到同一個messagequeue中;其次,一個messagequeue只能被一個消費者消費,這點是由消息隊列的分配機制來保證的;最後,一個消費者內部對一個mq的消費要保證是有序的。
Broker的實時推送依靠2個服務:PullRequestHoldService和ReputMessageService。二者都繼承了ServiceThread,能夠丟給專門的線程去執行。前者是掛起每次請求,否則客戶端不停輪詢扛不住。後者用來向ConsumeQueue寫消息,並觸發前者處理最新的消息。
PullRequestHoldService的做用是掛起來自客戶端的拉取消息請求,直到有新消息知足拉取條件纔會釋放請求。
@Override public void run() { while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { //間隔5s檢查是否有新消息 this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } //實際是調用notifyMessageArriving this.checkHoldRequest(); } catch (Throwable e) { } } }
從官方給出的示例,能夠看出該模式須要使用者本身維護隊列的offset,比PushConsumer由服務端維護略顯麻煩。
public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } }
從示例代碼中看出,使用時須要先獲取該消費者擁有的全部隊列,告訴客戶端須要從該隊列的哪一個offset位置取消息,獲得消息後本身更新offset。
拉取消息時分爲同步和異步,異步無非是客戶端自身(不是消費者)添加個回調函數new PullCallback() ,在MQClientAPIImpl接受消息以後調用pullCallback.onSuccess(pullResult);在Pull模式下,沒看到負載均衡,應該是給消費者自行控制,不像Push模式時由客戶端統一處理和控制,根據消費者的消費能力作流控。