rocketmq源碼解析之NamesrvController啓動②mqclient admin請求處理之更新broker配置信息

說在前面

今天開始解析管理請求處理之UPDATE_BROKER_CONFIG:更新broker配置信息。

 

源碼解析

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 更新broker配置信息

/**
 * Handles an UPDATE_BROKER_CONFIG admin request.
 *
 * <p>The request body, when present, is decoded with {@code MixAll.DEFAULT_CHARSET},
 * parsed into {@link Properties} and handed to the broker's configuration object.
 * If the new properties contain {@code brokerPermission}, the topic-config data
 * version is bumped and a forced broker registration is triggered.
 *
 * @param ctx     channel context of the caller (used for logging only)
 * @param request the admin request; a {@code null} body is treated as a no-op
 * @return a response with {@code SUCCESS}, or {@code SYSTEM_ERROR} when the body
 *         cannot be decoded or parsed
 */
private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        final byte[] payload = request.getBody();
        if (payload != null) {
            try {
                final String text = new String(payload, MixAll.DEFAULT_CHARSET);
                final Properties updates = MixAll.string2Properties(text);
                if (updates == null) {
                    // The body could not be parsed as properties: report and bail out.
                    log.error("string2Properties error");
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("string2Properties error");
                    return response;
                }

                log.info("updateBrokerConfig, new config: [{}] client: {} ", updates, ctx.channel().remoteAddress());
                // Apply the new values to the broker configuration.
                this.brokerController.getConfiguration().update(updates);

                // A permission change must be pushed out immediately.
                if (updates.containsKey("brokerPermission")) {
                    // Bump the data version of the topic configuration.
                    this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
                    // Force a (non-oneway) broker registration.
                    this.brokerController.registerBrokerAll(false, false, true);
                }
            } catch (UnsupportedEncodingException e) {
                log.error("", e);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

進入這個方法org.apache.rocketmq.common.Configuration#update 更新配置

/**
 * Applies the given properties to the registered configuration and persists the result.
 *
 * <p>Under the write lock, the properties are merged into {@code allConfigs} via
 * {@code mergeIfExist}, copied onto every object in {@code configObjectList}, and
 * the data version is advanced. {@code persist()} is called after the lock is
 * released.
 *
 * <p>If the calling thread is interrupted while waiting for the write lock, nothing
 * is updated or persisted; the thread's interrupt status is restored so that callers
 * further up the stack can still observe the interruption.
 *
 * @param properties the new configuration values
 */
public void update(Properties properties) {
        try {
            // Serialise writers against readers with the read/write lock.
            readWriteLock.writeLock().lockInterruptibly();
            try {
                // the property must be exist when update
                mergeIfExist(properties, this.allConfigs);
                for (Object configObject : configObjectList) {
                    // not allConfigs to update...
                    MixAll.properties2Object(properties, configObject);
                }

                // Advance the data version of this configuration.
                this.dataVersion.nextVersion();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            // Do not swallow the interruption: re-assert the flag for the caller.
            Thread.currentThread().interrupt();
            log.error("update lock error, {}", properties);
            return;
        }

        // Write the updated configuration out.
        persist();
    }

進入這個方法org.apache.rocketmq.common.Configuration#persist broker配置持久化。從org.apache.rocketmq.broker.BrokerPathConfigHelper這個類的brokerConfigPath變量值可以知道broker配置文件的路徑:

// Path of the broker configuration file:
// <user.home>/store/config/broker.properties, joined with the platform file separator.
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
    + File.separator + "config" + File.separator + "broker.properties";

進入這個方法org.apache.rocketmq.common.MixAll#string2File 文件存儲

/**
 * Saves {@code str} to {@code fileName}, keeping a backup of the previous content.
 *
 * <p>Steps: write the new content to {@code fileName + ".tmp"}; if the target
 * currently has content, copy it to {@code fileName + ".bak"}; delete the target;
 * rename the temporary file onto the target.
 *
 * @param str      content to store
 * @param fileName path of the target file
 * @throws IOException if writing fails, or if the temporary file cannot be renamed
 *                     onto the target (previously this failure was silently ignored,
 *                     leaving the target deleted and the save reported as successful)
 */
public static void string2File(final String str, final String fileName) throws IOException {

        // Stage the new content in a temporary file first.
        final String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);

        // Back up the current content, if there is any.
        final String bakFile = fileName + ".bak";
        final String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }

        // Remove the old file. delete() returns false when the file does not exist
        // yet (first save), so its result is intentionally not treated as an error.
        final File target = new File(fileName);
        target.delete();

        // Promote the temporary file; a failed rename means the save did not happen.
        if (!new File(tmpFile).renameTo(target)) {
            throw new IOException("Failed to rename " + tmpFile + " to " + fileName);
        }
    }

往上返回到這個方法org.apache.rocketmq.broker.BrokerController#registerBrokerAll 如果broker配置信息中包含權限設置,則重新註冊broker信息

/**
 * Registers this broker when forced or when {@code needRegister} reports a change.
 *
 * <p>If the broker permission is not both readable and writeable, the topic table
 * sent out is replaced by copies of each topic whose permission is the broker's
 * permission.
 *
 * @param checkOrderConfig forwarded to {@code doRegisterBrokerAll}
 * @param oneway           forwarded to {@code doRegisterBrokerAll}
 * @param forceRegister    when {@code true}, skip the {@code needRegister} check
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        // Broker is fully usable only when it is both writeable and readable.
        final boolean readWrite = PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
        if (!readWrite) {
            // Rebuild the topic table so every topic carries the broker's permission.
            ConcurrentHashMap<String, TopicConfig> restricted = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig source : wrapper.getTopicConfigTable().values()) {
                TopicConfig copy = new TopicConfig(
                    source.getTopicName(),
                    source.getReadQueueNums(),
                    source.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
                restricted.put(source.getTopicName(), copy);
            }
            wrapper.setTopicConfigTable(restricted);
        }

        // Nothing to do unless forced or the registration check reports a change.
        if (!forceRegister && !needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
            return;
        }

        // Send the registration with the (possibly rewritten) topic configuration.
        doRegisterBrokerAll(checkOrderConfig, oneway, wrapper);
    }

進入這個方法org.apache.rocketmq.broker.BrokerController#needRegister 判斷broker是否需要重新註冊

/**
 * Tells whether this broker has to be registered again.
 *
 * @param clusterName  broker cluster name
 * @param brokerAddr   broker address
 * @param brokerName   broker name
 * @param brokerId     broker id
 * @param timeoutMills timeout handed to {@code brokerOuterAPI.needRegister}
 * @return {@code true} as soon as one entry of the returned change list is {@code true}
 */
private boolean needRegister(final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final int timeoutMills) {

        TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        // Collect the per-query "changed" answers.
        List<Boolean> answers = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, wrapper, timeoutMills);
        for (Boolean answer : answers) {
            // A single "changed" answer is enough to require registration.
            if (answer) {
                return true;
            }
        }
        return false;
    }

進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister 判斷是否需要註冊

/**
 * Asks every known name server whether this broker's topic-config data version
 * differs from what the name server holds.
 *
 * <p>One task per name server address is submitted to {@code brokerOuterExecutor}.
 * A task adds {@code TRUE} to the result list when the response header reports a
 * change (or reports nothing), when the data version in the response body differs
 * from the local one, or when the query throws. The caller waits up to
 * {@code timeoutMills} for all tasks, so the returned list may still be growing if
 * the wait timed out.
 *
 * <p>If the calling thread is interrupted while waiting, the interrupt status is
 * restored before returning.
 *
 * @param clusterName        broker cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id
 * @param topicConfigWrapper supplies the local data version to compare
 * @param timeoutMills       per-request timeout and overall wait, in milliseconds
 * @return a list containing one {@code TRUE} per name server that needs an update;
 *         empty when none does or when no name server address is configured
 */
public List<Boolean> needRegister(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final int timeoutMills) {
        // Written by several worker threads, hence a concurrent list.
        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
        // Addresses of the name servers to query.
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // Fan out: one task per name server.
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                            requestHeader.setBrokerAddr(brokerAddr);
                            requestHeader.setBrokerId(brokerId);
                            requestHeader.setBrokerName(brokerName);
                            requestHeader.setClusterName(clusterName);
                            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                            request.setBody(topicConfigWrapper.getDataVersion().encode());
                            // Synchronous data-version query against this name server.
                            RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
                            DataVersion nameServerDataVersion = null;
                            Boolean changed = false;
                            switch (response.getCode()) {
                                case ResponseCode.SUCCESS: {
                                    QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                    // Start from the name server's own verdict.
                                    changed = queryDataVersionResponseHeader.getChanged();
                                    byte[] body = response.getBody();
                                    if (body != null) {
                                        nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                        if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                            // Versions differ: an update is required.
                                            changed = true;
                                        }
                                    }
                                    // A missing verdict is treated as "changed".
                                    if (changed == null || changed) {
                                        changedList.add(Boolean.TRUE);
                                    }
                                    // Explicit break: do not rely on falling through to default.
                                    break;
                                }
                                default:
                                    break;
                            }
                            log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                        } catch (Exception e) {
                            // A failed query counts as "needs registration".
                            changedList.add(Boolean.TRUE);
                            log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
                        } finally {
                            // Always release the waiter, including on failure.
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                // Wait for all queries, but no longer than the timeout.
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interruption for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                log.error("query dataversion from nameserver countDownLatch await Exception", e);
            }
        }
        return changedList;
    }

org.apache.rocketmq.remoting.RemotingClient#invokeSync同步請求處理方法前面介紹過了。

進入到這個方法org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll 向全部的namesrv註冊broker

/**
 * Sends the registration through {@code brokerOuterAPI.registerBrokerAll} and applies
 * the first returned result locally.
 *
 * <p>From the first non-null result: the HA master address is pushed to the message
 * store (only when {@code updateMasterHAServerAddrPeriodically} is set and the
 * address is non-null), the master address is handed to {@code slaveSynchronize},
 * and, if requested, the order-topic configuration is refreshed from the KV table.
 *
 * @param checkOrderConfig   whether to update the order-topic configuration
 * @param oneway             forwarded to {@code registerBrokerAll}
 * @param topicConfigWrapper topic configuration to send
 */
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        List<RegisterBrokerResult> results = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            // Filter server list.
            this.filterServerManager.buildNewFilterServerList(),
            // One-way flag.
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());

        if (results.isEmpty()) {
            return;
        }
        // Only the first result is consulted.
        final RegisterBrokerResult first = results.get(0);
        if (first == null) {
            return;
        }

        if (this.updateMasterHAServerAddrPeriodically && first.getHaServerAddr() != null) {
            // Refresh the HA master address held by the message store.
            this.messageStore.updateHaMasterAddress(first.getHaServerAddr());
        }

        // Tell the slave synchroniser where the master is.
        this.slaveSynchronize.setMasterAddr(first.getMasterAddr());

        if (checkOrderConfig) {
            // Refresh the order-topic configuration from the returned KV table.
            this.getTopicConfigManager().updateOrderTopicConfig(first.getKvTable());
        }
    }

進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

/**
 * Registers this broker with every known name server concurrently.
 *
 * <p>The request header and body are built once; the body's CRC32 is stored in the
 * header. One task per name server address is submitted to
 * {@code brokerOuterExecutor}; each non-null {@code registerBroker} result is
 * collected. The caller waits up to {@code timeoutMills} for all tasks, so the
 * returned list may be partial (and may still grow) if the wait timed out.
 *
 * <p>The result list is a {@link CopyOnWriteArrayList} because it is appended to
 * from several worker threads and may be read by the caller while stragglers are
 * still running; a plain {@code ArrayList} is not safe for that. If the calling
 * thread is interrupted while waiting, the interrupt status is restored.
 *
 * @param clusterName        broker cluster name
 * @param brokerAddr         broker address
 * @param brokerName         broker name
 * @param brokerId           broker id
 * @param haServerAddr       HA server address of this broker
 * @param topicConfigWrapper topic configuration to send
 * @param filterServerList   filter server addresses to send
 * @param oneway             when {@code true}, requests are sent one-way
 * @param timeoutMills       per-request timeout and overall wait, in milliseconds
 * @param compressed         whether the request body is encoded in compressed form
 * @return the results received so far; empty when no name server address is configured
 */
public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        // Shared between worker threads and the caller: must be thread-safe.
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        // Addresses of the name servers to register with.
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            // Encode the request body (compressed or not, per the flag).
            final byte[] body = requestBody.encode(compressed);
            // CRC32 checksum of the encoded body, carried in the header.
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // Fan out: one registration task per name server.
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Register with this name server.
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker to name server {} OK", namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            // Always release the waiter, including on failure.
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                // Wait for all registrations, but no longer than the timeout.
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Preserve the interruption for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }

        return registerBrokerResultList;
    }

進入這個方法org.apache.rocketmq.common.protocol.body.RegisterBrokerBody#encode 對topic配置信息進行編碼

/**
 * Encodes this body, optionally in a deflate-compressed binary layout.
 *
 * <p>Without compression the superclass encoding is returned. With compression the
 * following is written through a {@link DeflaterOutputStream} (each length/count is
 * produced by {@code convertIntToByteArray}):
 * data-version length, data-version bytes, number of topic configs, then for each
 * topic config its length and bytes, and finally the length and bytes of the filter
 * server list serialised as JSON.
 *
 * <p>The {@link Deflater} is created here and therefore released here with
 * {@link Deflater#end()}; {@code DeflaterOutputStream} does not end a deflater that
 * was supplied by the caller.
 *
 * @param compress whether to produce the compressed layout
 * @return the encoded bytes, or {@code null} if compression failed with an
 *         {@link IOException} (the error is logged)
 */
public byte[] encode(boolean compress) {

        if (!compress) {
            // Plain encoding provided by the superclass.
            return super.encode();
        }
        long start = System.currentTimeMillis();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Keep a handle on the deflater so its native resources can be released below.
        final Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        try {
            DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
            DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
            ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
            assert topicConfigTable != null;

            byte[] buffer = dataVersion.encode();
            // write data version
            outputStream.write(convertIntToByteArray(buffer.length));
            outputStream.write(buffer);
            int topicNumber = topicConfigTable.size();
            // write number of topic configs
            outputStream.write(convertIntToByteArray(topicNumber));
            // write topic config entry one by one.
            for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
                buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
                outputStream.write(convertIntToByteArray(buffer.length));
                outputStream.write(buffer);
            }

            buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
            // write filter server list json length
            outputStream.write(convertIntToByteArray(buffer.length));
            // write filter server list json
            outputStream.write(buffer);
            // Flush the remaining compressed data into the byte array.
            outputStream.finish();
            long interval = System.currentTimeMillis() - start;
            if (interval > 50) {
                LOGGER.info("Compressing takes {}ms", interval);
            }
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
        } finally {
            // Release the deflater's native memory promptly instead of waiting for GC.
            deflater.end();
        }

        return null;
    }

往上返回到這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker 註冊broker

/**
 * Sends one REGISTER_BROKER request to a single name server.
 *
 * @param namesrvAddr   address of the name server
 * @param oneway        when {@code true}, send one-way and return {@code null}
 * @param timeoutMills  request timeout in milliseconds
 * @param requestHeader pre-built request header
 * @param body          pre-encoded request body
 * @return the decoded result of a successful synchronous registration, or
 *         {@code null} for a one-way request
 * @throws MQBrokerException if the name server answers with a non-success code
 */
private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        if (oneway) {
            try {
                // Fire-and-forget: the outcome is not inspected.
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

        // Synchronous registration with the name server.
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;

        // Any non-success code is surfaced to the caller as an exception.
        if (response.getCode() != ResponseCode.SUCCESS) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }

        RegisterBrokerResponseHeader header =
            (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
        RegisterBrokerResult outcome = new RegisterBrokerResult();
        outcome.setMasterAddr(header.getMasterAddr());
        outcome.setHaServerAddr(header.getHaServerAddr());
        final byte[] payload = response.getBody();
        if (payload != null) {
            outcome.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
        }
        return outcome;
    }

進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway 執行單途請求

/**
 * Sends a one-way request to {@code addr}.
 *
 * <p>The channel obtained from {@code getAndCreateChannel} must be non-null and
 * active; otherwise it is closed and a {@link RemotingConnectException} is thrown.
 * If sending fails with a {@link RemotingSendRequestException}, the channel is
 * closed and the exception is rethrown.
 *
 * @param addr          target address
 * @param request       command to send
 * @param timeoutMillis timeout passed to {@code invokeOnewayImpl}
 */
@Override
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // Look up (or create) the channel for this address.
        final Channel channel = this.getAndCreateChannel(addr);

        // Unusable channel: clean up and report a connect failure.
        if (channel == null || !channel.isActive()) {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }

        try {
            if (this.rpcHook != null) {
                // Give the hook a chance to act before the request goes out.
                this.rpcHook.doBeforeRequest(addr, request);
            }
            // Perform the one-way send.
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            // The send failed: close the channel before propagating.
            this.closeChannel(addr, channel);
            throw e;
        }
    }

前面介紹過了。

往上返回到這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync broker同步向namesrv註冊,前面介紹過了。

 

進入這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfig 更新topic配置信息

/**
 * Aligns the {@code order} flag of local topic configs with the keys of the given
 * KV table.
 *
 * <p>Topics whose name is a key of the table are marked as order topics; all other
 * local topics that are currently marked are unmarked. If anything changed, the data
 * version is advanced and the configuration is persisted. A {@code null} argument or
 * a {@code null} table is a no-op.
 *
 * @param orderKVTableFromNs KV table whose key set names the order topics
 */
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {

        if (orderKVTableFromNs == null || orderKVTableFromNs.getTable() == null) {
            return;
        }

        boolean modified = false;
        final Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();

        // Pass 1: switch the flag on for listed topics that exist locally.
        for (String topic : orderTopics) {
            final TopicConfig config = this.topicConfigTable.get(topic);
            if (config == null || config.isOrder()) {
                continue;
            }
            config.setOrder(true);
            modified = true;
            log.info("update order topic config, topic={}, order={}", topic, true);
        }

        // Pass 2: switch the flag off for local topics that are no longer listed.
        for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
            final String topic = entry.getKey();
            if (orderTopics.contains(topic)) {
                continue;
            }
            final TopicConfig config = entry.getValue();
            if (!config.isOrder()) {
                continue;
            }
            config.setOrder(false);
            modified = true;
            log.info("update order topic config, topic={}, order={}", topic, false);
        }

        if (modified) {
            // Advance the data version, then write the configuration out.
            this.dataVersion.nextVersion();
            this.persist();
        }
    }

進入這個方法org.apache.rocketmq.common.ConfigManager#persist 持久化

/**
 * Writes the encoded configuration to the file named by {@code configFilePath()}.
 *
 * <p>Nothing is written when {@code encode(true)} returns {@code null}. An
 * {@link IOException} from the write is logged and not propagated.
 */
public synchronized void persist() {
        // Serialised form of this configuration.
        final String content = this.encode(true);
        if (content == null) {
            return;
        }

        // Destination file for this configuration.
        final String fileName = this.configFilePath();
        try {
            // Store the content in the file.
            MixAll.string2File(content, fileName);
        } catch (IOException e) {
            log.error("persist file " + fileName + " exception", e);
        }
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 解析結束。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索