@(rocketmq源碼解讀)java
先解釋一下題目,咱們假設有一個Producer
和兩個Consumer
,Producer
向TOPICA
和TOPICB
發送消息,兩個Consumer
分別訂閱兩個topic
。咱們看下這時候會出現的問題,以及根據源碼分析一下爲何出現問題。ide
現象其實仍是比較隱蔽的,broker
上會打印:the consumer's subscription not exist,group ...
的日誌(Consumer
端也會打印相似的日誌)。源碼分析
還會有一些subscription changed, group: ...
相似的日誌,而且若是仔細的話還會發現,其中一個消費者消費消息時,另一個就不會消費。ui
咱們看一下爲何會致使這樣的問題,一開始生看或者debug都是很難下手,這時候可能就須要使用必殺技(通常不外傳那種)——問。this
問天問地,谷歌百度必應。我直接問了一個大神——芋艿。大神說這種狀況會出問題,具體緣由他也記不清了,致使這種現象的問題應該是消費關係不停地相互覆蓋。spa
好了,聽到這句話咱們就有入口了,至少知道應該從Broker
上找起。debug
順藤摸瓜找到了緣由,下面一塊兒看一下源碼。日誌
首先咱們知道,消費者的兩種實現(推和拉)中都維護一個MQClientInstance
,這個類很是重要,在啓動消費者的時候,都會去啓動這個類,咱們看下啓動的代碼,其中有這麼一部分:code
// Start various schedule tasks
this.startScheduledTask();
複製代碼
這裏啓動了好多定時任務,咱們追進去看一下:ip
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
//定時發送心跳
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
複製代碼
這裏咱們看到,消費者會定時發送心跳給Broker
,咱們繼續追進去,最後找到sendHeartbeatToAllBroker
方法:
//給全部的broker發送心跳
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
//真正發送心跳的部分
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr);
}
}
}
}
}
}
}
複製代碼
這裏會向全部的Broker
發送心跳,咱們根據咱們的例子,這時候Broker
是一臺,咱們再去Broker
上看一下Broker
如何處理心跳消息,咱們根據發送的是HEART_BEAT
類型的消息,能夠在Broker
上看到,這類消息使用ClientManageProcessor
處理,咱們看下處理心跳的部分(heartBeat
方法):
//循環全部發送過來的數據
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
//根據消費組的名字獲取broker上記錄的消費消息
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
//註冊消費者
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
複製代碼
咱們能夠看到,broker
會根據consumer
放過來的消息,獲取本身這邊記錄的消費者訂閱的信息,注意,獲取時是按照消費組獲取的,咱們看下registerConsumer
:
//根據消費組獲取消費者信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
//注意這裏,這裏consumerTable的鍵就是group
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
複製代碼
咱們注意ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
這裏,這句話告訴咱們consumerTable
中存放的消費者信息是按照消費組來的,那麼一個組的消費信息若是不同,按照咱們的例子中,則訂閱了TOPICA
的消費者心跳信息告訴Broker
:咱們組訂閱的是TOPICA
!而後Broker
就記錄下來了。過了一會訂閱了TOPICB
的消費者心跳信息高速Broker
:咱們訂閱的是TOPICB
!
這裏就致使了訂閱消息相互覆蓋,那麼拉取消息時,確定有一個消費者無法拉到消息,由於Broker
上查詢不到訂閱信息。
至此咱們就知道了致使上述現象的緣由。