RocketMQ Source Code Analysis: High Availability

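🙂🙂🙂 Follow the WeChat official account 【芋艿的後端小屋】 for these perks:

  1. The full list of source code analysis articles for RocketMQ / MyCAT / Sharding-JDBC
  2. The GitHub address of the RocketMQ / MyCAT / Sharding-JDBC source code with Chinese comments
  3. Every comment asking about the source code gets a careful reply. You can even ask how to read source code in the first place
  4. Real-time notification of new source code analysis articles, published roughly once a week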

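1. Overview

This article analyzes how Namesrv and Broker achieve high availability, and how Producer and Consumer communicate with them so that the whole system stays highly available.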

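2. Namesrv High Availability

High availability is achieved by starting multiple Namesrv instances.
Compared with Zookeeper, Consul, Etcd and the like, Namesrv is an ultra-lightweight registry that provides a naming service.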

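2.1 Broker Registration with Namesrv

  • 📌 Multiple Namesrv instances are completely independent of one another (there are no roles like Zookeeper's Leader/Follower), and they neither communicate nor synchronize data. Instead, each Broker registers with every Namesrv in a loop.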
// ⬇️⬇️⬇️ [BrokerOuterAPI.java]
public RegisterBrokerResult registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills) {
    RegisterBrokerResult registerBrokerResult = null;

    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) { // loop over every Namesrv
            try {
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }

                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (Exception e) {
                // A failure on one Namesrv does not stop registration with the others
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            }
        }
    }

    return registerBrokerResult;
}
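
The registration loop can be reduced to the following self-contained sketch. `NamesrvRegistrar`, `registerAll`, and the `Function`-based `registerOne` callback are hypothetical stand-ins for `BrokerOuterAPI` and its remoting call, not RocketMQ APIs. The only point illustrated is that a failing Namesrv does not prevent registration with the remaining ones.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for BrokerOuterAPI: registers with every Namesrv independently.
class NamesrvRegistrar {

    // Tries every address; a failure on one address is logged and skipped.
    // Returns the last non-null result, or null if every registration failed.
    static String registerAll(List<String> namesrvAddrs, Function<String, String> registerOne) {
        if (namesrvAddrs == null) {
            return null;
        }
        String lastResult = null;
        for (String addr : namesrvAddrs) {
            try {
                String result = registerOne.apply(addr);
                if (result != null) {
                    lastResult = result;
                }
            } catch (RuntimeException e) {
                // Same idea as the listing above: warn and carry on with the next Namesrv.
                System.err.println("register failed on " + addr + ": " + e.getMessage());
            }
        }
        return lastResult;
    }

    public static void main(String[] args) {
        List<String> addrs = Arrays.asList("ns1:9876", "ns2:9876", "ns3:9876");
        // Simulate ns2 being unreachable; ns1 and ns3 still get registered.
        String result = registerAll(addrs, addr -> {
            if (addr.startsWith("ns2")) {
                throw new IllegalStateException("connection refused");
            }
            return "ok@" + addr;
        });
        System.out.println(result);
    }
}
```

Because the Namesrv instances share no state, the Broker is the only party that can keep them consistent, which is why every address is attempted on every call.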

2.2 Producer and Consumer Access to Namesrv

  • 📌 Producer and Consumer pick one connectable Namesrv from the Namesrv list and communicate with it.
// ⬇️⬇️⬇️ [NettyRemotingClient.java]
private Channel getAndCreateNameserverChannel() throws InterruptedException {
    // Fast path: return the already chosen Namesrv if its channel is still usable
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List<String> addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            // Re-check under the lock: return the already chosen, usable Namesrv
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
            // Pick one connectable Namesrv from the Namesrv list and return its channel
            if (addrList != null && !addrList.isEmpty()) {
                for (int i = 0; i < addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null)
                        return channelNew;
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}
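
The selection step is a plain round-robin walk over the address list. Below is a minimal sketch of that idea. `NamesrvSelector` and the `connectable` predicate are hypothetical names introduced for illustration, with the predicate standing in for "a channel could be created". Note one deliberate difference: the sketch uses `Math.floorMod` instead of the `Math.abs(...) % size` combination in the listing, so the slot stays non-negative even if the counter overflows.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical helper illustrating round-robin Namesrv selection.
class NamesrvSelector {
    private final AtomicInteger index = new AtomicInteger(0);

    // Walks the list at most once, continuing from the previously used slot,
    // and returns the first address accepted by `connectable`, or null.
    String choose(List<String> addrs, Predicate<String> connectable) {
        if (addrs == null || addrs.isEmpty()) {
            return null;
        }
        for (int i = 0; i < addrs.size(); i++) {
            // floorMod keeps the slot in [0, size) even for negative counters.
            int slot = Math.floorMod(index.incrementAndGet(), addrs.size());
            String candidate = addrs.get(slot);
            if (connectable.test(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        NamesrvSelector selector = new NamesrvSelector();
        List<String> addrs = Arrays.asList("ns1:9876", "ns2:9876", "ns3:9876");
        // Pretend ns2 is down: it is skipped and the next address is chosen.
        System.out.println(selector.choose(addrs, addr -> !addr.startsWith("ns2")));
    }
}
```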

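3. Broker High Availability

High availability is achieved by starting multiple Broker groups that together form a cluster.
Broker group = 1 Master node + N Slave nodes.
Similar to MySQL, the Master node serves both reads and writes, while Slave nodes serve reads only.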

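3.1 Broker Master/Slave

  • Within each group, the Master node continuously sends new CommitLog data to the Slave nodes, and each Slave node continuously reports to the Master node the position up to which its local CommitLog has been synced.
  • Broker groups are independent of one another; they neither communicate nor synchronize data.
  • Consumption progress is currently not synchronized between Master and Slave.

Within a cluster there are two types of Master node, Master_SYNC and Master_ASYNC. When a Producer sends a message, the former waits until the Slave node has stored it before returning the send result, while the latter does not wait.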

3.1.1 Configuration

Three sets of configuration are currently provided officially:

  • 2m-2s-async
    brokerClusterName  brokerName  brokerRole    brokerId
    DefaultCluster     broker-a    ASYNC_MASTER  0
    DefaultCluster     broker-a    SLAVE         1
    DefaultCluster     broker-b    ASYNC_MASTER  0
    DefaultCluster     broker-b    SLAVE         1
  • 2m-2s-sync
    brokerClusterName  brokerName  brokerRole    brokerId
    DefaultCluster     broker-a    SYNC_MASTER   0
    DefaultCluster     broker-a    SLAVE         1
    DefaultCluster     broker-b    SYNC_MASTER   0
    DefaultCluster     broker-b    SLAVE         1
  • 2m-noslave
    brokerClusterName  brokerName  brokerRole    brokerId
    DefaultCluster     broker-a    ASYNC_MASTER  0
    DefaultCluster     broker-b    ASYNC_MASTER  0

3.1.2 Components

Before looking at the implementation, let's see which components the Master and Slave nodes contain:

[Figure: HA component diagram (HA組件圖.png)]

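  • Master node
    • AcceptSocketService: accepts connections from Slave nodes.
    • HAConnection
      • ReadSocketService: reads data coming from the Slave node.
      • WriteSocketService: writes data going to the Slave node.
  • Slave node
    • HAClient: connects to the Master node and reads/writes data.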

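3.1.3 Communication Protocol

The protocol between the Master node and the Slave node is very simple and consists of only the following two messages.

Slave => Master: reports the physical offset up to which the CommitLog has been synced

  Index  Field          Type   Bytes  Description
  0      maxPhyOffset   Long   8      Maximum physical offset of the CommitLog

Master => Slave: transfers new CommitLog data

  Index  Field          Type   Bytes  Description
  0      fromPhyOffset  Long   8      Starting physical offset in the CommitLog
  1      size           Int    4      Length of the CommitLog data being transferred
  2      body           Bytes  size   The CommitLog data being transferred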
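
To make the byte layout concrete, here is a small sketch that encodes both messages with `java.nio.ByteBuffer`. The class `HaFrames` and its method names are hypothetical and only mirror the field layout listed above; they are not part of RocketMQ. `ByteBuffer` defaults to big-endian byte order, which is what the sketch relies on.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec for the two HA messages described above.
class HaFrames {

    // Slave => Master: a single 8-byte maxPhyOffset.
    static ByteBuffer encodeReport(long maxPhyOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxPhyOffset);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Master => Slave: 8-byte fromPhyOffset + 4-byte size + body.
    static ByteBuffer encodeTransfer(long fromPhyOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + body.length);
        buf.putLong(fromPhyOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer report = encodeReport(1024L);
        System.out.println("report bytes=" + report.remaining() + " offset=" + report.getLong(0));

        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer transfer = encodeTransfer(1024L, body);
        System.out.println("transfer bytes=" + transfer.remaining()
            + " from=" + transfer.getLong(0) + " size=" + transfer.getInt(8));
    }
}
```

A transfer message with `size` 0 and no body is what `WriteSocketService` sends as a heartbeat, as the Master-side listing in section 3.1.5 shows.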

3.1.4 Slave

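[Figure: HAClient sequence diagram]

  • The Slave main loop continuously receives CommitLog data from the Master and reports back to the Master the physical offset up to which its own local CommitLog has been synced.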
1: // ⬇️⬇️⬇️【HAClient.java】
  2: public void run() {
  3:     log.info(this.getServiceName() + " service started");
  4: 
  5:     while (!this.isStopped()) {
  6:         try {
  7:             if (this.connectMaster()) {
  8:                 // If the report interval has elapsed, report the sync progress to the Master
  9:                 if (this.isTimeToReportOffset()) {
 10:                     boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
 11:                     if (!result) {
 12:                         this.closeMaster();
 13:                     }
 14:                 }
 15: 
 16:                 this.selector.select(1000);
 17: 
 18:                 // Handle read events
 19:                 boolean ok = this.processReadEvent();
 20:                 if (!ok) {
 21:                     this.closeMaster();
 22:                 }
 23: 
 24:                 // If the progress has changed, report it to the Master
 25:                 if (!reportSlaveMaxOffsetPlus()) {
 26:                     continue;
 27:                 }
 28: 
 29:                 // The Master has not sent data for too long: close the connection
 30:                 long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
 31:                 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
 32:                     .getHaHousekeepingInterval()) {
 33:                     log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
 34:                         + "] expired, " + interval);
 35:                     this.closeMaster();
 36:                     log.warn("HAClient, master not response some time, so close connection");
 37:                 }
 38:             } else {
 39:                 this.waitForRunning(1000 * 5);
 40:             }
 41:         } catch (Exception e) {
 42:             log.warn(this.getServiceName() + " service has exception. ", e);
 43:             this.waitForRunning(1000 * 5);
 44:         }
 45:     }
 46: 
 47:     log.info(this.getServiceName() + " service end");
 48: }
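  • Lines 8 to 14: at a fixed interval (5s by default), the Slave reports to the Master the physical offset up to which its local CommitLog has been synced. This report also serves as a heartbeat.
  • Lines 16 to 22: handle the CommitLog data that the Master transfers to the Slave.

  • Let's look at how #dispatchReadRequest(...) and #reportSlaveMaxOffset(...) are implemented.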
// [HAClient.java]
/**
 * Reads the CommitLog data transferred by the Master and reports whether processing succeeded.
 * If data has been read, it is appended to the CommitLog.
 * Failure causes:
 * 1. The offset of the data sent by the Master differs from the max offset of the Slave's CommitLog
 * 2. Reporting the progress to the Master failed
 *
 * @return false if an error occurred, true otherwise
 */
private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // A complete header has been read
        int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= msgHeaderSize) {
            // Read masterPhyOffset and bodySize. dispatchPostion is used because TCP may deliver
            // several messages, or only part of one, in a single read ("sticky packets").
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
            // Check that the offset sent by the Master equals the max offset of the Slave's CommitLog
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                        + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }
            // A complete message (header + body) has been read
            if (diff >= (msgHeaderSize + bodySize)) {
                // Append to the CommitLog
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                this.byteBufferRead.get(bodyData);
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                // Restore the read position and advance the dispatch position
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += msgHeaderSize + bodySize;
                // Report the progress to the Master
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }
                // Keep looping: there may be more messages in the buffer
                continue;
            }
        }

        // The buffer is full: reallocate
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

/**
 * Reports the sync progress.
 *
 * @param maxOffset the progress (max synced offset)
 * @return whether the report was sent successfully
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    // Try at most 3 writes; a non-blocking write may not send all 8 bytes at once
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    return !this.reportOffset.hasRemaining();
}
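
The role of `dispatchPostion` is easier to see in isolation. The sketch below is a simplified, hypothetical decoder (`HaFrameDecoder` is not a RocketMQ class): it accumulates bytes as they arrive and only emits a body once the 12-byte header and the whole body are present. It assumes a fixed 1024-byte buffer and leaves out the offset check, the progress report, and buffer reallocation that the real method performs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical decoder: extracts complete messages, leaves partial ones for the next read.
class HaFrameDecoder {
    private static final int HEADER_SIZE = 8 + 4; // phyOffset + size
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private int dispatchPosition = 0; // start of the first message not yet dispatched

    // Appends newly received bytes, then returns the bodies of all complete messages.
    List<byte[]> onBytes(byte[] received) {
        readBuffer.put(received);
        List<byte[]> bodies = new ArrayList<>();
        while (true) {
            int available = readBuffer.position() - dispatchPosition;
            if (available < HEADER_SIZE) {
                break; // header not complete yet
            }
            int bodySize = readBuffer.getInt(dispatchPosition + 8);
            if (available < HEADER_SIZE + bodySize) {
                break; // body not complete yet
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                // Absolute reads leave the buffer's write position untouched.
                body[i] = readBuffer.get(dispatchPosition + HEADER_SIZE + i);
            }
            bodies.add(body);
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return bodies;
    }

    public static void main(String[] args) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + 3);
        frame.putLong(0L).putInt(3).put(new byte[] {1, 2, 3});
        byte[] bytes = frame.array();

        HaFrameDecoder decoder = new HaFrameDecoder();
        // Deliver one message in two pieces, as a TCP stream might.
        System.out.println(decoder.onBytes(Arrays.copyOfRange(bytes, 0, 10)).size());
        System.out.println(decoder.onBytes(Arrays.copyOfRange(bytes, 10, bytes.length)).size());
    }
}
```

The first call yields no message because only 10 of the 15 bytes have arrived; the second call completes the message and yields one body.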

3.1.5 Master

  • The logic of ReadSocketService is essentially the same as HAClient#processReadEvent(...), so let's go straight to the code.
// ⬇️⬇️⬇️ [ReadSocketService.java]
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    // The buffer is full: reset it so it can be reused
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;

                // Record the time of the last read
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

                if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                    // Read the latest CommitLog max offset reported by the Slave
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPostion = pos;

                    // Record the Slave's CommitLog max offset
                    HAConnection.this.slaveAckOffset = readOffset;

                    // Record the offset of the Slave's first request
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    }

                    // Notify waiters of the Slave's current progress. Mainly used when the Master is a SYNC_MASTER.
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
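
The `pos - (pos % 8)` arithmetic deserves a closer look: since every report is exactly 8 bytes and only the newest one matters, the Master jumps straight to the last complete 8-byte value in the buffer and ignores older ones. A minimal sketch of that calculation follows; `LatestOffsetReader` is a hypothetical helper, and returning -1 when fewer than 8 bytes are available is an assumption of the sketch.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: picks the most recent complete 8-byte offset from a read buffer.
class LatestOffsetReader {

    // Returns the last complete long written before buffer.position(),
    // or -1 if fewer than 8 bytes have been received.
    static long latestOffset(ByteBuffer buffer) {
        int aligned = buffer.position() - (buffer.position() % 8);
        if (aligned < 8) {
            return -1L;
        }
        return buffer.getLong(aligned - 8); // absolute read, position unchanged
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.putLong(100L).putLong(200L);  // two complete reports
        buffer.put((byte) 0).put((byte) 0);  // plus 2 bytes of a third, incomplete report
        System.out.println(latestOffset(buffer));
    }
}
```

With 18 bytes received, the aligned position is 16, so the value read is the second report (200) and the trailing 2 bytes wait for the next read.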

  • After computing the position from which the Slave should start syncing, WriteSocketService continuously transfers new CommitLog data to the Slave.

[Figure: HA.WriteSocketService flowchart]

// ⬇️⬇️⬇️ [WriteSocketService.java]
@Override
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);

            // The Slave has not reported its progress yet: sleep and wait
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }

            // Compute the initial nextTransferFromWhere
            if (-1 == this.nextTransferFromWhere) {
                if (0 == HAConnection.this.slaveRequestOffset) {
                    // A fresh Slave starts from the beginning of the Master's newest CommitLog file
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }

                    this.nextTransferFromWhere = masterOffset;
                } else {
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                    + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }

            if (this.lastWriteOver) {
                long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // heartbeat

                    // Build Header
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();

                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            } else { // the previous transfer is not finished: continue it
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }

            // Select new CommitLog data to transfer
            SelectMappedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }

                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size;

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // Build Header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();

                this.lastWriteOver = this.transferData();
            } else { // no new messages: block until notified or timed out
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // Close the connection, stop the write thread, stop the read thread, release the CommitLog buffer
    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    this.makeStop();

    readSocketService.makeStop();

    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

/**
 * Transfers the header and, if present, the body.
 */
private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        } else {
            throw new Exception("ha master write header error < 0");
        }
    }

    // Header-only transfer (heartbeat): done once the header is fully written
    if (null == this.selectMappedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    writeSizeZeroTimes = 0;

    // Write Body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            } else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }

    boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

    if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMappedBufferResult.release();
        this.selectMappedBufferResult = null;
    }

    return result;
}
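
Two small calculations in `run()` decide what gets sent: where the transfer starts, and how much goes into one message. The sketch below isolates them. `TransferPlanner` and its parameter names are hypothetical; the arithmetic follows the listing above, and the 1024-byte file size in `main` is an assumed value chosen only to keep the numbers readable.

```java
// Hypothetical helper reproducing two calculations from WriteSocketService#run().
class TransferPlanner {

    // Where the Master starts sending: a Slave that reports offset 0 starts from
    // the beginning of the Master's newest CommitLog file; otherwise it resumes
    // from the offset it asked for.
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, int mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0L);
    }

    // How many bytes go into one message: never more than the configured batch size.
    static int batchSize(int availableBytes, int haTransferBatchSize) {
        return Math.min(availableBytes, haTransferBatchSize);
    }

    public static void main(String[] args) {
        int fileSize = 1024; // assumed, tiny on purpose
        System.out.println(initialTransferOffset(0L, 2500L, fileSize));   // fresh Slave
        System.out.println(initialTransferOffset(700L, 2500L, fileSize)); // resuming Slave
        System.out.println(batchSize(50_000, 32 * 1024));
    }
}
```

One consequence worth noting: a Slave that starts empty does not receive the Master's full history, only the data from the start of the newest CommitLog file onward.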

3.1.6 Master_SYNC

  • When a Producer sends a message, a Master_SYNC node waits until the Slave node has stored it before returning the send result.

The core code is as follows:

1: // ⬇️⬇️⬇️【CommitLog.java】
  2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
  3:     // .... message handling and storage code omitted
  4:     // Synchronous write double: if this is a SYNC_MASTER, replicate to the Slave node
  5:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
  6:         HAService service = this.defaultMessageStore.getHaService();
  7:         if (msg.isWaitStoreMsgOK()) {
  8:             // Determine whether to wait
  9:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
 10:                 if (null == request) {
 11:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 12:                 }
 13:                 service.putRequest(request);
 14: 
 15:                 // Wake up WriteSocketService
 16:                 service.getWaitNotifyObject().wakeupAll();
 17: 
 18:                 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
 19:                 if (!flushOK) {
 20:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
 21:                         + msg.getTags() + " client address: " + msg.getBornHostString());
 22:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
 23:                 }
 24:             }
 25:             // Slave problem
 26:             else {
 27:                 // Tell the producer, slave not available
 28:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
 29:             }
 30:         }
 31:     }
 32: 
 33:     return putMessageResult;
 34: }
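  • Line 16: wakes up WriteSocketService.
    • Once woken, WriteSocketService stops waiting for new messages, and the Master transfers the new CommitLog data to the Slave.
    • After receiving the data, the Slave immediately reports its latest CommitLog sync progress to the Master. ReadSocketService then wakes up request#waitForFlush(...) on line 18.

Let's look at the core logic of GroupTransferService: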

// ⬇️⬇️⬇️ [GroupTransferService.java]
private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                // Wait for the Slave to report enough progress
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                for (int i = 0; !transferOK && i < 5; i++) {
                    this.notifyTransferObject.waitForRunning(1000); // wait up to 1s to be woken by a progress report
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }

                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }

                // Wake up the waiting request and tell it whether the Slave sync succeeded
                req.wakeupCustomer(transferOK);
            }

            this.requestsRead.clear();
        }
    }
}
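
The wait in `doWaitTransfer()` boils down to "compare the Slave's acknowledged offset with the offset this message needs, and retry a bounded number of times". The following sketch models that with plain `wait`/`notifyAll`. `SlaveAckWaiter` and its methods are hypothetical, and RocketMQ's own `WaitNotifyObject` and `GroupCommitRequest` are not used; only the comparison and the bounded retry follow the listing.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a SYNC_MASTER waiting for the Slave's acknowledged offset.
class SlaveAckWaiter {
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final Object lock = new Object();

    // Called when a Slave reports progress (the ReadSocketService side).
    void onSlaveAck(long ackOffset) {
        // Keep the largest offset seen so far.
        push2SlaveMaxOffset.accumulateAndGet(ackOffset, Math::max);
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    // Blocks until the Slave has acknowledged nextOffset, waiting up to
    // waitMillis per attempt for at most `attempts` attempts.
    // Returns false on timeout or if the thread is interrupted.
    boolean waitForTransfer(long nextOffset, int attempts, long waitMillis) {
        boolean ok = push2SlaveMaxOffset.get() >= nextOffset;
        for (int i = 0; !ok && i < attempts; i++) {
            synchronized (lock) {
                // Re-check under the lock so an ack between check and wait is not missed.
                if (push2SlaveMaxOffset.get() < nextOffset) {
                    try {
                        lock.wait(waitMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
            ok = push2SlaveMaxOffset.get() >= nextOffset;
        }
        return ok;
    }

    public static void main(String[] args) {
        SlaveAckWaiter waiter = new SlaveAckWaiter();
        new Thread(() -> waiter.onSlaveAck(4096L)).start();
        System.out.println(waiter.waitForTransfer(4096L, 5, 1000L)); // the Slave acks in time
        System.out.println(waiter.waitForTransfer(8192L, 2, 50L));   // nobody acks this offset
    }
}
```

In the real code a timeout here surfaces to the Producer as `FLUSH_SLAVE_TIMEOUT`, as the `putMessage` listing shows.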

3.2 Producer Sending Messages

  • When sending a message, the Producer selects from the queues of all Brokers in the cluster.

The core code is as follows:

// ⬇️⬇️⬇️ [DefaultMQProducerImpl.java]
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // .... omitted: validation logic
    // Fetch the Topic routing info
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null; // the queue finally chosen for the message
        Exception exception = null;
        SendResult sendResult = null; // result of the last send
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // only SYNC mode retries
        int times = 0; // which attempt this is
        String[] brokersSent = new String[timesTotal]; // broker name chosen on each attempt
        // Keep sending until an attempt succeeds or the attempts are used up
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // choose the queue to send to
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // Call the core send method
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // Update the Broker availability info
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // .... omitted: handling of the send result
                } catch (Exception e) {
                    // .... omitted: exception handling
                }
            } else {
                break;
            }
        }
        // .... omitted: handling of the send result
    }
    // .... omitted: handling of a missing message route
}

Below is the TopicPublishInfo observed while debugging #sendDefaultImpl(...). The Producer has obtained the message queues of two Broker groups, broker-a and broker-b:

[Figure: Producer TopicPublishInfo debugging screenshot (Producer.TopicPublishInfo.調試.png)]
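
Passing `lastBrokerName` into `selectOneMessageQueue(...)` suggests that a retry tries to avoid the Broker group that just failed. The body of that method is not shown in this article, so the sketch below is an assumption about the idea rather than the actual implementation: `QueueSelector` and its nested `Queue` type are hypothetical, and the policy is a round-robin that skips queues on the last-used Broker and falls back to plain round-robin if no other Broker exists.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical queue selector: round-robin, avoiding the broker that just failed.
class QueueSelector {
    // Minimal stand-in for a message queue: broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;
        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
        @Override public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next queue whose broker differs from lastBrokerName (null = no preference).
    Queue select(List<Queue> queues, String lastBrokerName) {
        if (queues == null || queues.isEmpty()) {
            return null;
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = next(queues);
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the failed broker: fall back to plain round-robin.
        return next(queues);
    }

    private Queue next(List<Queue> queues) {
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1));
        QueueSelector selector = new QueueSelector();
        Queue first = selector.select(queues, null);
        System.out.println(first);
        // Suppose sending to `first` failed: the retry avoids its broker.
        System.out.println(selector.select(queues, first.brokerName));
    }
}
```

With two Broker groups, as in the debugging screenshot, a failed send to a broker-a queue is retried on a broker-b queue, which is what makes multiple groups useful for availability on the sending side.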

3.3 Consumer Consuming Messages

  • When consuming messages, the Consumer selects from the queues of all Brokers in the cluster.

4. Summary

[Figure: HA summary (HA總結.jpeg)]