Preface
The admin request GET_ALL_DELAY_OFFSET returns the broker's cache of delay offsets.
Source code analysis
Start in the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllDelayOffset, which fetches the delay offsets.
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);

    // Encode the delay offset table =>
    String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
    if (content != null && content.length() > 0) {
        try {
            response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
        } catch (UnsupportedEncodingException e) {
            log.error("get all delay offset from master error.", e);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UnsupportedEncodingException " + e);
            return response;
        }
    } else {
        // Nothing to return: report the error together with the caller's address
        log.error("No delay offset in this broker, client: {} ", ctx.channel().remoteAddress());
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("No delay offset in this broker");
        return response;
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
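The branching in the handler above can be isolated in a minimal sketch. The class `DelayOffsetResponseSketch`, its nested `Response` type, and the numeric constants are hypothetical stand-ins for `RemotingCommand` and `ResponseCode`, not RocketMQ's own types; the sketch also assumes UTF-8 and uses `StandardCharsets.UTF_8`, which avoids the checked `UnsupportedEncodingException` seen in the original.

```java
import java.nio.charset.StandardCharsets;

public class DelayOffsetResponseSketch {
    // Assumed stand-ins for ResponseCode.SUCCESS and ResponseCode.SYSTEM_ERROR
    public static final int SUCCESS = 0;
    public static final int SYSTEM_ERROR = 1;

    // Hypothetical, simplified substitute for RemotingCommand
    public static final class Response {
        public final int code;
        public final String remark;
        public final byte[] body;

        public Response(int code, String remark, byte[] body) {
            this.code = code;
            this.remark = remark;
            this.body = body;
        }
    }

    // Mirrors the handler's decision: empty content is an error, otherwise return the bytes
    public static Response buildResponse(String content) {
        if (content == null || content.isEmpty()) {
            return new Response(SYSTEM_ERROR, "No delay offset in this broker", null);
        }
        return new Response(SUCCESS, null, content.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Response ok = buildResponse("{\"offsetTable\":{}}");
        System.out.println(ok.code + " bodyLength=" + ok.body.length);

        Response empty = buildResponse("");
        System.out.println(empty.code + " remark=" + empty.remark);
    }
}
```

Passing a `Charset` instead of a charset name is a design choice in this sketch only; the original handler passes a charset name and therefore has to catch the checked exception.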
Next, step into the method org.apache.rocketmq.store.schedule.ScheduleMessageService#encode(boolean), which serializes the offset table.
public String encode(final boolean prettyFormat) {
    DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
    delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
    return delayOffsetSerializeWrapper.toJson(prettyFormat);
}
Where the delayed-message offsets are held in memory:
// Cache of delay offsets, keyed by delay level
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);
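To make the relationship between the offset table and its encoded form concrete, here is a minimal sketch. `DelayOffsetTableSketch` and `updateOffset` are hypothetical names, and the hand-written JSON builder is only a stand-in for `DelayOffsetSerializeWrapper.toJson`, so the exact text RocketMQ produces (key quoting, ordering, pretty printing) may differ.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DelayOffsetTableSketch {
    // Same shape as the field above: delay level -> consume offset
    private final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap<>(32);

    // Hypothetical helper: record the latest offset for a delay level
    public void updateOffset(int delayLevel, long offset) {
        offsetTable.put(delayLevel, offset);
    }

    // Hand-rolled JSON encoding (an assumption, not the library's serializer);
    // keys are sorted through a TreeMap copy so the output is deterministic
    public String encode() {
        StringBuilder sb = new StringBuilder("{\"offsetTable\":{");
        boolean first = true;
        for (Map.Entry<Integer, Long> e : new TreeMap<>(offsetTable).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        DelayOffsetTableSketch table = new DelayOffsetTableSketch();
        table.updateOffset(2, 10L);
        table.updateOffset(1, 5L);
        System.out.println(table.encode()); // prints {"offsetTable":{"1":5,"2":10}}
    }
}
```

The `ConcurrentHashMap` lets one thread update offsets while another serializes them; the encoded string is a snapshot and may lag behind concurrent updates.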
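Control then returns to org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllDelayOffset, where the request handling ends.
Closing remarks
This analysis reflects only my own views and is offered for reference.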