本文咱們接着分析RocketMQ消息消費的邏輯。併發
接上文,DefaultMQPushConsumerImpl啓動過程當中,啓動了consumeMessageService消息消費線程。異步
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();複製代碼
能夠看到,是根據MessageListener的具體實現選擇具體的consumeMessageService實現,咱們重點講解並行消費服務ConsumeMessageConcurrentlyService。ide
首先看一下ConsumeMessageConcurrentlyService的成員變量,具體的解釋寫在註釋上函數
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();複製代碼
// 消費推模式實現類
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;複製代碼
// 消費者引用
private final DefaultMQPushConsumer defaultMQPushConsumer;複製代碼
// 併發消息事件監聽回調
private final MessageListenerConcurrently messageListener;複製代碼
// 消息消費任務隊列
private final BlockingQueue<Runnable> consumeRequestQueue;複製代碼
// 消息消費線程池
private final ThreadPoolExecutor consumeExecutor;複製代碼
// 消息消費組
private final String consumerGroup;複製代碼
// 添加消費任務到consumeExecutor定時調度器
private final ScheduledExecutorService scheduledExecutorService;複製代碼
// 定時刪除過時任務線程池
private final ScheduledExecutorService cleanExpireMsgExecutors;複製代碼
接着看它的構造方法:性能
public ConsumeMessageConcurrentlyService(
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {複製代碼
// 初始化defaultMQPushConsumerImpl,messageListener
// 本地引用指向外部具體實現
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();複製代碼
// 消費者組
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
// 初始化消費請求隊列爲LinkedBlockingQueue無界隊列
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();複製代碼
// 初始化線程池,指向消費調度線程池
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));複製代碼
// 初始化消費定時任務線程池,線程數=1
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
// 初始化清除過時消息線程池,線程數=1
this.cleanExpireMsgExecutors =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}複製代碼
ConsumeMessageConcurrentlyService啓動邏輯爲start方法this
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {複製代碼
@Override
public void run() {
cleanExpireMsg();
}複製代碼
},
// 15min 消費超時
this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}複製代碼
能夠看到start方法是對cleanExpireMsgExecutors進行處理,開啓清除過時消息的調度過程。spa
咱們重點看一下cleanExpireMsg方法。.net
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
}
}複製代碼
能夠看到,cleanExpireMsg方法定時對ProcessQueue進行處理,將其中的消息進行清理。這部份內容不是講解的重點,暫時打住。線程
咱們重點研究一下ConsumeMessageConcurrentlyService的消息消費過程。日誌
ConsumeMessageConcurrentlyService的消息消費過程主要方法爲submitConsumeRequest。
經過submitConsumeRequest提交消費請求進行消費過程。
@Override
public void submitConsumeRequest(
// 消息列表 默認一次從服務端拉取最多32條消息
final List<MessageExt> msgs,
// 消息處理隊列
final ProcessQueue processQueue,
// 消息所屬的消息隊列
final MessageQueue messageQueue,
// 是否轉發到消費線程池 併發消費時忽略該參數
final boolean dispatchToConsume) { 複製代碼
獲取批量消費數量,這個值爲ConsumeMessageBatchMaxSize,默認爲1
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();複製代碼
若是消息的大小小於等於consumeBatchSize,組裝消費請求,提交到消費線程池中進行消費操做。
若是一場則稍後再次提交消費請求,經過方法submitConsumeRequestLater實現。
if (msgs.size() <= consumeBatchSize) {
// 拉取的消息小於等於consumeBatchSize(默認爲1)
// 提交消費請求到線程池中進行消費
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}複製代碼
若是拉取的消息條數大於consumeBatchSize,則對拉取到消息進行分頁處理;
每頁大小爲:consumeBatchSize。
經過循環迭代的方式,建立多個ConsumeRequest消費請求任務,提交到消費線程池中。
} else {
// 拉取的消息大於consumeBatchSize 進行分頁提交任務到線程池
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
// 提交消費任務到消費線程池
this.consumeExecutor.submit(consumeRequest);複製代碼
若是觸發拒絕提交異常,則稍後繼續提交。實際上,因爲任務隊列是LinkedBlockingQueue無界隊列,所以理論上不會出現拒絕提交。
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}複製代碼
這裏插入對submitConsumeRequestLater的解釋,這一部分能夠直接選擇跳過,對主流程沒有影響。
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {複製代碼
this.scheduledExecutorService.schedule(new Runnable() {複製代碼
@Override
public void run() {
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
}
}, 5000, TimeUnit.MILLISECONDS);
}複製代碼
能夠看到經過scheduledExecutorService進行調度,每5秒再次提交一次消息消費請求。
咱們能夠看到消費消息服務的核心代碼爲
this.consumeExecutor.submit(consumeRequest);複製代碼
根據咱們對線程池調度的瞭解,能夠知道submit接受一個Runnable接口實現,也就是這裏的ConsumeRequest;經過調用該Runnable的run方法實現具體的調度邏輯。
咱們接着看一下ConsumeRequest的run方法。
大段代碼預警.....
@Override
public void run() {複製代碼
step1. 首先檢查processQueue的dropped是否爲true,若是是true,則中止消費,直接return。
當發生消息rebalance時,會設置dropped==true,這麼作的目的是防止消費者消費不屬於本身的消息隊列。
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume,
because it's dropped. group={} {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
this.messageQueue);
return;
}
複製代碼
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;複製代碼
step2. 若是消費者存在鉤子函數,則經過 executeHookBefore 調用該鉤子函數
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}複製代碼
step3. 【重點!】此處的代碼是消費的核心部分。
首先判斷msgs是否爲空,若是不爲空,則迭代msgs,設置消費開始時間戳,回調客戶端實現的MessageListenerConcurrently.consumeMessage方法執行具體消費邏輯,得到其消費結果status。
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}複製代碼
// 經過Collections.unmodifiableList將msgs包裝爲不可修改的視圖
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);複製代碼
// 若是消費執行異常則hasException = true;
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
// 計算消費耗時
long consumeRT = System.currentTimeMillis() - beginTimestamp;複製代碼
step4. 根據具體的status返回值進行後續處理:
// 若是status爲空,且hasException==true,則返回ConsumeReturnType.EXCEPTION,
// 不然返回 ConsumeReturnType.RETURNNULL
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
// 消費超時
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
// 業務側返回RECONSUME_LATER,須要從新消費,returnType爲消費失敗
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
// 業務側返回CONSUME_SUCCESS,消費成功,returnType爲消費成功
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
......
// 若是客戶端返回的status爲null,則賦值爲RECONSUME_LATER,以便重複消費。
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}複製代碼
step5. 若是存在鉤子函數,則執行鉤子函數executeHookAfter
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}複製代碼
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);複製代碼
step6. 執行消費邏輯以後,再次判斷processQueue的dropped狀態;若是爲true,則不進行任何處理;當非true時,調用processConsumeResult對消費結果進行處理。
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}複製代碼
之因此當processQueue的dropped狀態爲true時不作任何處理,是由於當processQueue.dropped==true時,說明此時可能出現了新消費者的加入/原消費者down機等狀況,致使原先消費者的隊列在rebalance以後分配給了新的消費者。那麼,這部分消息會被從新消費,所以此處就不須要作多餘的處理,等待從新消費就能夠了。
到processConsumeResult方法,就進入本文的結束部分,即:解析消費結果。
這部分的邏輯主要是消費進度offset進行處理。
public void processConsumeResult(
// 並行消費結果
final ConsumeConcurrentlyStatus status,
// 並行消費上下文
final ConsumeConcurrentlyContext context,
// 消費請求
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();複製代碼
if (consumeRequest.getMsgs().isEmpty())
return;複製代碼
判斷消費結果,若是是CONSUMESUCCESS則設置ackIndex=msgs.size()-1;若是是RECONSUMELATER則設置ackIndex=-1。爲發送消息確認ACK作準備。
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}複製代碼
根據消費類型,進行處理,若是是廣播模式:業務側返回RECONSUME_LATER不會從新消費,只會打印告警日誌;
若是是集羣模式,消息消費成功不執行sendMessageBack;當業務側返回RECONSUME_LATER時,這批消息須要將ACK發送給broker。
須要將它們從新封裝爲consumeRequest,延遲五秒後從新消費。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}複製代碼
最後,從ProcessQueue中將這批成功消費的消息移除,經過offset更新消費進度;以便後續可以從上次的消費位點繼續消費,避免重複消費。
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}複製代碼
咱們看一下submitConsumeRequestLater這個方法又作了哪些處理。
private void submitConsumeRequestLater(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue
) {複製代碼
this.scheduledExecutorService.schedule(new Runnable() {複製代碼
@Override
public void run() {
ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
}
}, 5000, TimeUnit.MILLISECONDS);
}複製代碼
能夠看到就是在這個方法中調用了submitConsumeRequest進行了消息消費處理。這樣咱們的消費流程就完美的閉環了。
本文咱們主要講解了ConsumeMessageConcurrentlyService消息消費服務是如何異步地對消息進行消費,着重分析了它的生命週期以及消費狀態的流轉過程。
到此,咱們還有一個問題沒有解決,那就是ConsumeMessageConcurrentlyService消費的消息是從何處得到的?
這裏就涉及到RocketMQ消息消費時的消息拉取流程,這個流程也是異步的,RocketMQ中大量使用了異步線程模型。這種方式便於理解,也有利於性能的提高,該異步流程咱們會在接下來的文章中繼續分析。