說在前面
client 管理:心跳檢測
源碼解析
進入這個方法:org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest,處理 client 管理請求
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {// 心跳監測=》case RequestCode.HEART_BEAT:return this.heartBeat(ctx, request);// 取消註冊client=》case RequestCode.UNREGISTER_CLIENT:return this.unregisterClient(ctx, request);// 檢查client的配置=》case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null; }
進入這個方法:org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat,處理心跳檢測
// Handles a client heartbeat: for every consumer group in the heartbeat it makes sure the
// retry topic exists (when a subscription group config is available) and registers the
// consumer channel with the consumer manager.
// NOTE(review): this excerpt is truncated - it stops after the consumer loop and shows
// neither the rest of the method, its return statement, nor its closing brace.
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    // Identity of the sending client: its channel, client id, language and version.
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Look up the subscription group config for this consumer group.
        SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
        // Defaults to true when no subscription group config is found.
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // Retry topic name derived from the group name (original note: "%RETRY%" + group name).
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            // Create the retry topic if needed, readable and writable.
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
        // Register the consumer; the result is logged below when true.
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);
        // Log when the registration reported a change.
        if (changed) {
            log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        }
    }
進入這個方法,獲取訂閱組的配置信息:org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {// 從緩存中獲取組的訂閱信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {// 自動建立消費組或者是系統自用的消費組if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}// 更新數據的版本號this.dataVersion.nextVersion();// 持久化=》this.persist();}}return subscriptionGroupConfig; }
進入這個方法,持久化:org.apache.rocketmq.common.ConfigManager#persist
public synchronized void persist() {// 持久化的是json存儲,序列化的時候按版本號維護的數據 =》String jsonString = this.encode(true);if (jsonString != null) {// user.home/store/config/topics.json 文件存儲 =》String fileName = this.configFilePath();try {// 保存文件 =》MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}} }
進入這個方法,保存文件:org.apache.rocketmq.common.MixAll#string2File
public static void string2File(final String str, final String fileName) throws IOException {// 要保存的內容存儲在臨時文件中String tmpFile = fileName + ".tmp";string2FileNotSafe(str, tmpFile);// 把原來的數據進行備份String bakFile = fileName + ".bak";String prevContent = file2String(fileName);if (prevContent != null) {string2FileNotSafe(prevContent, bakFile);}// 刪掉源文件File file = new File(fileName);file.delete();// 臨時文件重命名file = new File(tmpFile);file.renameTo(new File(fileName)); }
進入這個方法,發送消息返回後創建重試 topic:org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {// 獲取topic配置信息TopicConfig topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;boolean createNew = false;try {if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;topicConfig = new TopicConfig(topic);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);// 存儲重試的tpic配置信息this.topicConfigTable.put(topic, topicConfig);createNew = true;// 修飾數據的版本號this.dataVersion.nextVersion();// 持久化=》this.persist();} finally {this.lockTopicConfigTable.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}// 若是topic配置信息是從新建立的,註冊到broker集羣中=》if (createNew) {this.brokerController.registerBrokerAll(false, true,true);}return topicConfig; }
進入這個方法,如果 topic 配置信息是新創建的,則註冊到 broker 集羣中:org.apache.rocketmq.broker.BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {// 包裝topic配置信息TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();// 若是沒有讀寫權限if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}// 判斷broker集羣是否須要註冊=》if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {// 向全部的broker註冊topic配置信息 =》doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);} }
進入這個方法,判斷 broker 集羣是否需要註冊:org.apache.rocketmq.broker.BrokerController#needRegister
private boolean needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final int timeoutMills) {TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();// 判斷是否須要註冊 =》List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);boolean needRegister = false;for (Boolean changed : changeList) {// 只要一個namesrv須要更新就所有須要更新if (changed) {needRegister = true;break;}}return needRegister; }
進入這個方法,判斷是否需要註冊:org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister
public List<Boolean> needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final TopicConfigSerializeWrapper topicConfigWrapper,final int timeoutMills) {// 多線程更新是否須要變化狀態的集合final List<Boolean> changedList = new CopyOnWriteArrayList<>();// 獲取namesrv地址List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 多線程分發執行,一個namesrv一個線程for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);request.setBody(topicConfigWrapper.getDataVersion().encode());// broker向namesrv同步查詢數據版本 =》RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);DataVersion nameServerDataVersion = null;Boolean changed = false;switch (response.getCode()) {case ResponseCode.SUCCESS: {QueryDataVersionResponseHeader queryDataVersionResponseHeader =(QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);// 根據版本號判斷數據有沒有發生變化changed = queryDataVersionResponseHeader.getChanged();byte[] body = response.getBody();if (body != null) {nameServerDataVersion = DataVersion.decode(body, DataVersion.class);if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {// 數據版本不一致須要更新changed = true;}}if (changed == null || changed) {changedList.add(Boolean.TRUE);}}default:break;}log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", 
namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);} catch (Exception e) {changedList.add(Boolean.TRUE);log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);} finally {// 異常狀況下解除線程阻塞countDownLatch.countDown();}}});}try {// 等到超時解除線程阻塞countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("query dataversion from nameserver countDownLatch await Exception", e);}}return changedList; }
往上返回到這個方法,向所有的 broker 註冊 topic 配置信息:org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {// 向全部的broker進行註冊=》List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,// 過濾的服務this.filterServerManager.buildNewFilterServerList(),// 單途oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {// 更新master地址本地緩存this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}// 同步設置slave的master地址this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {// 更新訂閱的topic配置 =》this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}} }
進入這個方法,向所有的 broker 進行註冊:org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
// Body of BrokerOuterAPI#registerBrokerAll: sends one register-broker request per name
// server via brokerOuterExecutor and collects the non-null results.
// NOTE(review): the method signature is not part of this excerpt; clusterName, brokerAddr,
// brokerName, brokerId, haServerAddr, topicConfigWrapper, filterServerList, oneway,
// timeoutMills and compressed are presumably its parameters - confirm against the class.
// NOTE(review): the result list comes from Lists.newArrayList() yet add() is called from
// executor tasks; verify it is safe for concurrent writes (the sibling needRegister uses
// a CopyOnWriteArrayList for the same pattern).
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
// Addresses of the name servers to register with.
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    // Encode the request body once; the same bytes are sent to every name server.
    final byte[] body = requestBody.encode(compressed);
    // CRC32 of the encoded body, carried in the request header.
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    // One task per name server.
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Register with this name server; the result is null when none is produced.
                    RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker to name server {} OK", namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    // Always release the waiter, including on failure.
                    countDownLatch.countDown();
                }
            }
        });
    }
    try {
        // Bounded wait: returns on completion of all tasks or on timeout.
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // NOTE(review): the interruption is dropped here - neither logged nor restored.
    }
}
return registerBrokerResultList; }
進入這個方法, 註冊broker服務任務分發,org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker
private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);if (oneway) {try {// 單線請求,不關心結果 =》this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}// broker同步向namesrv註冊broker=》RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
進入這個方法,單向(oneway)請求,不關心結果:org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {// 獲取channel=》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {// 執行請求執行前的鉤子方法this.rpcHook.doBeforeRequest(addr, request);}// 執行單線請求 =》this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);// 異常關閉channel=》this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);} }
進入這個方法,執行單向(oneway)請求:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();// 獲取信號量的信號,這裏用semaphore作了限流boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}}});} catch (Exception e) {once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");} else {String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}} }
往上返回到這個方法,broker 同步向 namesrv 註冊 broker:org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
@Overridepublic RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {long beginStartTime = System.currentTimeMillis();// 獲取並建立channel =》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {// 執行請求前鉤子方法this.rpcHook.doBeforeRequest(addr, request);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {throw new RemotingTimeoutException("invokeSync call timeout");}// 執行同步請求 =》RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);if (this.rpcHook != null) {// 執行響應後鉤子方法 =》this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);}return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);// 出現異常,channel關閉 =》this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);} }
接下篇。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣