rocketmq源碼解析之管理請求清除過時的消費隊列

說在前面apache

管理請求 CLEAN_EXPIRED_CONSUMEQUEUE 清除過時的消費隊列微信

 

源碼解析併發

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueueapp

public RemotingCommand cleanExpiredConsumeQueue() {
        log.warn("invoke cleanExpiredConsumeQueue start.");
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//        =》
        brokerController.getMessageStore().cleanExpiredConsumerQueue();
        log.warn("invoke cleanExpiredConsumeQueue end.");
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

進入這個方法org.apache.rocketmq.store.DefaultMessageStore#cleanExpiredConsumerQueue清除過時的消費隊列性能

public void cleanExpiredConsumerQueue() {
//        從commitLog中獲取最小的offset=》
        long minCommitLogOffset = this.commitLog.getMinOffset();
        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
            String topic = next.getKey();
            if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
                Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator();
                while (itQT.hasNext()) {
                    Entry<Integer, ConsumeQueue> nextQT = itQT.next();
//                    獲取消息隊列的lastOffset=》
                    long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
                    if (maxCLOffsetInConsumeQueue == -1) {
                        log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
                            nextQT.getValue().getTopic(),
                            nextQT.getValue().getQueueId(),
                            nextQT.getValue().getMaxPhysicOffset(),
                            nextQT.getValue().getMinLogicOffset());
//                        正常狀況下應該等於,若是是小於說明有的消息隊列已過時
                    } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
                        log.info(
                            "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
                            topic,
                            nextQT.getKey(),
                            minCommitLogOffset,
                            maxCLOffsetInConsumeQueue);
//                        按topic和queueId刪除topic和queue的offset=》
                        DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
                            nextQT.getValue().getQueueId());
                        nextQT.getValue().destroy();
//                      消息隊列銷燬了,消息隊列所在元素刪除
                        itQT.remove();
                    }
                }

                if (queueTable.isEmpty()) {
                    log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
//                  topic銷燬了,消息隊列所在的元素刪除
                    it.remove();
                }
            }
        }
    }

進入這個方法org.apache.rocketmq.store.CommitLog#getMinOffset獲取最小的offsetthis

public long getMinOffset() {
//        獲取第一個映射文件=》
        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
        if (mappedFile != null) {
            if (mappedFile.isAvailable()) {
//                獲取映射文件的起始偏移量
                return mappedFile.getFileFromOffset();
            } else {
//                獲取下個文件的起始偏移量=》
                return this.rollNextFile(mappedFile.getFileFromOffset());
            }
        }

        return -1;
    }

進入到這個方法org.apache.rocketmq.store.MappedFileQueue#getFirstMappedFile獲取映射隊列中第一個映射文件3d

public MappedFile getFirstMappedFile() {
    MappedFile mappedFileFirst = null;
    if (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileFirst = this.mappedFiles.get(0);
        } catch (IndexOutOfBoundsException e) {
            //ignore
        } catch (Exception e) {
            log.error("getFirstMappedFile has exception.", e);
        }
    }

    return mappedFileFirst;
}

進入到這個方法org.apache.rocketmq.store.CommitLog#rollNextFile獲取到下個映射文件的起始偏移量blog

public long rollNextFile(final long offset) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    return offset + mappedFileSize - offset % mappedFileSize;
}

往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getLastOffset獲取最後的offset隊列

public long getLastOffset() {
        long lastOffset = -1;
        int logicFileSize = this.mappedFileSize;
//        獲取映射文件隊列中最後一個映射文件=》
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        if (mappedFile != null) {

//            有點疑問,這裏是獲取的commitOffset嗎
            int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;
            if (position < 0)
                position = 0;
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            byteBuffer.position(position);
            for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                byteBuffer.getLong();
                if (offset >= 0 && size > 0) {
                    lastOffset = offset + size;
                } else {
                    break;
                }
            }
        }

        return lastOffset;
    }

進入到這個方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()獲取映射隊列中最後的映射文件rem

public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;
    while (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}

往上返回到這個方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId刪除消費隊列

public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
        String key = topic + "-" + queueId;
//        這個地方同步用synchronized有點疑問,若是併發量比較小,synchronized性能也能夠的,可是併發量達到必定量級lock或者其餘無鎖實現
//        應該會好一點,難道消息隊列過時這種狀況出現過時未消費的機率較低
        synchronized (this) {
            this.topicQueueTable.remove(key);
        }

        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueue結束

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索