說在前面
源碼解析
進入方法,構建索引,org.apache.rocketmq.store.index.IndexService#buildIndex
public void buildIndex(DispatchRequest req) {// 試着獲取並建立索引文件IndexFile indexFile = retryGetAndCreateIndexFile();if (indexFile != null) {long endPhyOffset = indexFile.getEndPhyOffset();DispatchRequest msg = req;String topic = msg.getTopic();String keys = msg.getKeys();if (msg.getCommitLogOffset() < endPhyOffset) {return;}// 解析事務消息類型=》final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:break;case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:return;}if (req.getUniqKey() != null) {indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));if (indexFile == null) {log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());return;}}if (keys != null && keys.length() > 0) {String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);for (int i = 0; i < keyset.length; i++) {String key = keyset[i];if (key.length() > 0) {indexFile = putKey(indexFile, msg, buildKey(topic, key));if (indexFile == null) {log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());return;}}}}} else {log.error("build index error, stop building index");} }
返回方法,消息到達通知監聽器,org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener#arriving
@Overridepublic void arriving(String topic, int queueId, long logicOffset, long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {// =》this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,msgStoreTime, filterBitMap, properties); }
進入方法,org.apache.rocketmq.broker.longpolling.PullRequestHoldService#notifyMessageArriving(java.lang.String, int, long, java.lang.Long, long, byte[], java.util.Map<java.lang.String,java.lang.String>)
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {String key = this.buildKey(topic, queueId);// 獲取pullRequestManyPullRequest mpr = this.pullRequestTable.get(key);if (mpr != null) {List<PullRequest> requestList = mpr.cloneListAndClear();if (requestList != null) {List<PullRequest> replayList = new ArrayList<PullRequest>();for (PullRequest request : requestList) {long newestOffset = maxOffset;if (newestOffset <= request.getPullFromThisOffset()) {// 按topic, queueId從消息隊列中獲取最大offset=》newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);}if (newestOffset > request.getPullFromThisOffset()) {boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));// match by bit map, need eval again when properties is not null.if (match && properties != null) {match = request.getMessageFilter().isMatchedByCommitLog(null, properties);}if (match) {try {// 執行pull request請求=》this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}}if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {try {// =》this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),request.getRequestCommand());} catch (Throwable e) {log.error("execute request when wakeup failed.", e);}continue;}replayList.add(request);}if (!replayList.isEmpty()) {mpr.addPullRequest(replayList);}}} }
進入方法,按topic, queueId從消息隊列中獲取最大offset,org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue介紹過了。
返回方法,執行pull request請求,org.apache.rocketmq.broker.processor.PullMessageProcessor#executeRequestWhenWakeup
public void executeRequestWhenWakeup(final Channel channel,final RemotingCommand request) throws RemotingCommandException {Runnable run = new Runnable() {@Overridepublic void run() {try {// =》final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);if (response != null) {response.setOpaque(request.getOpaque());response.markResponseType();try {channel.writeAndFlush(response).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {log.error("processRequestWrapper response to {} failed",future.channel().remoteAddress(), future.cause());log.error(request.toString());log.error(response.toString());}}});} catch (Throwable e) {log.error("processRequestWrapper process request over, but response failed", e);log.error(request.toString());log.error(response.toString());}}} catch (RemotingCommandException e1) {log.error("excuteRequestWhenWakeup run", e1);}}};// 異步執行this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); }
進入方法,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)介紹過了。
返回方法,ha服務啓動,org.apache.rocketmq.store.ha.HAService#start
public void start() throws Exception {// 創建nio鏈接,爲何不用netty呢this.acceptSocketService.beginAccept();// 服務啓動=》this.acceptSocketService.start();this.groupTransferService.start();// ha client啓動=》this.haClient.start(); }
進入方法,創建nio連接,爲何不用netty呢,
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#beginAccept
/**
 * Opens the server socket channel and selector, binds the channel to
 * {@code socketAddressListen} with address reuse enabled, switches it to non-blocking
 * mode and registers it with the selector for {@code OP_ACCEPT}.
 *
 * @throws Exception if opening, binding, configuring or registering the channel fails
 */
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();

    // Address reuse is enabled before the bind call.
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);

    // The channel is made non-blocking before it is registered with the selector.
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
返回方法,服務啓動,org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run
@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);Set<SelectionKey> selected = this.selector.selectedKeys();if (selected != null) {for (SelectionKey k : selected) {if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {HAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {HAConnection conn = new HAConnection(HAService.this, sc);// ha connection啓動conn.start();HAService.this.addConnection(conn);} catch (Exception e) {log.error("new HAConnection exception", e);sc.close();}}} else {log.warn("Unexpected ops in select " + k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() + " service has exception.", e);}}log.info(this.getServiceName() + " service end"); }
進入方法,ha connection啓動,org.apache.rocketmq.store.ha.HAConnection#start
/**
 * Starts this connection's read service first, then its write service.
 */
public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}
進入方法,org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
@Overridepublic void run() {HAConnection.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);// 處理讀事件=》boolean ok = this.processReadEvent();if (!ok) {HAConnection.log.error("processReadEvent error");break;}long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);break;}} catch (Exception e) {HAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}this.makeStop();writeSocketService.makeStop();haService.removeConnection(HAConnection.this);HAConnection.this.haService.getConnectionCount().decrementAndGet();SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {HAConnection.log.error("", e);}HAConnection.log.info(this.getServiceName() + " service end"); }
進入方法,處理讀事件,org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent
/**
 * Reads from the socket into {@code byteBufferRead} and extracts the most recent complete
 * 8-byte offset it contains.
 *
 * <p>Each time at least 8 new bytes are available beyond {@code processPostion}, the long
 * ending at the largest multiple-of-8 position is taken as the slave's acknowledged offset;
 * it also becomes the slave's request offset if that is still negative. Waiters are then
 * notified through {@code notifyTransferSome}.
 *
 * @return {@code false} if the read returned a negative size or threw an
 *         {@code IOException}; {@code true} otherwise
 */
private boolean processReadEvent() {
    int emptyReads = 0;

    // When the buffer has no room left it is flipped and the processed position is reset.
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            final int readSize = this.socketChannel.read(this.byteBufferRead);

            if (readSize < 0) {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }

            if (readSize == 0) {
                // Stop after three consecutive empty reads.
                if (++emptyReads >= 3) {
                    break;
                }
                continue;
            }

            emptyReads = 0;
            this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            if ((this.byteBufferRead.position() - this.processPostion) < 8) {
                continue;
            }

            // Round the position down to a multiple of 8 and read the long that ends there.
            final int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
            final long readOffset = this.byteBufferRead.getLong(pos - 8);
            this.processPostion = pos;

            HAConnection.this.slaveAckOffset = readOffset;
            if (HAConnection.this.slaveRequestOffset < 0) {
                HAConnection.this.slaveRequestOffset = readOffset;
                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
            }

            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
進入方法,org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
@Overridepublic void run() {HAConnection.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {this.selector.select(1000);// 處理讀事件=》boolean ok = this.processReadEvent();if (!ok) {HAConnection.log.error("processReadEvent error");break;}long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);break;}} catch (Exception e) {HAConnection.log.error(this.getServiceName() + " service has exception.", e);break;}}this.makeStop();writeSocketService.makeStop();haService.removeConnection(HAConnection.this);HAConnection.this.haService.getConnectionCount().decrementAndGet();SelectionKey sk = this.socketChannel.keyFor(this.selector);if (sk != null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {HAConnection.log.error("", e);}HAConnection.log.info(this.getServiceName() + " service end"); }
進入方法,處理讀事件,org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent
/**
 * Reads from the socket into {@code byteBufferRead} and extracts the most recent complete
 * 8-byte offset it contains.
 *
 * <p>Each time at least 8 new bytes are available beyond {@code processPostion}, the long
 * ending at the largest multiple-of-8 position is taken as the slave's acknowledged offset;
 * it also becomes the slave's request offset if that is still negative. Waiters are then
 * notified through {@code notifyTransferSome}.
 *
 * @return {@code false} if the read returned a negative size or threw an
 *         {@code IOException}; {@code true} otherwise
 */
private boolean processReadEvent() {
    int emptyReads = 0;

    // When the buffer has no room left it is flipped and the processed position is reset.
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            final int readSize = this.socketChannel.read(this.byteBufferRead);

            if (readSize < 0) {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }

            if (readSize == 0) {
                // Stop after three consecutive empty reads.
                if (++emptyReads >= 3) {
                    break;
                }
                continue;
            }

            emptyReads = 0;
            this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            if ((this.byteBufferRead.position() - this.processPostion) < 8) {
                continue;
            }

            // Round the position down to a multiple of 8 and read the long that ends there.
            final int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
            final long readOffset = this.byteBufferRead.getLong(pos - 8);
            this.processPostion = pos;

            HAConnection.this.slaveAckOffset = readOffset;
            if (HAConnection.this.slaveRequestOffset < 0) {
                HAConnection.this.slaveRequestOffset = readOffset;
                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
            }

            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
進入方法,org.apache.rocketmq.store.ha.HAService.GroupTransferService#run
/**
 * Service loop: until stopped, waits via {@code waitForRunning(10)} and then processes the
 * pending transfer requests. Exceptions are logged and the loop continues.
 */
public void run() {
    final String serviceName = this.getServiceName();
    log.info(serviceName + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
進入方法,org.apache.rocketmq.store.ha.HAService.GroupTransferService#doWaitTransfer
/**
 * For every request in {@code requestsRead}, waits until {@code push2SlaveMaxOffset} has
 * reached the request's next offset, checking up to five more times with a
 * {@code waitForRunning(1000)} between checks. The requester is then woken with the
 * outcome, and the list is cleared once all requests are handled.
 */
private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (this.requestsRead.isEmpty()) {
            return;
        }

        for (CommitLog.GroupCommitRequest req : this.requestsRead) {
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

            int attempts = 0;
            while (!transferOK && attempts < 5) {
                this.notifyTransferObject.waitForRunning(1000);
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                attempts++;
            }

            if (!transferOK) {
                log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
            }

            req.wakeupCustomer(transferOK);
        }

        this.requestsRead.clear();
    }
}
返回方法,ha client啓動,org.apache.rocketmq.store.ha.HAService.HAClient#run
@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 鏈接master=》if (this.connectMaster()) {if (this.isTimeToReportOffset()) {boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);if (!result) {// 關閉master=》this.closeMaster();}}this.selector.select(1000);// 處理讀事件=》boolean ok = this.processReadEvent();if (!ok) {this.closeMaster();}if (!reportSlaveMaxOffsetPlus()) {continue;}long interval =HAService.this.getDefaultMessageStore().getSystemClock().now()- this.lastWriteTimestamp;if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress+ "] expired, " + interval);this.closeMaster();log.warn("HAClient, master not response some time, so close connection");}} else {this.waitForRunning(1000 * 5);}} catch (Exception e) {log.warn(this.getServiceName() + " service has exception. ", e);this.waitForRunning(1000 * 5);}}log.info(this.getServiceName() + " service end"); }
進入方法,鏈接master,org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster
/**
 * Ensures there is a socket channel to the master.
 *
 * <p>When no channel exists, the master address is resolved and connected; a successfully
 * connected channel is registered with the selector for {@code OP_READ}. Whether or not
 * the connection attempt succeeded, the reported offset is reset to the store's max
 * physical offset and the last-write timestamp is set to the current time.
 *
 * @return {@code true} if a socket channel exists after the call
 * @throws ClosedChannelException if registering the channel with the selector fails
 */
private boolean connectMaster() throws ClosedChannelException {
    if (null != socketChannel) {
        return true;
    }

    final String addr = this.masterAddress.get();
    final SocketAddress socketAddress = (addr != null) ? RemotingUtil.string2SocketAddress(addr) : null;
    if (socketAddress != null) {
        this.socketChannel = RemotingUtil.connect(socketAddress);
        if (this.socketChannel != null) {
            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
    }

    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();

    return this.socketChannel != null;
}
返回方法,關閉master,org.apache.rocketmq.store.ha.HAService.HAClient#closeMaster
/**
 * Closes the connection to the master, if one exists: cancels the selector key, closes
 * the channel and clears the reference, then resets the last-write timestamp, the
 * dispatch position and both read buffers (position 0, limit
 * {@code READ_MAX_BUFFER_SIZE}). An {@code IOException} during close is logged; the reset
 * still runs.
 */
private void closeMaster() {
    if (null == this.socketChannel) {
        return;
    }

    try {
        final SelectionKey key = this.socketChannel.keyFor(this.selector);
        if (key != null) {
            key.cancel();
        }

        this.socketChannel.close();
        this.socketChannel = null;
    } catch (IOException e) {
        log.warn("closeMaster exception. ", e);
    }

    this.lastWriteTimestamp = 0;
    this.dispatchPostion = 0;

    this.byteBufferBackup.position(0);
    this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);

    this.byteBufferRead.position(0);
    this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
}
進入方法,處理讀事件,org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
/**
 * Reads data from the master into {@code byteBufferRead} while the buffer has room,
 * dispatching after every non-empty read.
 *
 * @return {@code false} if the read returned a negative size, threw an
 *         {@code IOException}, or {@code dispatchReadRequest()} failed; {@code true}
 *         otherwise (including after three consecutive empty reads)
 */
private boolean processReadEvent() {
    int emptyReads = 0;

    while (this.byteBufferRead.hasRemaining()) {
        try {
            final int readSize = this.socketChannel.read(this.byteBufferRead);

            if (readSize < 0) {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }

            if (readSize == 0) {
                // Stop after three consecutive empty reads.
                if (++emptyReads >= 3) {
                    break;
                }
                continue;
            }

            lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            emptyReads = 0;
            if (!this.dispatchReadRequest()) {
                log.error("HAClient, dispatchReadRequest error");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}
返回方法,添加調度服務,org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
private void addScheduleTask() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 按期清除文件=》DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {DefaultMessageStore.this.checkSelf();}}, 1, 10, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {try {if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();if (lockTime > 1000 && lockTime < 10000000) {String stack = UtilAll.jstack();final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"+ DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;MixAll.string2FileNotSafe(stack, fileName);}}} catch (Exception e) {}}}}, 1, 1, TimeUnit.SECONDS);// this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {// @Override// public void run() {// DefaultMessageStore.this.cleanExpiredConsumerQueue();// }// }, 1, 1, TimeUnit.HOURS); }
進入方法,按期清除文件,org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() {// =》this.cleanCommitLogService.run();// =》this.cleanConsumeQueueService.run(); }
進入方法,org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#run
public void run() {try {// 刪除過時文件=》this.deleteExpiredFiles();// 從新刪除該刪除的文件=》this.redeleteHangedFile();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);} }
進入方法,刪除過時文件,org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles
private void deleteExpiredFiles() {int deleteCount = 0;long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();// 時間到了該刪除boolean timeup = this.isTimeToDelete();// 磁盤達到百分比改刪除boolean spacefull = this.isSpaceToDelete();boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;if (timeup || spacefull || manualDelete) {if (manualDelete)this.manualDeleteFileSeveralTimes--;boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",fileReservedTime,timeup,spacefull,manualDeleteFileSeveralTimes,cleanAtOnce);fileReservedTime *= 60 * 60 * 1000;// =》deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) {log.warn("disk space will be full soon, but delete file failed.");}} }
進入方法,org.apache.rocketmq.store.CommitLog#deleteExpiredFile
public int deleteExpiredFile(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {// 按時間刪除過時文件=》return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); }
進入方法,按時間刪除過時文件,org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {Object[] mfs = this.copyMappedFiles(0);if (null == mfs)return 0;int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];// 文件最大存儲時間long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {// 文件銷燬=》if (mappedFile.destroy(intervalForcibly)) {files.add(mappedFile);deleteCount++;if (files.size() >= DELETE_FILES_BATCH_MAX) {break;}if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {try {Thread.sleep(deleteFilesInterval);} catch (InterruptedException e) {}}} else {break;}} else {//avoid deleting files in the middlebreak;}}}// 刪除映射文件隊列中映射文件=》deleteExpiredFile(files);return deleteCount; }
進入方法,刪除映射文件隊列中映射文件,org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFile
/**
 * Removes the given files from {@code mappedFiles}.
 *
 * <p>Entries of {@code files} that are not present in {@code mappedFiles} are first removed
 * from {@code files} itself (the argument is modified) and logged. An error is logged when
 * {@code removeAll} reports that nothing changed or throws.
 *
 * @param files files to remove; does nothing when empty
 */
void deleteExpiredFile(List<MappedFile> files) {
    if (files.isEmpty()) {
        return;
    }

    for (Iterator<MappedFile> it = files.iterator(); it.hasNext();) {
        final MappedFile candidate = it.next();
        if (this.mappedFiles.contains(candidate)) {
            continue;
        }
        it.remove();
        log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", candidate.getFileName());
    }

    try {
        final boolean changed = this.mappedFiles.removeAll(files);
        if (!changed) {
            log.error("deleteExpiredFile remove failed.");
        }
    } catch (Exception e) {
        log.error("deleteExpiredFile has exception.", e);
    }
}
返回方法,從新刪除該刪除的文件,org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#redeleteHangedFile
private void redeleteHangedFile() {int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();long currentTimestamp = System.currentTimeMillis();if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {this.lastRedeleteTimestamp = currentTimestamp;int destroyMapedFileIntervalForcibly =DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();// =》if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {}} }
進入方法,org.apache.rocketmq.store.MappedFileQueue#retryDeleteFirstFile
public boolean retryDeleteFirstFile(final long intervalForcibly) {// 獲取映射文件隊列中第一個文件=》MappedFile mappedFile = this.getFirstMappedFile();if (mappedFile != null) {if (!mappedFile.isAvailable()) {log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());// 文件銷燬=》boolean result = mappedFile.destroy(intervalForcibly);if (result) {log.info("the mappedFile re delete OK, " + mappedFile.getFileName());List<MappedFile> tmpFiles = new ArrayList<MappedFile>();tmpFiles.add(mappedFile);// 刪除映射文件隊列中文件=》this.deleteExpiredFile(tmpFiles);} else {log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());}return result;}}return false; }
返回方法,org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#run
public void run() {try {// 刪除過時文件=》this.deleteExpiredFiles();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);} }
進入方法,刪除過時文件,org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles
private void deleteExpiredFiles() {int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();if (minOffset > this.lastPhysicalMinOffset) {this.lastPhysicalMinOffset = minOffset;ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {// =》int deleteCount = logic.deleteExpiredFile(minOffset);if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {try {Thread.sleep(deleteLogicsFilesInterval);} catch (InterruptedException ignored) {}}}}// 刪除索引文件=》DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);} }
進入方法,org.apache.rocketmq.store.ConsumeQueue#deleteExpiredFile
public int deleteExpiredFile(long offset) {// 按offset刪除過時文件=》int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);// 修改最小的offset=》this.correctMinOffset(offset);return cnt; }
進入方法,按offset刪除過時文件,org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByOffset介紹過了。
返回方法,修改最小的offset,org.apache.rocketmq.store.ConsumeQueue#correctMinOffset介紹過了。
進入方法,刪除索引文件,org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(long)
public void deleteExpiredFile(long offset) {Object[] files = null;try {this.readWriteLock.readLock().lock();if (this.indexFileList.isEmpty()) {return;}long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();if (endPhyOffset < offset) {files = this.indexFileList.toArray();}} catch (Exception e) {log.error("destroy exception", e);} finally {this.readWriteLock.readLock().unlock();}if (files != null) {List<IndexFile> fileList = new ArrayList<IndexFile>();for (int i = 0; i < (files.length - 1); i++) {IndexFile f = (IndexFile) files[i];if (f.getEndPhyOffset() < offset) {fileList.add(f);} else {break;}}// 刪除過時的索引文件=》this.deleteExpiredFile(fileList);} }
進入方法,刪除過時的索引文件,org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(java.util.List<org.apache.rocketmq.store.index.IndexFile>)
/**
 * Destroys the given index files and removes them from {@code indexFileList} while
 * holding the write lock. Processing stops at the first file that cannot be destroyed or
 * removed, and an error is logged.
 *
 * <p>The write lock is acquired before entering the {@code try} block so that
 * {@code unlock()} in {@code finally} only runs when the lock is actually held.
 *
 * @param files index files to delete; does nothing when empty
 */
private void deleteExpiredFile(List<IndexFile> files) {
    if (!files.isEmpty()) {
        this.readWriteLock.writeLock().lock();
        try {
            for (IndexFile file : files) {
                // destroy(3000): the literal 3000 is passed through as in the original code.
                boolean destroyed = file.destroy(3000);
                destroyed = destroyed && this.indexFileList.remove(file);
                if (!destroyed) {
                    log.error("deleteExpiredFile remove failed.");
                    break;
                }
            }
        } catch (Exception e) {
            log.error("deleteExpiredFile has exception.", e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }
}
返回方法,註冊broker,org.apache.rocketmq.broker.BrokerController#registerBrokerAll介紹過了。
進入方法,broker快速失敗服務,org.apache.rocketmq.broker.latency.BrokerFastFailure#start
public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {// 清除過時的請求=》cleanExpiredRequest();}}}, 1000, 10, TimeUnit.MILLISECONDS); }
進入方法,清除過時的請求,org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequest
private void cleanExpiredRequest() {// 系統繁忙while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {try {if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);if (null == runnable) {break;}final RequestTask rt = castRunnable(runnable);rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));} else {break;}} catch (Throwable ignored) {}}cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); }
返回方法,分佈式消息事務服務啓動,org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#start
@Overridepublic void start() {// 這裏也是自旋鎖的應用if (started.compareAndSet(false, true)) {super.start();this.brokerController.getTransactionalMessageService().open();} }
返回方法,org.apache.rocketmq.broker.BrokerStartup#start結束。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣