摘要: 原創出處 http://www.iocoder.cn/RocketMQ/high-availability/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!java
本文主要基於 RocketMQ 4.0.x 正式版node
本文主要解析 Namesrv
、Broker
如何實現高可用,Producer
、Consumer
怎麼與它們通訊保證高可用。面試
啓動多個 Namesrv
實現高可用。
相較於 Zookeeper
、Consul
、Etcd
等,Namesrv
是一個超輕量級的註冊中心,提供命名服務。數據庫
Namesrv
之間,沒有任何關係(不存在相似 Zookeeper
的 Leader
/Follower
等角色),不進行通訊與數據同步。經過 Broker
循環註冊多個 Namesrv
。1: // ⬇️⬇️⬇️【BrokerOuterAPI.java】 2: public RegisterBrokerResult registerBrokerAll( 3: final String clusterName, 4: final String brokerAddr, 5: final String brokerName, 6: final long brokerId, 7: final String haServerAddr, 8: final TopicConfigSerializeWrapper topicConfigWrapper, 9: final List<String> filterServerList, 10: final boolean oneway, 11: final int timeoutMills) { 12: RegisterBrokerResult registerBrokerResult = null; 13: 14: List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); 15: if (nameServerAddressList != null) { 16: for (String namesrvAddr : nameServerAddressList) { // 循環多個 Namesrv 17: try { 18: RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, 19: haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); 20: if (result != null) { 21: registerBrokerResult = result; 22: } 23: 24: log.info("register broker to name server {} OK", namesrvAddr); 25: } catch (Exception e) { 26: log.warn("registerBroker Exception, {}", namesrvAddr, e); 27: } 28: } 29: } 30: 31: return registerBrokerResult; 32: }
Producer
、Consumer
從 Namesrv
列表選擇一個可鏈接的進行通訊。1: // ⬇️⬇️⬇️【NettyRemotingClient.java】 2: private Channel getAndCreateNameserverChannel() throws InterruptedException { 3: // 返回已選擇、可鏈接Namesrv 4: String addr = this.namesrvAddrChoosed.get(); 5: if (addr != null) { 6: ChannelWrapper cw = this.channelTables.get(addr); 7: if (cw != null && cw.isOK()) { 8: return cw.getChannel(); 9: } 10: } 11: // 12: final List<String> addrList = this.namesrvAddrList.get(); 13: if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { 14: try { 15: // 返回已選擇、可鏈接的Namesrv 16: addr = this.namesrvAddrChoosed.get(); 17: if (addr != null) { 18: ChannelWrapper cw = this.channelTables.get(addr); 19: if (cw != null && cw.isOK()) { 20: return cw.getChannel(); 21: } 22: } 23: // 從【Namesrv列表】中選擇一個鏈接的返回 24: if (addrList != null && !addrList.isEmpty()) { 25: for (int i = 0; i < addrList.size(); i++) { 26: int index = this.namesrvIndex.incrementAndGet(); 27: index = Math.abs(index); 28: index = index % addrList.size(); 29: String newAddr = addrList.get(index); 30: 31: this.namesrvAddrChoosed.set(newAddr); 32: Channel channelNew = this.createChannel(newAddr); 33: if (channelNew != null) 34: return channelNew; 35: } 36: } 37: } catch (Exception e) { 38: log.error("getAndCreateNameserverChannel: create name server channel exception", e); 39: } finally { 40: this.lockNamesrvChannel.unlock(); 41: } 42: } else { 43: log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); 44: } 45: 46: return null; 47: }
啓動多個 Broker分組
造成 集羣
實現高可用。
Broker分組
= Master節點
x1 + Slave節點
xN。
相似 MySQL
,Master節點
提供讀寫服務,Slave節點
只提供讀服務。apache
Master
節點 不斷髮送新的 CommitLog
給 Slave
節點。 Slave
節點 不斷上報本地的 CommitLog
已經同步到的位置給 Master
節點。Broker分組
與 Broker分組
之間沒有任何關係,不進行通訊與數據同步。Master
/Slave
同步。org.apache.rocketmq.broker.slave.SlaveSynchronize
類,Slave
節點會從 Master
節點拉取消費進度、Topic 配置等等。集羣內,Master
節點 有兩種類型:Master_SYNC
、Master_ASYNC
:前者在 Producer
發送消息時,等待 Slave
節點 存儲完畢後再返回發送結果,然後者不須要等待。app
目前官方提供三套配置:socket
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | SYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | SYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
再看具體實現代碼以前,咱們來看看 Master
/Slave
節點 包含的組件:
async
Master
節點
AcceptSocketService
:接收 Slave
節點 鏈接。HAConnection
ReadSocketService
:讀來自 Slave
節點 的數據。WriteSocketService
:寫到往 Slave
節點 的數據。Slave
節點
HAClient
:對 Master
節點 鏈接、讀寫數據。Master
節點 與 Slave
節點 通訊協議很簡單,只有以下兩條。ide
對象 | 用途 | 第幾位 | 字段 | 數據類型 | 字節數 | 說明 |
---|---|---|---|---|---|---|
Slave=>Master | 上報CommitLog已經同步到的物理位置 | |||||
0 | maxPhyOffset | Long | 8 | CommitLog最大物理位置 | ||
Master=>Slave | 傳輸新的 CommitLog 數據 |
|||||
0 | fromPhyOffset | Long | 8 | CommitLog開始物理位置 | ||
1 | size | Int | 4 | 傳輸CommitLog數據長度 | ||
2 | body | Bytes | size | 傳輸CommitLog數據 |
Slave
主循環,實現了不斷不斷不斷從 Master
傳輸 CommitLog
數據,上傳 Master
本身本地的 CommitLog
已經同步物理位置。1: // ⬇️⬇️⬇️【HAClient.java】 2: public void run() { 3: log.info(this.getServiceName() + " service started"); 4: 5: while (!this.isStopped()) { 6: try { 7: if (this.connectMaster()) { 8: // 若到知足上報間隔,上報到Master進度 9: if (this.isTimeToReportOffset()) { 10: boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); 11: if (!result) { 12: this.closeMaster(); 13: } 14: } 15: 16: this.selector.select(1000); 17: 18: // 處理讀取事件 19: boolean ok = this.processReadEvent(); 20: if (!ok) { 21: this.closeMaster(); 22: } 23: 24: // 若進度有變化,上報到Master進度 25: if (!reportSlaveMaxOffsetPlus()) { 26: continue; 27: } 28: 29: // Master太久未返回數據,關閉鏈接 30: long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; 31: if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() 32: .getHaHousekeepingInterval()) { 33: log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress 34: + "] expired, " + interval); 35: this.closeMaster(); 36: log.warn("HAClient, master not response some time, so close connection"); 37: } 38: } else { 39: this.waitForRunning(1000 * 5); 40: } 41: } catch (Exception e) { 42: log.warn(this.getServiceName() + " service has exception. ", e); 43: this.waitForRunning(1000 * 5); 44: } 45: } 46: 47: log.info(this.getServiceName() + " service end"); 48: }
Master
上報 Slave
本地 CommitLog
已經同步到的物理位置。該操做還有心跳的做用。Master
傳輸 Slave
的 CommitLog
數據。#dispatchReadRequest(...)
與 #reportSlaveMaxOffset(...)
是怎麼實現的。1: // 【HAClient.java】 2: /** 3: * 讀取Master傳輸的CommitLog數據,並返回是異常 4: * 若是讀取到數據,寫入CommitLog 5: * 異常緣由: 6: * 1. Master傳輸來的數據offset 不等於 Slave的CommitLog數據最大offset 7: * 2. 上報到Master進度失敗 8: * 9: * @return 是否異常 10: */ 11: private boolean dispatchReadRequest() { 12: final int msgHeaderSize = 8 + 4; // phyoffset + size 13: int readSocketPos = this.byteBufferRead.position(); 14: 15: while (true) { 16: // 讀取到請求 17: int diff = this.byteBufferRead.position() - this.dispatchPostion; 18: if (diff >= msgHeaderSize) { 19: // 讀取masterPhyOffset、bodySize。使用dispatchPostion的緣由是:處理數據「粘包」致使數據讀取不完整。 20: long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); 21: int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); 22: // 校驗 Master傳輸來的數據offset 是否和 Slave的CommitLog數據最大offset 是否相同。 23: long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); 24: if (slavePhyOffset != 0) { 25: if (slavePhyOffset != masterPhyOffset) { 26: log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " 27: + slavePhyOffset + " MASTER: " + masterPhyOffset); 28: return false; 29: } 30: } 31: // 讀取到消息 32: if (diff >= (msgHeaderSize + bodySize)) { 33: // 寫入CommitLog 34: byte[] bodyData = new byte[bodySize]; 35: this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); 36: this.byteBufferRead.get(bodyData); 37: HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); 38: // 設置處理到的位置 39: this.byteBufferRead.position(readSocketPos); 40: this.dispatchPostion += msgHeaderSize + bodySize; 41: // 上報到Master進度 42: if (!reportSlaveMaxOffsetPlus()) { 43: return false; 44: } 45: // 繼續循環 46: continue; 47: } 48: } 49: 50: // 空間寫滿,從新分配空間 51: if (!this.byteBufferRead.hasRemaining()) { 52: this.reallocateByteBuffer(); 53: } 54: 55: break; 56: } 57: 58: return true; 59: } 60: 61: /** 62: * 上報進度 63: * 64: * @param maxOffset 進度 65: * @return 是否上報成功 66: */ 67: private boolean reportSlaveMaxOffset(final long maxOffset) { 68: this.reportOffset.position(0); 69: this.reportOffset.limit(8); 70: this.reportOffset.putLong(maxOffset); 71: this.reportOffset.position(0); 72: this.reportOffset.limit(8); 73: 74: for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) { 75: try { 76: this.socketChannel.write(this.reportOffset); 77: } catch (IOException e) { 78: log.error(this.getServiceName() 79: + "reportSlaveMaxOffset this.socketChannel.write exception", e); 80: return false; 81: } 82: } 83: 84: return !this.reportOffset.hasRemaining(); 85: }
ReadSocketService
邏輯同 HAClient#processReadEvent(...)
基本相同,咱們直接看代碼。1: // ⬇️⬇️⬇️【ReadSocketService.java】 2: private boolean processReadEvent() { 3: int readSizeZeroTimes = 0; 4: 5: // 清空byteBufferRead 6: if (!this.byteBufferRead.hasRemaining()) { 7: this.byteBufferRead.flip(); 8: this.processPostion = 0; 9: } 10: 11: while (this.byteBufferRead.hasRemaining()) { 12: try { 13: int readSize = this.socketChannel.read(this.byteBufferRead); 14: if (readSize > 0) { 15: readSizeZeroTimes = 0; 16: 17: // 設置最後讀取時間 18: this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 19: 20: if ((this.byteBufferRead.position() - this.processPostion) >= 8) { 21: // 讀取Slave 請求來的CommitLog的最大位置 22: int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); 23: long readOffset = this.byteBufferRead.getLong(pos - 8); 24: this.processPostion = pos; 25: 26: // 設置Slave CommitLog的最大位置 27: HAConnection.this.slaveAckOffset = readOffset; 28: 29: // 設置Slave 第一次請求的位置 30: if (HAConnection.this.slaveRequestOffset < 0) { 31: HAConnection.this.slaveRequestOffset = readOffset; 32: log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); 33: } 34: 35: // 通知目前Slave進度。主要用於Master節點爲同步類型的。 36: HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); 37: } 38: } else if (readSize == 0) { 39: if (++readSizeZeroTimes >= 3) { 40: break; 41: } 42: } else { 43: log.error("read socket[" + HAConnection.this.clientAddr + "] < 0"); 44: return false; 45: } 46: } catch (IOException e) { 47: log.error("processReadEvent exception", e); 48: return false; 49: } 50: } 51: 52: return true; 53: }
WriteSocketService
計算 Slave
開始同步的位置後,不斷向 Slave
傳輸新的 CommitLog
數據。1: // ⬇️⬇️⬇️【WriteSocketService.java】 2: @Override 3: public void run() { 4: HAConnection.log.info(this.getServiceName() + " service started"); 5: 6: while (!this.isStopped()) { 7: try { 8: this.selector.select(1000); 9: 10: // 未得到Slave讀取進度請求,sleep等待。 11: if (-1 == HAConnection.this.slaveRequestOffset) { 12: Thread.sleep(10); 13: continue; 14: } 15: 16: // 計算初始化nextTransferFromWhere 17: if (-1 == this.nextTransferFromWhere) { 18: if (0 == HAConnection.this.slaveRequestOffset) { 19: long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); 20: masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog()); 21: if (masterOffset < 0) { 22: masterOffset = 0; 23: } 24: 25: this.nextTransferFromWhere = masterOffset; 26: } else { 27: this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; 28: } 29: 30: log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr 31: + "], and slave request " + HAConnection.this.slaveRequestOffset); 32: } 33: 34: if (this.lastWriteOver) { 35: long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; 36: if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // 心跳 37: 38: // Build Header 39: this.byteBufferHeader.position(0); 40: this.byteBufferHeader.limit(headerSize); 41: this.byteBufferHeader.putLong(this.nextTransferFromWhere); 42: this.byteBufferHeader.putInt(0); 43: this.byteBufferHeader.flip(); 44: 45: this.lastWriteOver = this.transferData(); 46: if (!this.lastWriteOver) 47: continue; 48: } 49: } else { // 未傳輸完成,繼續傳輸 50: this.lastWriteOver = this.transferData(); 51: if (!this.lastWriteOver) 52: continue; 53: } 54: 55: // 選擇新的CommitLog數據進行傳輸 56: SelectMappedBufferResult selectResult = 57: HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); 58: if (selectResult != null) { 59: int size = selectResult.getSize(); 60: if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { 61: size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); 62: } 63: 64: long thisOffset = this.nextTransferFromWhere; 65: this.nextTransferFromWhere += size; 66: 67: selectResult.getByteBuffer().limit(size); 68: this.selectMappedBufferResult = selectResult; 69: 70: // Build Header 71: this.byteBufferHeader.position(0); 72: this.byteBufferHeader.limit(headerSize); 73: this.byteBufferHeader.putLong(thisOffset); 74: this.byteBufferHeader.putInt(size); 75: this.byteBufferHeader.flip(); 76: 77: this.lastWriteOver = this.transferData(); 78: } else { // 沒新的消息,掛起等待 79: HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); 80: } 81: } catch (Exception e) { 82: 83: HAConnection.log.error(this.getServiceName() + " service has exception.", e); 84: break; 85: } 86: } 87: 88: // 斷開鏈接 & 暫停寫線程 & 暫停讀線程 & 釋放CommitLog 89: if (this.selectMappedBufferResult != null) { 90: this.selectMappedBufferResult.release(); 91: } 92: 93: this.makeStop(); 94: 95: readSocketService.makeStop(); 96: 97: haService.removeConnection(HAConnection.this); 98: 99: SelectionKey sk = this.socketChannel.keyFor(this.selector); 100: if (sk != null) { 101: sk.cancel(); 102: } 103: 104: try { 105: this.selector.close(); 106: this.socketChannel.close(); 107: } catch (IOException e) { 108: HAConnection.log.error("", e); 109: } 110: 111: HAConnection.log.info(this.getServiceName() + " service end"); 112: } 113: 114: /** 115: * 傳輸數據 116: */ 117: private boolean transferData() throws Exception { 118: int writeSizeZeroTimes = 0; 119: // Write Header 120: while (this.byteBufferHeader.hasRemaining()) { 121: int writeSize = this.socketChannel.write(this.byteBufferHeader); 122: if (writeSize > 0) { 123: writeSizeZeroTimes = 0; 124: this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 125: } else if (writeSize == 0) { 126: if (++writeSizeZeroTimes >= 3) { 127: break; 128: } 129: } else { 130: throw new Exception("ha master write header error < 0"); 131: } 132: } 133: 134: if (null == this.selectMappedBufferResult) { 135: return !this.byteBufferHeader.hasRemaining(); 136: } 137: 138: writeSizeZeroTimes = 0; 139: 140: // Write Body 141: if (!this.byteBufferHeader.hasRemaining()) { 142: while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { 143: int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); 144: if (writeSize > 0) { 145: writeSizeZeroTimes = 0; 146: this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 147: } else if (writeSize == 0) { 148: if (++writeSizeZeroTimes >= 3) { 149: break; 150: } 151: } else { 152: throw new Exception("ha master write body error < 0"); 153: } 154: } 155: } 156: 157: boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining(); 158: 159: if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { 160: this.selectMappedBufferResult.release(); 161: this.selectMappedBufferResult = null; 162: } 163: 164: return result; 165: }
Producer
發送消息時,Master_SYNC
節點 會等待 Slave
節點 存儲完畢後再返回發送結果。核心代碼以下:學習
1: // ⬇️⬇️⬇️【CommitLog.java】 2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) { 3: // ....省略處理髮送代碼 4: // Synchronous write double 若是是同步Master,同步到從節點 5: if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { 6: HAService service = this.defaultMessageStore.getHaService(); 7: if (msg.isWaitStoreMsgOK()) { 8: // Determine whether to wait 9: if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { 10: if (null == request) { 11: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 12: } 13: service.putRequest(request); 14: 15: // 喚醒WriteSocketService 16: service.getWaitNotifyObject().wakeupAll(); 17: 18: boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); 19: if (!flushOK) { 20: log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " 21: + msg.getTags() + " client address: " + msg.getBornHostString()); 22: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); 23: } 24: } 25: // Slave problem 26: else { 27: // Tell the producer, slave not available 28: putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); 29: } 30: } 31: } 32: 33: return putMessageResult; 34: }
WriteSocketService
。
WriteSocketService
掛起等待新消息結束,Master
傳輸 Slave
新的 CommitLog
數據。Slave
收到數據後,當即上報最新的 CommitLog
同步進度到 Master
。ReadSocketService
喚醒第 18 行:request#waitForFlush(...)
。咱們來看下 GroupTransferService
的核心邏輯代碼:
1: // ⬇️⬇️⬇️【GroupTransferService.java】 2: private void doWaitTransfer() { 3: synchronized (this.requestsRead) { 4: if (!this.requestsRead.isEmpty()) { 5: for (CommitLog.GroupCommitRequest req : this.requestsRead) { 6: // 等待Slave上傳進度 7: boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 8: for (int i = 0; !transferOK && i < 5; i++) { 9: this.notifyTransferObject.waitForRunning(1000); // 喚醒 10: transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 11: } 12: 13: if (!transferOK) { 14: log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); 15: } 16: 17: // 喚醒請求,並設置是否Slave同步成功 18: req.wakeupCustomer(transferOK); 19: } 20: 21: this.requestsRead.clear(); 22: } 23: } 24: }
Producer
發送消息時,會對 Broker
集羣 的全部隊列進行選擇。核心代碼以下:
1: // ⬇️⬇️⬇️【DefaultMQProducerImpl.java】 2: private SendResult sendDefaultImpl(// 3: Message msg, // 4: final CommunicationMode communicationMode, // 5: final SendCallback sendCallback, // 6: final long timeout// 7: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 8: // .... 省略:處理【校驗邏輯】 9: // 獲取 Topic路由信息 10: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); 11: if (topicPublishInfo != null && topicPublishInfo.ok()) { 12: MessageQueue mq = null; // 最後選擇消息要發送到的隊列 13: Exception exception = null; 14: SendResult sendResult = null; // 最後一次發送結果 15: int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步屢次調用 16: int times = 0; // 第幾回發送 17: String[] brokersSent = new String[timesTotal]; // 存儲每次發送消息選擇的broker名 18: // 循環調用發送消息,直到成功 19: for (; times < timesTotal; times++) { 20: String lastBrokerName = null == mq ? null : mq.getBrokerName(); 21: MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 選擇消息要發送到的隊列 22: if (tmpmq != null) { 23: mq = tmpmq; 24: brokersSent[times] = mq.getBrokerName(); 25: try { 26: beginTimestampPrev = System.currentTimeMillis(); 27: // 調用發送消息核心方法 28: sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 29: endTimestamp = System.currentTimeMillis(); 30: // 更新Broker可用性信息 31: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); 32: // .... 省略:處理【發送返回結果】 33: } 34: } catch (e) { // .... 省略:處理【異常】 35: 36: } 37: } else { 38: break; 39: } 40: } 41: // .... 省略:處理【發送返回結果】 42: } 43: // .... 省略:處理【找不到消息路由】 44: }
以下是調試 #sendDefaultImpl(...)
時 TopicPublishInfo
的結果,Producer
得到到了 broker-a
,broker-b
兩個 Broker
分組 的消息隊列:
Consumer
消費消息時,會對 Broker
集羣 的全部隊列進行選擇。