rocketmq源碼解析:client管理之心跳檢測①

說在前面

client管理 心跳檢測

 

源碼解析

進入這個方法,org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest,處理client管理請求

@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {//            心跳監測=》case RequestCode.HEART_BEAT:return this.heartBeat(ctx, request);//                取消註冊client=》case RequestCode.UNREGISTER_CLIENT:return this.unregisterClient(ctx, request);//                檢查client的配置=》case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null;    }

進入這個方法,org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat,心跳檢測



/**
 * Handles a client heartbeat request.
 *
 * Decodes {@code HeartbeatData} from the request body, builds a {@code ClientChannelInfo} for the
 * sending channel, and then, for every {@code ConsumerData} entry, looks up the subscription group
 * configuration, creates the group's retry topic when a configuration exists, and registers the
 * consumer.
 *
 * NOTE(review): this excerpt is truncated. It stops after the consumer loop; the rest of the
 * method (including the closing brace and the return statement) is not shown here.
 */
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Look up the configuration of this consumer's subscription group.
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        // Defaults to true when no group configuration is found.
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // Retry topic name for this group (original note: %RETRY% + consumer group name).
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            // Make sure the retry topic exists; it is created if absent.
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        // Register the consumer; the flag returned reports whether anything changed.
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );

        // Log only when the registration reported a change.
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

進入這個方法,獲取訂閱組的配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig

public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {//        從緩存中獲取組的訂閱信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {//            自動建立消費組或者是系統自用的消費組if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}//                更新數據的版本號this.dataVersion.nextVersion();//                持久化=》this.persist();}}return subscriptionGroupConfig;    }

進入這個方法,org.apache.rocketmq.common.ConfigManager#persist,持久化

public synchronized void persist() {//        持久化的是json存儲,序列化的時候按版本號維護的數據 =》String jsonString = this.encode(true);if (jsonString != null) {//             user.home/store/config/topics.json 文件存儲 =》String fileName = this.configFilePath();try {//                保存文件 =》MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}}    }

進入這個方法,保存文件,org.apache.rocketmq.common.MixAll#string2File




public static void string2File(final String str, final String fileName) throws IOException {//        要保存的內容存儲在臨時文件中String tmpFile = fileName + ".tmp";string2FileNotSafe(str, tmpFile);//        把原來的數據進行備份String bakFile = fileName + ".bak";String prevContent = file2String(fileName);if (prevContent != null) {string2FileNotSafe(prevContent, bakFile);}//      刪掉源文件File file = new File(fileName);file.delete();//       臨時文件重命名file = new File(tmpFile);file.renameTo(new File(fileName));    }

進入這個方法,發送消息返回後建立重試topic,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod






public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {//        獲取topic配置信息TopicConfig topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;boolean createNew = false;try {if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;topicConfig = new TopicConfig(topic);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);//                    存儲重試的tpic配置信息this.topicConfigTable.put(topic, topicConfig);createNew = true;//                    修飾數據的版本號this.dataVersion.nextVersion();//                    持久化=》this.persist();} finally {this.lockTopicConfigTable.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}//        若是topic配置信息是從新建立的,註冊到broker集羣中=》if (createNew) {this.brokerController.registerBrokerAll(false, true,true);}return topicConfig;    }

進入這個方法,如果topic配置信息是新建立的,就向namesrv集羣註冊broker,org.apache.rocketmq.broker.BrokerController#registerBrokerAll


public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {//        包裝topic配置信息TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();//        若是沒有讀寫權限if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}//        判斷broker集羣是否須要註冊=》if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {//            向全部的broker註冊topic配置信息 =》doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}    }

進入這個方法,判斷broker是否需要註冊,org.apache.rocketmq.broker.BrokerController#needRegister

private boolean needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final int timeoutMills) {TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();//        判斷是否須要註冊 =》List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);boolean needRegister = false;for (Boolean changed : changeList) {//            只要一個namesrv須要更新就所有須要更新if (changed) {needRegister = true;break;}}return needRegister;    }

進入這個方法,判斷是否需要註冊,org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister

public List<Boolean> needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final TopicConfigSerializeWrapper topicConfigWrapper,final int timeoutMills) {//        多線程更新是否須要變化狀態的集合final List<Boolean> changedList = new CopyOnWriteArrayList<>();//        獲取namesrv地址List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());//            多線程分發執行,一個namesrv一個線程for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);request.setBody(topicConfigWrapper.getDataVersion().encode());//                            broker向namesrv同步查詢數據版本 =》RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);DataVersion nameServerDataVersion = null;Boolean changed = false;switch (response.getCode()) {case ResponseCode.SUCCESS: {QueryDataVersionResponseHeader queryDataVersionResponseHeader =(QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);//                                   根據版本號判斷數據有沒有發生變化changed = queryDataVersionResponseHeader.getChanged();byte[] body = response.getBody();if (body != null) {nameServerDataVersion = DataVersion.decode(body, DataVersion.class);if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {//                                            數據版本不一致須要更新changed = true;}}if (changed == null || changed) 
{changedList.add(Boolean.TRUE);}}default:break;}log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);} catch (Exception e) {changedList.add(Boolean.TRUE);log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);} finally {//                            異常狀況下解除線程阻塞countDownLatch.countDown();}}});}try {//                等到超時解除線程阻塞countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("query dataversion from nameserver countDownLatch await Exception", e);}}return changedList;    }

往上返回到這個方法,向所有namesrv註冊broker及topic配置信息,org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll



private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {//        向全部的broker進行註冊=》List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,//            過濾的服務this.filterServerManager.buildNewFilterServerList(),//            單途oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {//                    更新master地址本地緩存this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}//                同步設置slave的master地址this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {//                   更新訂閱的topic配置 =》this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}}    }

進入這個方法,向全部的broker進行註冊,org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll





final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();//        獲取namesrv地址集合List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);//            對請求體進行編碼=》final byte[] body = requestBody.encode(compressed);//            壓縮處理final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());//            併發向namesrv集羣註冊brokerfor (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {//                            註冊broker服務任務分發=》RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker to name server {} OK", namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;    }

進入這個方法, 註冊broker服務任務分發,org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker



private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);if (oneway) {try {//                單線請求,不關心結果 =》this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}//        broker同步向namesrv註冊broker=》RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());    }

進入這個方法,單線請求,不關心結果,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway

@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {//        獲取channel=》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {//                    執行請求執行前的鉤子方法this.rpcHook.doBeforeRequest(addr, request);}//               執行單線請求 =》this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);//                異常關閉channel=》this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}    }

進入這個方法, 執行單線請求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();//        獲取信號量的信號,這裏用semaphore作了限流boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}}});} catch (Exception e) {once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");} else {String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}    }

往上返回到這個方法,broker同步向namesrv註冊broker,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync

@Overridepublic RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {long beginStartTime = System.currentTimeMillis();//        獲取並建立channel =》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {//                    執行請求前鉤子方法this.rpcHook.doBeforeRequest(addr, request);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {throw new RemotingTimeoutException("invokeSync call timeout");}//                執行同步請求 =》RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);if (this.rpcHook != null) {//                    執行響應後鉤子方法 =》this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);}return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);//                出現異常,channel關閉 =》this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}    }

接下篇。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索