本文咱們接着分析一下RocektMQ實現消息消費的源碼細節,這部分的內容較多,所以拆分爲幾個章節分別進行講解。java
本章節重點講解DefaultMQPushConsumer的代碼邏輯。算法
按照慣例仍是先看一下DefaultMQPushConsumer的使用樣例。spring
@PostConstruct
public void init() {
defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
// 從頭開始消費
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消費模式:集羣模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 註冊監聽器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 訂閱全部消息
try {
defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException("[訂單結果通知消息消費者]--NotifySendConsumer加載異常!", e);
}
LOGGER.info("[訂單結果通知消息消費者]--NotifySendConsumer加載完成!");
}複製代碼
初始化過程當中須要調用registerMessageListener將具體的消費實現Listener注入。數組
@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {複製代碼
...省略部分代碼...複製代碼
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {複製代碼
try {
for (MessageExt msg : msgs) {
// 消息解碼
String message = new String(msg.getBody());
// 消費次數
int reconsumeTimes = msg.getReconsumeTimes();
String msgId = msg.getMsgId();
String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;複製代碼
LOGGER.info("[通知發送消息消費者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix);
// 請求組裝
OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
protocol.decode(message);
// 參數加簽,獲取用戶privatekey
String privateKey = protocol.getPrivateKey();
String notifyUrl = protocol.getMerchantNotifyUrl();
String purseId = protocol.getPurseId();
ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
.setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
.setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
.setPlat_orderid(protocol.getOrderId())
.setSign(chargeNotifyRequest.sign(privateKey));
LOGGER.info("[通知發送消息消費者]-OrderNotifySendProducer-訂單結果通知入參:{},{}", chargeNotifyRequest.toString(), logSuffix);
// 通知發送
return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest);
}
} catch (Exception e) {
LOGGER.error("[通知發送消息消費者]消費異常,e={}", LogExceptionWapper.getStackTrace(e));
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}複製代碼
上面就是一個較爲標準的在spring框架中使用RocektMQ的DefaultMQPushConsumer進行消費的主流程。app
接下來咱們重點分析一下源碼實現。負載均衡
首先看一下DefaultMQPushConsumer的初始化過程。框架
進入DefaultMQPushConsumer.java類,查看構造方法:ide
public DefaultMQPushConsumer(final String consumerGroup) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}複製代碼
調用了它的同名構造,採用AllocateMessageQueueAveragely策略(平均散列隊列算法)源碼分析
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}複製代碼
能夠看到實際初始化是經過DefaultMQPushConsumerImpl實現的,DefaultMQPushConsumer持有一個defaultMQPushConsumerImpl的引用。ui
[DefaultMQPushConsumerImpl.java]
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
// 初始化DefaultMQPushConsumerImpl,將defaultMQPushConsumer的實際引用傳入
this.defaultMQPushConsumer = defaultMQPushConsumer;
// 傳入rpcHook並指向本類的引用
this.rpcHook = rpcHook;
}
複製代碼
咱們接着看一下注冊消費監聽器的流程。
消費監聽接口MessageListener有兩個具體的實現,分別爲
MessageListenerConcurrently -- 並行消費監聽
MessageListenerOrderly -- 順序消費監聽複製代碼
本文以MessageListenerConcurrently爲主要講解的對象。
查看MessageListenerConcurrently的註冊過程。
@Override
public void registerMessageListener(
MessageListenerConcurrently messageListener) {
// 將實現指向本類引用
this.messageListener = messageListener;
// 進行真實註冊
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}複製代碼
接着看defaultMQPushConsumerImpl.registerMessageListener
DefaultMQPushConsumerImpl.java
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}複製代碼
能夠看到DefaultMQPushConsumerImpl將真實的messageListener實現指向它本類的messageListener引用。
接着看一下訂閱topic的主流程。
topic訂閱主要經過方法subscribe實現,首先看一下DefaultMQPushConsumer的subscribe實現
@Override
public void subscribe(String topic, String subExpression)
throws MQClientException {
this.defaultMQPushConsumerImpl
.subscribe(withNamespace(topic), subExpression);
}複製代碼
能夠看到是調用了DefaultMQPushConsumerImpl的subscribe方法。
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
// 構建主題的訂閱數據,默認爲集羣消費
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
// 將topic的訂閱數據進行保存
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
// 若是MQClientInstance不爲空,則向全部的broker發送心跳包,加鎖
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}複製代碼
看一下buildSubscriptionData代碼邏輯
[FilterAPI.java]
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
// 構造一個SubscriptionData實體,設置topic、表達式(tag)
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);複製代碼
// 若是tag爲空或者爲"*",統一設置爲"*",即訂閱全部消息
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// tag不爲空,則先按照‘|’進行分割
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
// 遍歷tag表達式數組
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// 將每一個tag的值設置到tagSet中
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
// tag解析異常
throw new Exception("subString split error");
}
}
return subscriptionData;
}複製代碼
看一下sendHeartbeatToAllBrokerWithLock代碼邏輯
[MQClientInstance.java]
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 發送心跳包
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}複製代碼
能夠看到,同步發送心跳包給全部的broker,而該過程是經過RemotingClient統一實現的,經過調用RemotingClient.invokeSync實現心跳包的發送,底層是經過Netty實現的。具體細節本文不進行展開。
上述初始化流程執行完畢以後,經過start()方法啓動消費客戶端。
@Override
public void start() throws MQClientException {
// 設置消費者組
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 啓動消費客戶端
this.defaultMQPushConsumerImpl.start();
// trace處理邏輯
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}複製代碼
關於trace的處理邏輯,本文再也不展開,感興趣的同窗能夠移步 跟我學RocketMQ之消息軌跡實戰與源碼分析
接着看defaultMQPushConsumerImpl.start()方法邏輯
[DefaultMQPushConsumerImpl.java]
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={},
isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;複製代碼
首次啓動後,執行配置檢查,該方法爲前置校驗方法,主要進行消費屬性校驗。
this.checkConfig();複製代碼
將訂閱關係配置信息進行復制
this.copySubscription();複製代碼
若是當前爲集羣消費模式,修改實例名爲pid
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}複製代碼
建立一個新的MQClientInstance實例,若是已經存在直接使用該存在的MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);複製代碼
爲消費者負載均衡實現rebalanceImpl設置屬性
// 設置消費者組
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 設置消費模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 設置隊列分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 設置當前的MQClientInstance實例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
複製代碼
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 註冊消息過濾鉤子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);複製代碼
處理offset存儲方式
// offsetStore不爲空則使用當前的offsetStore方式
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 不然根據消費方式選擇具體的offsetStore方式存儲offset
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 若是是廣播方式,則使用本地存儲方式
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// 若是是集羣方式,則使用遠端broker存儲方式存儲offset
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加載當前的offset
this.offsetStore.load();複製代碼
根據MessageListener的具體實現方式選取具體的消息拉取線程實現。
// 若是是MessageListenerOrderly順序消費接口實現
// 消息消費服務選擇:ConsumeMessageOrderlyService(順序消息消費服務)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
// 若是是MessageListenerConcurrently並行消息消費接口實現
// 消息消費服務選擇:ConsumeMessageConcurrentlyService(並行消息消費服務)
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}複製代碼
選擇並初始化完成具體的消息消費服務以後,啓動消息消費服務。consumeMessageService主要負責對消息進行消費,它的內部維護了一個線程池。
// 啓動消息消費服務
this.consumeMessageService.start();複製代碼
接着向MQClientInstance註冊消費者,並啓動MQClientInstance。這裏再次強調
一個JVM中全部消費者、生產者持有同一個MQClientInstance,且MQClientInstance只會啓動一次
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}複製代碼
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;複製代碼
若是MQClientInstance已經啓動,或者已經關閉,或者啓動失敗,重複調用start會報錯。這裏也能直觀的反映出:MQClientInstance的啓動只有一次
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}複製代碼
啓動完成執行後續收尾工做
// 訂閱關係改變,更新Nameserver的訂閱關係表
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 檢查客戶端狀態
this.mQClientFactory.checkClientInBroker();
// 發送心跳包
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 喚醒執行消費者負載均衡
this.mQClientFactory.rebalanceImmediately();
}
複製代碼
消費者啓動流程較爲重要,咱們接着對其中的重點方法展開講解。這部份內容能夠暫時跳過,不影響對主流程的把控。
咱們研究一下copySubscription方法的實現細節。
[DefaultMQPushConsumerImpl.java]
private void copySubscription() throws MQClientException {
try {複製代碼
// 首先獲取訂閱信息
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}複製代碼
// 爲defaultMQPushConsumer設置具體的MessageListener實現
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}複製代碼
根據消費類型選擇是否進行重試topic訂閱
switch (this.defaultMQPushConsumer.getMessageModel()) {複製代碼
// 若是是廣播消費模式,則不進行任何處理,即無重試
case BROADCASTING:
break;複製代碼
// 若是是集羣消費模式,訂閱重試主題消息
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}複製代碼
若是是集羣消費模式,會訂閱重試主題消息
獲取重試topic,規則爲 RETRYGROUPTOPIC_PREFIX + consumerGroup,即:"%RETRY%"+消費組名 ;
爲重試topic設置訂閱關係,訂閱全部的消息;
消費者啓動的時候會自動訂閱該重試主題,並參與該topic的消息隊列負載過程。
到此,咱們就DefaultMQPushConsumer的初始化、啓動、校驗以及topic訂閱、重試等代碼實現細節進行了較爲詳細的講解。
下一章節,我將帶領讀者對消息消費線程 consumeMessageService 的實現進行分析,咱們下篇文章見。
版權聲明: 原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。