RocketMQ Source Code Analysis: The Pull Message Processor (Part 2)

Preface

This post continues the walkthrough of the pull message processor.

 

Source Code Analysis

Step into this method, which finds the mapped file for a given offset: org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean). It was covered earlier in this series.

Return up to this method, which takes an offset and computes the starting offset of the next mapped file: org.apache.rocketmq.store.CommitLog#rollNextFile

public long rollNextFile(final long offset) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    return offset + mappedFileSize - offset % mappedFileSize;
}
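The arithmetic is easier to follow with concrete numbers. Below is a minimal sketch of the same calculation. It assumes a hypothetical file size of 1024 bytes for readability and passes it in as a parameter instead of reading it from the message store config. The class name RollNextFileDemo is made up for illustration.

```java
public class RollNextFileDemo {
    // Same arithmetic as CommitLog#rollNextFile, but the file size is an
    // explicit parameter here (an assumption made to keep the sketch standalone).
    static long rollNextFile(long offset, int mappedFileSize) {
        // offset % mappedFileSize is the position inside the current file;
        // subtracting it rounds down to the file start, adding the size moves
        // to the start of the next file.
        return offset + mappedFileSize - offset % mappedFileSize;
    }

    public static void main(String[] args) {
        int fileSize = 1024; // hypothetical size, far smaller than a real commit log file
        System.out.println(rollNextFile(0, fileSize));    // 1024
        System.out.println(rollNextFile(1500, fileSize)); // 2048
        System.out.println(rollNextFile(2047, fileSize)); // 2048
    }
}
```

Note that an offset sitting exactly on a file boundary still rolls forward by one full file, so the result is always strictly greater than the input.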

Return up to this method, which runs the consume-message hooks before the message is consumed: org.apache.rocketmq.broker.processor.PullMessageProcessor#executeConsumeMessageHookBefore

public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
    if (hasConsumeMessageHook()) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                // Users can implement their own hook to run before consumption
                hook.consumeMessageBefore(context);
            } catch (Throwable e) {
                // Errors thrown by a hook are swallowed so the remaining hooks still run
            }
        }
    }
}
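The per-hook try/catch means one failing hook cannot stop the others. The sketch below illustrates that behaviour with simplified stand-in types: BeforeHook and HookChainDemo are hypothetical names, not RocketMQ classes, and the context is reduced to a plain String. Unlike the original, which discards the exception, this sketch records the failure message so the effect is visible.

```java
import java.util.ArrayList;
import java.util.List;

public class HookChainDemo {
    // Hypothetical stand-in for ConsumeMessageHook; not the RocketMQ interface.
    interface BeforeHook {
        void before(String context);
    }

    // Runs every hook in order and returns the messages of any failures.
    static List<String> runHooks(List<BeforeHook> hooks, String context) {
        List<String> failures = new ArrayList<>();
        for (BeforeHook hook : hooks) {
            try {
                hook.before(context);
            } catch (Throwable e) {
                // The original swallows the error silently; collecting it
                // here is a deliberate deviation for demonstration purposes.
                failures.add(String.valueOf(e.getMessage()));
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        List<BeforeHook> hooks = new ArrayList<>();
        hooks.add(ctx -> calls.add("first:" + ctx));
        hooks.add(ctx -> { throw new IllegalStateException("boom"); });
        hooks.add(ctx -> calls.add("third:" + ctx));

        List<String> failures = runHooks(hooks, "pull");
        System.out.println(calls);    // the first and third hooks both ran
        System.out.println(failures); // the second hook's failure was isolated
    }
}
```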

Return up to this method, which suspends the pull request by parking it in the hold service until it can be served: org.apache.rocketmq.broker.longpolling.PullRequestHoldService#suspendPullRequest

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
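The get, putIfAbsent, then fall back to the previous value sequence is a common way to create a per-key container without locking the whole table. Here is a minimal sketch of that pattern using simplified stand-ins: requests are plain strings, the container is a synchronized list rather than ManyPullRequest, and the key layout of topic and queue id joined by '@' is an assumption, since buildKey is not shown above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class HoldTableDemo {
    private final ConcurrentMap<String, List<String>> table = new ConcurrentHashMap<>();

    // Assumed key layout for illustration only.
    static String buildKey(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    void suspend(String topic, int queueId, String request) {
        String key = buildKey(topic, queueId);
        List<String> held = table.get(key);
        if (held == null) {
            held = Collections.synchronizedList(new ArrayList<>());
            // If another thread inserted a list first, putIfAbsent returns
            // that list and ours is discarded, so no request is lost.
            List<String> prev = table.putIfAbsent(key, held);
            if (prev != null) {
                held = prev;
            }
        }
        held.add(request);
    }

    // Number of requests currently held for the given queue.
    int heldCount(String topic, int queueId) {
        List<String> held = table.get(buildKey(topic, queueId));
        return held == null ? 0 : held.size();
    }

    public static void main(String[] args) {
        HoldTableDemo demo = new HoldTableDemo();
        demo.suspend("TopicA", 0, "request-1");
        demo.suspend("TopicA", 0, "request-2");
        demo.suspend("TopicA", 1, "request-3");
        System.out.println(demo.heldCount("TopicA", 0)); // 2
        System.out.println(demo.heldCount("TopicA", 1)); // 1
    }
}
```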

Return up to this method, which commits the consumer offset: org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset(java.lang.String, java.lang.String, java.lang.String, int, long)

public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
    final long offset) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    // => delegates to the private overload
    this.commitOffset(clientHost, key, queueId, offset);
}

Step into this method: org.apache.rocketmq.broker.offset.ConsumerOffsetManager#commitOffset(java.lang.String, java.lang.String, int, long)

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null == map) {
        map = new ConcurrentHashMap<Integer, Long>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }
}
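Two details are worth noticing in the code above: offsets are stored in a two-level map (topic@group, then queue id), and a commit that moves the offset backwards is still written, only logged as a warning. The following sketch reproduces both with a standalone class. OffsetTableDemo and queryOffset are hypothetical names, the warning is replaced by a boolean return value, and computeIfAbsent is swapped in for the original get/put sequence.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OffsetTableDemo {
    // Outer key: topic@group. Inner key: queue id. Value: committed offset.
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>();

    // Stores the offset and returns true when it is lower than the previously
    // stored one, which is the case the original code logs as a warning.
    boolean commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + "@" + group;
        // computeIfAbsent replaces the original null check plus put;
        // this is a substitution, not what the source does.
        ConcurrentMap<Integer, Long> map =
            offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>(32));
        Long stored = map.put(queueId, offset);
        return stored != null && offset < stored;
    }

    // Returns the committed offset, or -1 when nothing has been committed.
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> map = offsetTable.get(topic + "@" + group);
        if (map == null) {
            return -1L;
        }
        Long offset = map.get(queueId);
        return offset == null ? -1L : offset;
    }

    public static void main(String[] args) {
        OffsetTableDemo demo = new OffsetTableDemo();
        System.out.println(demo.commitOffset("groupA", "TopicA", 0, 100L)); // false
        System.out.println(demo.commitOffset("groupA", "TopicA", 0, 50L));  // true
        System.out.println(demo.queryOffset("groupA", "TopicA", 0));        // 50
    }
}
```

As in the original, the lower offset overwrites the stored one; the check only reports the regression.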

Return up to this method, org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean), which completes here.

 

Closing Remarks

This analysis reflects only my personal views and is offered for reference.

 
