RocketMQ 源碼解析:consumer、producer 處理過程①

說在前面

DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 處理過程

 

源碼解析

producer





/**
 * Demo entry point: starts a producer in group "ProducerGroupName", makes 128
 * synchronous send attempts to topic "TopicTest", prints each send result and
 * finally shuts the producer down.
 *
 * <p>A failure of a single attempt is printed and does not stop the loop.
 *
 * @param args unused
 * @throws MQClientException    if starting the producer fails
 * @throws InterruptedException declared by the original signature
 */
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();

    int attempt = 0;
    while (attempt < 128) {
        try {
            byte[] payload = "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message outgoing = new Message("TopicTest", "TagA", "OrderID188", payload);
            SendResult outcome = producer.send(outgoing);
            System.out.printf("%s%n", outcome);
        } catch (Exception e) {
            e.printStackTrace();
        }
        attempt++;
    }

    producer.shutdown();
}

生產者啓動,producer.start();進入方法,org.apache.rocketmq.client.producer.DefaultMQProducer#start

@Overridepublic void start() throws MQClientException {//        生產者啓動=》this.defaultMQProducerImpl.start();    }

進入方法,生產者啓動,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start()

public void start() throws MQClientException {//        =》this.start(true);    }

進入方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)








public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;//                檢查配置=》this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}//                建立mqclient對象=》this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);//                註冊生產者=》boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) { //this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//                存儲topic發佈信息this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {//                    啓動mqclient=》mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}//        同步發送心跳檢測請求向全部的broker=》this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();    }

進入方法,檢查配置,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkConfig


/**
 * Validates the configured producer group.
 *
 * <p>The group is first handed to {@code Validators.checkGroup}; afterwards a
 * {@code null} group and the reserved {@code MixAll.DEFAULT_PRODUCER_GROUP}
 * are rejected.
 *
 * @throws MQClientException if the group is rejected by the validator, is
 *                           {@code null}, or equals the default producer group
 */
private void checkConfig() throws MQClientException {
    final String group = this.defaultMQProducer.getProducerGroup();
    Validators.checkGroup(group);

    if (group == null) {
        throw new MQClientException("producerGroup is null", null);
    }

    final boolean isReservedDefault = group.equals(MixAll.DEFAULT_PRODUCER_GROUP);
    if (isReservedDefault) {
        throw new MQClientException(
            "producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}

進入方法,建立mqclient對象,org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)


public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();//       從本地緩存中獲取client對象,簡單的通常會concurrentHashMap當本地緩存,性能很高MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;    }

返回方法,註冊生產者,org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer


public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {if (null == group || null == producer) {return false;}//        這裏是用concurrentHashMap在本地內存中維護註冊信息MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);if (prev != null) {log.warn("the producer group[{}] exist already.", group);return false;}return true;    }

返回方法,啓動mqclient,org.apache.rocketmq.client.impl.factory.MQClientInstance#start 前面介紹過了,可以翻閱前面的章節。

返回方法,同步發送心跳檢測請求向全部的broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock

public void sendHeartbeatToAllBrokerWithLock() {if (this.lockHeartbeat.tryLock()) {try {//                發送心跳監測向全部broker=》this.sendHeartbeatToAllBroker();//                更新資源=》this.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);} finally {this.lockHeartbeat.unlock();}} else {log.warn("lock heartBeat, but failed.");}    }

進入方法,發送心跳監測向全部broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBroker


private void sendHeartbeatToAllBroker() {//        準備心跳檢測數據final HeartbeatData heartbeatData = this.prepareHeartbeatData();final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();//        failfastif (producerEmpty && consumerEmpty) {log.warn("sending heartbeat, but no consumer and no producer");return;}if (!this.brokerAddrTable.isEmpty()) {long times = this.sendHeartbeatTimesTotal.getAndIncrement();Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) {//                            非master broke人節點沒有消費者if (consumerEmpty) {if (id != MixAll.MASTER_ID)continue;}try {//                                心跳檢測=》int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);if (!this.brokerVersionTable.containsKey(brokerName)) {this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));}this.brokerVersionTable.get(brokerName).put(addr, version);if (times % 20 == 0) {log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);log.info(heartbeatData.toString());}} catch (Exception e) {//                                broker是否存在=》if (this.isBrokerInNameServer(addr)) {log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);} else {log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr);}}}}}}}    }

進入方法,心跳檢測,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendHearbeat

public int sendHearbeat(final String addr,final HeartbeatData heartbeatData,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);request.setLanguage(clientConfig.getLanguage());request.setBody(heartbeatData.encode());//        同步執行RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return response.getVersion();}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());    }

進入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介紹過了。

返回方法,broker是否存在,org.apache.rocketmq.client.impl.factory.MQClientInstance#isBrokerInNameServer

private boolean isBrokerInNameServer(final String brokerAddr) {//        存儲topic路由信息Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();while (it.hasNext()) {Entry<String, TopicRouteData> itNext = it.next();List<BrokerData> brokerDatas = itNext.getValue().getBrokerDatas();for (BrokerData bd : brokerDatas) {boolean contain = bd.getBrokerAddrs().containsValue(brokerAddr);if (contain)return true;}}return false;    }

返回方法,更新資源,org.apache.rocketmq.client.impl.factory.MQClientInstance#uploadFilterClassSource

private void uploadFilterClassSource() {//        遍歷消費者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> next = it.next();MQConsumerInner consumer = next.getValue();//            若是消費者消息模式是pushif (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {//                獲取消費者訂閱信息Set<SubscriptionData> subscriptions = consumer.subscriptions();for (SubscriptionData sub : subscriptions) {if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {final String consumerGroup = consumer.groupName();final String className = sub.getSubString();final String topic = sub.getTopic();final String filterClassSource = sub.getFilterClassSource();try {//                            更新過濾類去全部過濾的server=》this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);} catch (Exception e) {log.error("uploadFilterClassToAllFilterServer Exception", e);}}}}}    }

進入方法,更新過濾類去全部過濾的server,org.apache.rocketmq.client.impl.factory.MQClientInstance#uploadFilterClassToAllFilterServer



private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,final String topic,final String filterClassSource) throws UnsupportedEncodingException {byte[] classBody = null;int classCRC = 0;try {classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);//            壓縮classCRC = UtilAll.crc32(classBody);} catch (Exception e1) {log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",fullClassName,RemotingHelper.exceptionSimpleDesc(e1));}//        獲取topic的路由信息TopicRouteData topicRouteData = this.topicRouteTable.get(topic);if (topicRouteData != null&& topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {//            遍歷的過濾server地址Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();while (it.hasNext()) {Entry<String, List<String>> next = it.next();List<String> value = next.getValue();for (final String fsAddr : value) {try {//                        註冊過濾類的消息=》this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,5000);log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,topic, fullClassName);} catch (Exception e) {log.error("uploadFilterClassToAllFilterServer Exception", e);}}}} else {log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",consumerGroup, topic, fullClassName);}    }

進入方法,註冊過濾類的消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#registerMessageFilterClass


/**
 * Sends a {@code REGISTER_MESSAGE_FILTER_CLASS} request to the given address
 * and returns normally only when the response code is {@code SUCCESS}.
 *
 * @param addr          address the request is sent to
 * @param consumerGroup consumer group placed in the request header
 * @param topic         topic placed in the request header
 * @param className     filter class name placed in the request header
 * @param classCRC      checksum placed in the request header
 * @param classBody     request body
 * @param timeoutMillis timeout passed to the synchronous invocation
 * @throws RemotingConnectException     propagated from the remoting client
 * @throws RemotingSendRequestException propagated from the remoting client
 * @throws RemotingTimeoutException     propagated from the remoting client
 * @throws InterruptedException         propagated from the remoting client
 * @throws MQBrokerException            if the response code is not {@code SUCCESS}
 */
public void registerMessageFilterClass(
    final String addr,
    final String consumerGroup,
    final String topic,
    final String className,
    final int classCRC,
    final byte[] classBody,
    final long timeoutMillis
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException, MQBrokerException {
    // Build the header describing the filter class.
    RegisterMessageFilterClassRequestHeader header = new RegisterMessageFilterClassRequestHeader();
    header.setConsumerGroup(consumerGroup);
    header.setClassName(className);
    header.setTopic(topic);
    header.setClassCRC(classCRC);

    RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, header);
    command.setBody(classBody);

    RemotingCommand reply = this.remotingClient.invokeSync(addr, command, timeoutMillis);
    if (reply.getCode() == ResponseCode.SUCCESS) {
        return;
    }

    // Any other code is surfaced to the caller together with the remark.
    throw new MQBrokerException(reply.getCode(), reply.getRemark());
}

進入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介紹過了。

進入方法,org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)發送消息

@Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//        =》return this.defaultMQProducerImpl.send(msg);    }

進入方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//        發送消息3s超時=》return send(msg, this.defaultMQProducer.getSendMsgTimeout());    }

進入方法,發送消息3s超時,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)

public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//        同步發送消息=》return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);    }

進入方法,同步發送消息,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl













private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//        確認客戶端服務是否正常this.makeSureStateOK();//        檢查消息是否合法,failfast原則Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;//        找到topic的發佈信息=》TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;//            同步重試3次,異步1次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? 
null : mq.getBrokerName();//                按brokerName選擇一個消息隊列=》MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}//                        發送=》sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {//                                    發送失敗重試另外一個brokerif (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), 
e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}//        查詢namesrv地址列表List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null == nsList || nsList.isEmpty()) {throw new MQClientException("No name 
server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);    }

進入方法,找到topic的發佈信息,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//           從namesrv更新topic路由信息=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//            從namesrv更新topic路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}    }

進入方法,從namesrv更新topic路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)





public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {//                        獲取默認的topic路由信息 =》topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {//                            獲取隊列信息for (QueueData data : topicRouteData.getQueueDatas()) {//                                讀寫隊列最大數量4int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {//                        獲取topic路由信息=》topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);//                        判斷topic路由是否改變=》boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {//                            須要更新路由信息changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {//                                更新broker的地址列表this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{//                                topic路由信息轉換成topic發佈信息=》TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);//                                遍歷生產者信息Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = 
entry.getValue();if (impl != null) {//                                        更新topic發佈信息=》impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{//                                獲取消息隊列訂閱信息Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);//                                遍歷消費者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {//                                        更新topic的訂閱信息=》impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;    }

接下篇。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索