RocketMQ架構上主要分爲四部分,如上圖所示:apache
基礎:Namesrv用來存儲路由的基礎信息都放在RouteInfoManager類中,RouteInfoManager類也能夠看作是Namesrv的資源類,不少操做都是對此類中的數據進行實時更改,
主要成員變量:緩存
private final HashMap<String/\* topic \*/, List<QueueData>> topicQueueTable; private final HashMap<String/\* brokerName \*/, BrokerData> brokerAddrTable; private final HashMap<String/\* clusterName \*/, Set<String/\* brokerName \*/\>> clusterAddrTable; private final HashMap<String/\* brokerAddr \*/, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/\* brokerAddr \*/, List<String>/\* Filter Server \*/\> filterServerTable;
topicQueueTable:Topic消息隊列路由信息,消息發送時根據路由表進行負載均衡
brokerAddrTable:Broker基礎信息,包含brokerName、所屬集羣名稱、主備Broker地址
clusterAddrTable:Broke集羣信息,存儲集羣中全部Broker名稱
brokerLiveTable:Broker狀態信息,NameServer每次收到心跳包時會替換該信息
filterServerTable:Broker上的FilterServer列表,用於類模式消息過濾服務器
其主要方法是RouteInfoManager類的registerBroker()方法網絡
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel)
主要流程如上圖數據結構
RMQ路由發現是非實時的,當Topic發生變化後,Namesrv不知道推送給客戶端,而是由客戶端主動拉取最新的路由。
其主要方法是RouteInfoManager類的pickupTopicRouteData()方法架構
public TopicRouteData pickupTopicRouteData(final String topic)
主要流程以下圖:app
路由刪除會從topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中刪除與該Broker相關的信息。
其主要方法是RouteInfoManager類的scanNotActiveBroker()和onChannelDestroy(String remoteAddr, Channel channel)方法負載均衡
public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); //遍歷每一個存活broker while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); //默認超過兩分鐘,關閉和broker的channel,把本身從brokerLiveTable中剔除 //調用 onChannelDestroy if ((last + BROKER\_CHANNEL\_EXPIRED\_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER\_CHANNEL\_EXPIRED\_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
onChannelDestroy()方法的主要流程以下:
1.申請寫鎖,根據brokerAddress從brokerLiveTable、filterServerTable移除
2.維護brokerAddrTable,遍歷brokerAddrTable,根據參數remoteAddr從brokerAddrTable中刪除brokerAddr,若是BrokerAddrs集合爲空,則從brokerAddrTable中刪除brokerName。
3.根據BrokerName,從ClusterAddrTable中找到Broker並從集羣中移除, 若是移除後,集羣中不包含任何Broker,則將該集羣從clusterAddrTable中移除。
4.根據brokerName,遍歷topicQueueTable,找到QueueData並從List<QueueData>移除,若是List<QueueData>爲空,則把相應的topic從topicQueueTable中移除。
5.釋放鎖異步
1.1:總體流程以下圖
1.2: 在應用裏初始化DefaultMQProducer時候,會以Producer名或者RPCHook的任一個或兩個做爲參數初始化DefaultMQProducer對象,而後對DefaultMQProducer對象設置NameServer地址等參數,而後調用start方法啓動Producer,其實內部調用了DefaultMQProducerImpl.start 方法,其大體流程以下:分佈式
public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); //向Filter過濾服務器發送REGISTER_MESSAGE_FILTER_CLASS請求碼,更新過濾服務器中的Filterclass文件 this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); } }
sendHeartbeatToAllBroker()的主要流程以下:
一、初始化 HeartbeatData 對象,將該 Producer 或 Consumer 的 ClientID 賦值給 HeartbeatData 對象的 clientID 變量。
二、遍歷 MQClientInstance.consumerTable和producerTable,根據每一個 MQConsumerInner 對象的值初始化ConsumerData 對象和ProducerData對象。
三、若 ConsumerData 集合和 ProducerData 集合都爲空,說明沒有 consumer或 produer,則不發送心跳信息。
四、若不是都爲空,則遍歷 MQClientInstance.brokerAddrTable 列表,向每一個 Broker 地址發送請求碼爲 HEART_BEAT 的心跳消息,可是當存在 Consumer 時才向全部 Broker 發送心跳消息,不然若不存在 Consumer 則只向主用 Broker 地址發送心跳消息。
5.根據broker返回結果,更新MQClientInstance.brokerVersionTable。
3.1:消息發送的總體時序流程以下圖
其中幾個主要方法文字說明以下:
3.2 消息的數據結構
rocketmq的消息封裝在org.apache.rocketmq.common.message類中,屬性:
private String topic; //消息所屬topic private int flag; //消息flag (沒啥用) private Map<String, String> properties; //擴展屬性 (消息的TAGS 消息的延遲等級。) private byte[] body; //消息主體內容 private String transactionId; //交易id
3.3 普通消息的發送
對於普通消息的發送,能夠從DefaultProducerImpl的send方法入手,
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
發送流程中幾個主要方法以下:
驗證消息是否符合規範,包括topicName、body不能爲空、length不能爲0且最大爲4_1024_1024
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
獲取主題的路由信息,查找要發送的具體的Broker節點
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
生產者中若是緩存了topic的路由信息則直接返回,不然向Namesrv查詢,再未查詢到則嘗試用默認主題createTopicKey=TBW102去查詢,都查詢步到則會拋出異常。Producer關於更新和維護路由信息MQClientInstance.topicRouteTable緩存操做都在updateTopicRoutInfoFromNameServer方法中。
根據路由信息選擇消息隊列,返回的消息體按照broker、序號排序。
首先採用重試機制,由retryTimesWhenSendFailed指定同步方式重試次數,異步由retryTimesWhenSendAsyncFailed指定,而後使用循環執行的方式,選擇消息隊列 、發送消息,發送成功則返回,收到異常則重試。
選擇消息隊列有2種方式:
1.sendLatencyFaultEnable=false ,默認不啓用 Broker 故障延遲機制,調用 TopicPublishlnfo的slectOneMessageQueue每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的狀況下就是隨機生成一個,加1取絕對值後返回,再對隊列列表的長度取模,因此在同一線程中,會輪訓的從隊列列表獲取隊列。而若是是不一樣線程的話,index是隨機生成的,因此就是隨機從隊列列表中獲取。
能夠看到選擇隊列方法的入參有一個lastBrokerName的入參,此參數的目的是在發送消息失敗的狀況下,producer會重試再次發送,而再次發送選擇的隊列須要另選一個broker,lastBrokerName就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發送消息。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
2.sendLatencyFaultEnable=true ,啓用 Broker 障延遲機制,保證低延遲,調用MQFaultStrategy.selectOneMessageQueue
開啓延遲故障,每當發送完一次消息,無論成功仍是失敗,都會把這次存儲消息的broker給保存下來,記錄故障狀況下此broker須要延長多長時間才能再次發送,目前看到在代碼裏面寫死了,故障下30s以內是不能再向此broker發送消息了
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //開啓此策略能夠保證低延遲 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); //輪詢看能不能有低延遲的MessageQueue for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } //若是沒找到就隨機找一個 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
3.3.4 真正消息的發送
1.消息發送的入口在DefaultMQProducerimpl的sendKernerlmpl方法中:根據 MessageQueue 獲取 Broker 的網絡地址 若是 MQClientlnstance.brokeraddrTable沒緩存該 Broke 的信息,則從 NameServer 主動更新一 topic 的路由信
若是路由更新後還 找不到Broker信息,則拋出異常,提示Broer不存在
String brokerAddr = this.mQClientFactory.findBrokerAddressinPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishinfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressinPublish (mq.getBrokerName()); }
2.爲消息分配全局惟一id,若是消息默認超 4K(compressMsgBodyOverHowmuch),會對消息體採用 zip 壓縮,並設置消息的系統標記爲 MessageSysFlag.COMPRESSED_FLAG,若是是事務Prepared消息,則設消息的系統標記爲 MessageSysFlag.TRANSACTION_PREPARED TYPE
//for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; }
if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans\_Msg\_Half); } if (msg.getProperty("\_\_STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY\_DELAY\_TIME\_LEVEL) != null) { context.setMsgType(MessageType.Delay\_Msg); } this.executeSendMessageHookBefore(context); }
4.構建消息發送請求 主要包含以下重要信息:生產者組、主題名稱、默認建立主題 Key 、該主題在單個Broker默認隊列數、隊ID (隊列序號)、消息系統標( MessageSysFlag 消息發時間 、消息標記( RocketMQ對消息中的flag不作任何處理供應用程序使用) 消息擴展屬性 、消息重試次數、是不是批量消息等
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } }
5.根據 發送方式,同步、異步、單式進行網絡傳輸,消息發送在MQClientAPIImpl的sendMessage()方法中
public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND\_BATCH\_MESSAGE : RequestCode.SEND\_MESSAGE\_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND\_MESSAGE, requestHeader); } request.setBody(msg.getBody()); switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; } return null; }
if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); }