說在前面
今天開始解析netty client handler的處理邏輯,本篇先看 UPDATE_AND_CREATE_TOPIC(更新並建立topic)請求。
源碼解析
找到這個類 org.apache.rocketmq.remoting.netty.NettyRemotingClient.NettyClientHandler
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { // 消息處理=》 processMessageReceived(ctx, msg); } }
進入這個實現方法 org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: // 請求消息處理 =》 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: // 響應消息處理=》 processResponseCommand(ctx, cmd); break; default: break; } } }
進入這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 公平的處理請求 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { // 客戶自定義的鉤子實現類 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { // 這裏mq提供了一些鉤子方法能夠擴展的地方,請求前處理邏輯能夠在這裏擴展 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } // 處理請求,有各個實現,主要都是netty通訊 =》TODO 文章 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { // 執行rocketmq請求的後置處理鉤子方法 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } // 若是不是單線請求 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; // 系統繁忙,暫時啓動流量控制 if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); // 異步提交請求 pair.getObject2().submit(requestTask); } 
catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } // 系統繁忙,暫時啓動流量控制 if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; // 請求編碼不支持 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
進入到這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#processRequest
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { // 三、源碼解析之更新並建立topic =》 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request); // 四、源碼解析之刪除topic =》 case RequestCode.DELETE_TOPIC_IN_BROKER: return this.deleteTopic(ctx, request); // 五、源碼解析之獲取全部的topic配置信息 =》 case RequestCode.GET_ALL_TOPIC_CONFIG: return this.getAllTopicConfig(ctx, request); case RequestCode.UPDATE_BROKER_CONFIG: // 六、源碼解析之更新broker配置信息 =》 return this.updateBrokerConfig(ctx, request); // 七、源碼解析之獲取broker的配置信息 =》 case RequestCode.GET_BROKER_CONFIG: return this.getBrokerConfig(ctx, request); // 八、源碼解析之查找offset按時間 =》 case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP: return this.searchOffsetByTimestamp(ctx, request); // 九、源碼解析之獲取最大的offset =》 case RequestCode.GET_MAX_OFFSET: return this.getMaxOffset(ctx, request); // 十、源碼解析之獲取最小的offset =》 case RequestCode.GET_MIN_OFFSET: return this.getMinOffset(ctx, request); // 十一、源碼解析之獲取最先的消息存儲時間 =》 case RequestCode.GET_EARLIEST_MSG_STORETIME: return this.getEarliestMsgStoretime(ctx, request); // 十二、源碼解析獲取broker的運行時信息 =》 case RequestCode.GET_BROKER_RUNTIME_INFO: return this.getBrokerRuntimeInfo(ctx, request); // 1三、源碼解析批量鎖定消息隊列=》 case RequestCode.LOCK_BATCH_MQ: return this.lockBatchMQ(ctx, request); // 1四、源碼解析之批量解鎖消息隊列=》 case RequestCode.UNLOCK_BATCH_MQ: return this.unlockBatchMQ(ctx, request); // 1五、源碼解析之更新和建立訂閱組=》 case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP: return this.updateAndCreateSubscriptionGroup(ctx, request); // 1六、源碼解析之獲取全部的訂閱組配置信息=》 case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG: return this.getAllSubscriptionGroup(ctx, request); // 1七、源碼解析之刪除訂閱組=》 case RequestCode.DELETE_SUBSCRIPTIONGROUP: return this.deleteSubscriptionGroup(ctx, request); // 1八、源碼解析之獲取topic的狀態信息=》 case RequestCode.GET_TOPIC_STATS_INFO: return this.getTopicStatsInfo(ctx, request); // 1九、TODO case RequestCode.GET_CONSUMER_CONNECTION_LIST: 
return this.getConsumerConnectionList(ctx, request); // 20、源碼解析之獲取生產者鏈接列表=》 case RequestCode.GET_PRODUCER_CONNECTION_LIST: return this.getProducerConnectionList(ctx, request); // 2一、源碼解析之獲取消費者的狀態=》 case RequestCode.GET_CONSUME_STATS: return this.getConsumeStats(ctx, request); // 2二、源碼解析之獲取全部消費者的offset=》 case RequestCode.GET_ALL_CONSUMER_OFFSET: return this.getAllConsumerOffset(ctx, request); // 2三、獲取全部delay的offset=》 case RequestCode.GET_ALL_DELAY_OFFSET: return this.getAllDelayOffset(ctx, request); // 2四、源碼解析之重置offset=》 case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET: return this.resetOffset(ctx, request); // 2五、源碼解析之獲取消費者狀態=》 case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS: return this.getConsumerStatus(ctx, request); case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO: // 2六、源碼解析之查詢topic被哪些消費者消費=》 return this.queryTopicConsumeByWho(ctx, request); // 2七、源碼解析之註冊過濾的server=》 case RequestCode.REGISTER_FILTER_SERVER: return this.registerFilterServer(ctx, request); // 2八、源碼解析之查詢消費者時間=》 case RequestCode.QUERY_CONSUME_TIME_SPAN: return this.queryConsumeTimeSpan(ctx, request); // 2九、源碼解析之從broker中獲取系統topic列表=》 case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER: return this.getSystemTopicListFromBroker(ctx, request); // 30、源碼解析之清除過時的消費隊列=》 case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE: return this.cleanExpiredConsumeQueue(); // 3一、源碼解析之清楚不用的topic=》 case RequestCode.CLEAN_UNUSED_TOPIC: return this.cleanUnusedTopic(); // 3二、源碼解析之獲取消費者運行時信息=》 case RequestCode.GET_CONSUMER_RUNNING_INFO: return this.getConsumerRunningInfo(ctx, request); // 3三、源碼解析之查詢修改後的offset=》 case RequestCode.QUERY_CORRECTION_OFFSET: return this.queryCorrectionOffset(ctx, request); // 3四、源碼解析之直接消費消息=》 case RequestCode.CONSUME_MESSAGE_DIRECTLY: return this.consumeMessageDirectly(ctx, request); // 3五、源碼解析之clone組的offset=》 case RequestCode.CLONE_GROUP_OFFSET: return this.cloneGroupOffset(ctx, request); // 3六、查詢broker狀態數據=》 case RequestCode.VIEW_BROKER_STATS_DATA: return ViewBrokerStatsData(ctx, request); // 
3七、獲取broker消費組的狀態=》 case RequestCode.GET_BROKER_CONSUME_STATS: return fetchAllConsumeStatsInBroker(ctx, request); // 3八、源碼解析之查詢消費隊列=》 case RequestCode.QUERY_CONSUME_QUEUE: return queryConsumeQueue(ctx, request); default: break; } return null; }
進入到這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateAndCreateTopic
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); // 若是topic名和broker集羣名字同樣了報錯 if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) { String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); return response; } try { response.setCode(ResponseCode.SUCCESS); response.setOpaque(request.getOpaque()); response.markResponseType(); response.setRemark(null); ctx.writeAndFlush(response); } catch (Exception e) { log.error("Failed to produce a proper response", e); } // 建立topic配置信息 TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic()); topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); topicConfig.setPerm(requestHeader.getPerm()); topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); // 更新topic配置=》 this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); // 按版本號註冊broker數據 =》 this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion()); return null; }
進入這個方法,更新topic配置 org.apache.rocketmq.broker.topic.TopicConfigManager#updateTopicConfig
public void updateTopicConfig(final TopicConfig topicConfig) { // 從緩存中取出以前的topic配置信息 TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); if (old != null) { log.info("update topic config, old:[{}] new:[{}]", old, topicConfig); } else { log.info("create new topic [{}]", topicConfig); } // 這裏用版本號來標記數據改變過了 this.dataVersion.nextVersion(); // =》持久化 this.persist(); }
進入這個方法,持久化topic配置信息 org.apache.rocketmq.common.ConfigManager#persist
public synchronized void persist() { // 持久化的是json存儲,序列化的時候按版本號維護的數據 =》 String jsonString = this.encode(true); if (jsonString != null) { // config/topics.json 文件存儲 =》 String fileName = this.configFilePath(); try { // 保存文件 =》 MixAll.string2File(jsonString, fileName); } catch (IOException e) { log.error("persist file " + fileName + " exception", e); } } }
進入這個方法保存文件org.apache.rocketmq.common.MixAll#string2File
public static void string2File(final String str, final String fileName) throws IOException { // 要保存的內容存儲在臨時文件中 String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); // 把原來的數據進行備份 String bakFile = fileName + ".bak"; String prevContent = file2String(fileName); if (prevContent != null) { string2FileNotSafe(prevContent, bakFile); } // 刪掉源文件 File file = new File(fileName); file.delete(); // 臨時文件重命名 file = new File(tmpFile); file.renameTo(new File(fileName)); }
往上返回到這個方法,同步註冊broker數據org.apache.rocketmq.broker.BrokerController#registerIncrementBrokerData
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { TopicConfig registerTopicConfig = topicConfig; if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { registerTopicConfig = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); } // 組裝topic序列化信息 ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper.setDataVersion(dataVersion); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); // 註冊全部的broker =》 doRegisterBrokerAll(true, false, topicConfigSerializeWrapper); }
進入這個方法 org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll,向全部的namesrv註冊broker及其topic配置信息
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // 向全部的broker進行註冊=》 List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, // 過濾的服務 this.filterServerManager.buildNewFilterServerList(), // 單途 oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister()); if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { // 更新master地址本地緩存 this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } // 同步設置slave的master地址 this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { // 更新訂閱的topic配置 =》 this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } }
進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 獲取namesrv地址集合 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 對請求體進行編碼=》 final byte[] body = requestBody.encode(compressed); // 壓縮處理 final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 併發向namesrv集羣註冊broker for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { // 註冊broker服務任務分發=》 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return 
registerBrokerResultList; }
進入這個方法,對請求體進行編碼 org.apache.rocketmq.common.protocol.body.RegisterBrokerBody#encode
public byte[] encode(boolean compress) { if (!compress) { // json編碼 return super.encode(); } long start = System.currentTimeMillis(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION)); DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable()); assert topicConfigTable != null; try { // 版本json編碼 byte[] buffer = dataVersion.encode(); // write data version outputStream.write(convertIntToByteArray(buffer.length)); outputStream.write(buffer); int topicNumber = topicConfigTable.size(); // write number of topic configs outputStream.write(convertIntToByteArray(topicNumber)); // write topic config entry one by one. for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) { buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET); outputStream.write(convertIntToByteArray(buffer.length)); outputStream.write(buffer); } buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET); // write filter server list json length outputStream.write(convertIntToByteArray(buffer.length)); // write filter server list json outputStream.write(buffer); outputStream.finish(); long interval = System.currentTimeMillis() - start; if (interval > 50) { LOGGER.info("Compressing takes {}ms", interval); } return byteArrayOutputStream.toByteArray(); } catch (IOException e) { LOGGER.error("Failed to compress RegisterBrokerBody object", e); } return null; }
進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker註冊broker
private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); if (oneway) { try { // 單線請求,不關心結果 =》 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } // broker同步向namesrv註冊broker=》 RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if (response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway單途註冊
@Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { // 獲取channel=》 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { // 執行請求執行前的鉤子方法 this.rpcHook.doBeforeRequest(addr, request); } // 執行單線請求 =》 this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); // 異常關閉channel=》 this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
未完待續。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣