本文主要研究一下rocketmq的HAClientjava
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.javagit
class HAClient extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; private final AtomicReference<String> masterAddress = new AtomicReference<>(); private final ByteBuffer reportOffset = ByteBuffer.allocate(8); private SocketChannel socketChannel; private Selector selector; private long lastWriteTimestamp = System.currentTimeMillis(); private long currentReportedOffset = 0; private int dispatchPosition = 0; private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); public HAClient() throws IOException { this.selector = RemotingUtil.openSelector(); } //...... @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { if (this.connectMaster()) { if (this.isTimeToReportOffset()) { boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { this.closeMaster(); } } this.selector.select(1000); boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); } if (!reportSlaveMaxOffsetPlus()) { continue; } long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() .getHaHousekeepingInterval()) { log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval); this.closeMaster(); log.warn("HAClient, master not response some time, so close connection"); } } else { this.waitForRunning(1000 * 5); } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); this.waitForRunning(1000 * 5); } } log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return HAClient.class.getSimpleName(); } //...... private boolean processReadEvent() { int readSizeZeroTimes = 0; while (this.byteBufferRead.hasRemaining()) { try { int readSize = this.socketChannel.read(this.byteBufferRead); if (readSize > 0) { readSizeZeroTimes = 0; boolean result = this.dispatchReadRequest(); if (!result) { log.error("HAClient, dispatchReadRequest error"); return false; } } else if (readSize == 0) { if (++readSizeZeroTimes >= 3) { break; } } else { log.info("HAClient, processReadEvent read socket < 0"); return false; } } catch (IOException e) { log.info("HAClient, processReadEvent read socket exception", e); return false; } } return true; } //...... }
waitForRunning(1000 * 5)
;連上了master以後再判斷isTimeToReportOffset,即判斷當前時間與lastWriteTimestamp的差值,若該值大於defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(),則返回true;最後執行processReadEvent;processReadEvent在byteBufferRead.hasRemaining()前提下會執行dispatchReadRequestrocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ha/HAService.javagithub
class HAClient extends ServiceThread { //...... private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size int readSocketPos = this.byteBufferRead.position(); while (true) { int diff = this.byteBufferRead.position() - this.dispatchPosition; if (diff >= msgHeaderSize) { long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition); int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8); long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); if (slavePhyOffset != 0) { if (slavePhyOffset != masterPhyOffset) { log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset); return false; } } if (diff >= (msgHeaderSize + bodySize)) { byte[] bodyData = new byte[bodySize]; this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize); this.byteBufferRead.get(bodyData); HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); this.byteBufferRead.position(readSocketPos); this.dispatchPosition += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) { return false; } continue; } } if (!this.byteBufferRead.hasRemaining()) { this.reallocateByteBuffer(); } break; } return true; } //...... }
diff >= (msgHeaderSize + bodySize)
,若成立則執行defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData)waitForRunning(1000 * 5)
;連上了master以後再判斷isTimeToReportOffset,即判斷當前時間與lastWriteTimestamp的差值,若該值大於defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval(),則返回true;最後執行processReadEvent;processReadEvent在byteBufferRead.hasRemaining()前提下會執行dispatchReadRequest