說在前面
今天開始解析管理請求處理之 UPDATE_BROKER_CONFIG:更新 broker 配置信息。
源碼解析
進入這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig,更新 broker 配置信息
private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); byte[] body = request.getBody(); if (body != null) { try { String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); Properties properties = MixAll.string2Properties(bodyStr); if (properties != null) { log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress()); // 更新持久存儲 =》 this.brokerController.getConfiguration().update(properties); // 若是有權限設置強制更新 if (properties.containsKey("brokerPermission")) { // 更新數據的版本號 this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(); // 註冊全部的broker =》 this.brokerController.registerBrokerAll(false, false, true); } } else { log.error("string2Properties error"); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("string2Properties error"); return response; } } catch (UnsupportedEncodingException e) { log.error("", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法 org.apache.rocketmq.common.Configuration#update,更新配置
public void update(Properties properties) { try { // 讀寫鎖同步實現 readWriteLock.writeLock().lockInterruptibly(); try { // the property must be exist when update mergeIfExist(properties, this.allConfigs); for (Object configObject : configObjectList) { // not allConfigs to update... MixAll.properties2Object(properties, configObject); } // 更新數據的版本號 this.dataVersion.nextVersion(); } finally { readWriteLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("update lock error, {}", properties); return; } // 持久化=》 persist(); }
進入這個方法 org.apache.rocketmq.common.Configuration#persist,broker 配置持久化。從 org.apache.rocketmq.broker.BrokerPathConfigHelper 這個類的 brokerConfigPath 變量值可以知道 broker 的配置文件路徑是:
// Path of the broker configuration file, built from the "user.home" system property:
// <user.home>/store/config/broker.properties (using the platform file separator).
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store" + File.separator + "config" + File.separator + "broker.properties";
進入這個方法 org.apache.rocketmq.common.MixAll#string2File,文件存儲
public static void string2File(final String str, final String fileName) throws IOException { // 要保存的內容存儲在臨時文件中 String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); // 把原來的數據進行備份 String bakFile = fileName + ".bak"; String prevContent = file2String(fileName); if (prevContent != null) { string2FileNotSafe(prevContent, bakFile); } // 刪掉源文件 File file = new File(fileName); file.delete(); // 臨時文件重命名 file = new File(tmpFile); file.renameTo(new File(fileName)); }
往上返回到這個方法 org.apache.rocketmq.broker.BrokerController#registerBrokerAll,如果 broker 配置信息中有權限設置,更新所有的 broker 信息
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 若是沒有讀寫權限 if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } // 判斷broker集羣是否須要註冊=》 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 向全部的broker註冊topic配置信息 =》 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }
進入這個方法 org.apache.rocketmq.broker.BrokerController#needRegister,判斷 broker 集羣是否需要註冊
private boolean needRegister(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final int timeoutMills) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 判斷是否須要註冊 =》 List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); boolean needRegister = false; for (Boolean changed : changeList) { // 只要一個namesrv須要更新就所有須要更新 if (changed) { needRegister = true; break; } } return needRegister; }
進入這個方法 org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister,判斷是否需要註冊
public List<Boolean> needRegister( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final TopicConfigSerializeWrapper topicConfigWrapper, final int timeoutMills) { // 多線程更新是否須要變化狀態的集合 final List<Boolean> changedList = new CopyOnWriteArrayList<>(); // 獲取namesrv地址 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 多線程分發執行,一個namesrv一個線程 for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); request.setBody(topicConfigWrapper.getDataVersion().encode()); // broker向namesrv同步查詢數據版本 =》 RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills); DataVersion nameServerDataVersion = null; Boolean changed = false; switch (response.getCode()) { case ResponseCode.SUCCESS: { QueryDataVersionResponseHeader queryDataVersionResponseHeader = (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class); // 根據版本號判斷數據有沒有發生變化 changed = queryDataVersionResponseHeader.getChanged(); byte[] body = response.getBody(); if (body != null) { nameServerDataVersion = DataVersion.decode(body, DataVersion.class); if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) { // 數據版本不一致須要更新 changed = true; } } if (changed == null || changed) { changedList.add(Boolean.TRUE); } } default: break; } log.warn("Query data version from name server {} 
OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion); } catch (Exception e) { changedList.add(Boolean.TRUE); log.error("Query data version from name server {} Exception, {}", namesrvAddr, e); } finally { // 異常狀況下解除線程阻塞 countDownLatch.countDown(); } } }); } try { // 等到超時解除線程阻塞 countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("query dataversion from nameserver countDownLatch await Exception", e); } } return changedList; }
org.apache.rocketmq.remoting.RemotingClient#invokeSync同步請求處理方法前面介紹過了。
進入到這個方法org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll 註冊全部的broker broker配置信息
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // 向全部的broker進行註冊=》 List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, // 過濾的服務 this.filterServerManager.buildNewFilterServerList(), // 單途 oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister()); if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { // 更新master地址本地緩存 this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } // 同步設置slave的master地址 this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { // 更新訂閱的topic配置 =》 this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } }
進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 獲取namesrv地址集合 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 對請求體進行編碼=》 final byte[] body = requestBody.encode(compressed); // 壓縮處理 final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 併發向namesrv集羣註冊broker for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { // 註冊broker服務任務分發=》 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return 
registerBrokerResultList; }
進入這個方法org.apache.rocketmq.common.protocol.body.RegisterBrokerBody#encode 對topic配置信息進行編碼
public byte[] encode(boolean compress) { if (!compress) { // json編碼 return super.encode(); } long start = System.currentTimeMillis(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION)); DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable()); assert topicConfigTable != null; try { // 版本json編碼 byte[] buffer = dataVersion.encode(); // write data version outputStream.write(convertIntToByteArray(buffer.length)); outputStream.write(buffer); int topicNumber = topicConfigTable.size(); // write number of topic configs outputStream.write(convertIntToByteArray(topicNumber)); // write topic config entry one by one. for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) { buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET); outputStream.write(convertIntToByteArray(buffer.length)); outputStream.write(buffer); } buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET); // write filter server list json length outputStream.write(convertIntToByteArray(buffer.length)); // write filter server list json outputStream.write(buffer); outputStream.finish(); long interval = System.currentTimeMillis() - start; if (interval > 50) { LOGGER.info("Compressing takes {}ms", interval); } return byteArrayOutputStream.toByteArray(); } catch (IOException e) { LOGGER.error("Failed to compress RegisterBrokerBody object", e); } return null; }
往上返回到這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker 註冊broker
private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); if (oneway) { try { // 單線請求,不關心結果 =》 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } // broker同步向namesrv註冊broker=》 RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if (response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway 執行單途請求
@Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { // 獲取channel=》 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { // 執行請求執行前的鉤子方法 this.rpcHook.doBeforeRequest(addr, request); } // 執行單線請求 =》 this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); // 異常關閉channel=》 this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
前面介紹過了。
往上返回到這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync,broker 同步向 namesrv 註冊,前面介紹過了。
進入這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfig 更新topic配置信息
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { boolean isChange = false; Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); for (String topic : orderTopics) { TopicConfig topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null && !topicConfig.isOrder()) { topicConfig.setOrder(true); isChange = true; log.info("update order topic config, topic={}, order={}", topic, true); } } for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) { String topic = entry.getKey(); if (!orderTopics.contains(topic)) { TopicConfig topicConfig = entry.getValue(); if (topicConfig.isOrder()) { topicConfig.setOrder(false); isChange = true; log.info("update order topic config, topic={}, order={}", topic, false); } } } if (isChange) { // 更新數據版本號 this.dataVersion.nextVersion(); // 持久化=》 this.persist(); } } }
進入這個方法org.apache.rocketmq.common.ConfigManager#persist 持久化
public synchronized void persist() { // 持久化的是json存儲,序列化的時候按版本號維護的數據 =》 String jsonString = this.encode(true); if (jsonString != null) { // user.home/store/config/topics.json 文件存儲 =》 String fileName = this.configFilePath(); try { // 保存文件 =》 MixAll.string2File(jsonString, fileName); } catch (IOException e) { log.error("persist file " + fileName + " exception", e); } } }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 解析結束。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣