rocketmq源碼解析之NamesrvController啓動②mqclient admin請求處理建立或更新topic①

說在前面apache

今天開始解析netty client handler的處理邏輯,UPDATE_AND_CREATE_TOPIC 更新並建立topicjson

 

源碼解析緩存

找到這個類org.apache.rocketmq.remoting.netty.NettyRemotingClient.NettyClientHandler微信

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
//            消息處理=》
            processMessageReceived(ctx, msg);
        }
    }

進入這個實現方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived併發

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
//                    請求消息處理 =》
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
//                    響應消息處理=》
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommandapp

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//        公平的處理請求
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
//                        客戶自定義的鉤子實現類
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
//                            這裏mq提供了一些鉤子方法能夠擴展的地方,請求前處理邏輯能夠在這裏擴展
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

//                        處理請求,有各個實現,主要都是netty通訊 =》TODO 文章
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
//                            執行rocketmq請求的後置處理鉤子方法
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

//                        若是不是單線請求
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());
                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };
//            系統繁忙,暫時啓動流量控制
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
//                異步提交請求
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

//                系統繁忙,暫時啓動流量控制
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
//            請求編碼不支持
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

進入到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#processRequest異步

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
//            三、源碼解析之更新並建立topic =》
            case RequestCode.UPDATE_AND_CREATE_TOPIC:
                return this.updateAndCreateTopic(ctx, request);
//                四、源碼解析之刪除topic =》
            case RequestCode.DELETE_TOPIC_IN_BROKER:
                return this.deleteTopic(ctx, request);
//                五、源碼解析之獲取全部的topic配置信息 =》
            case RequestCode.GET_ALL_TOPIC_CONFIG:
                return this.getAllTopicConfig(ctx, request);
            case RequestCode.UPDATE_BROKER_CONFIG:
//                六、源碼解析之更新broker配置信息 =》
                return this.updateBrokerConfig(ctx, request);
//                七、源碼解析之獲取broker的配置信息 =》
            case RequestCode.GET_BROKER_CONFIG:
                return this.getBrokerConfig(ctx, request);
//                八、源碼解析之查找offset按時間 =》
            case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
                return this.searchOffsetByTimestamp(ctx, request);
//                九、源碼解析之獲取最大的offset =》
            case RequestCode.GET_MAX_OFFSET:
                return this.getMaxOffset(ctx, request);
//                十、源碼解析之獲取最小的offset =》
            case RequestCode.GET_MIN_OFFSET:
                return this.getMinOffset(ctx, request);
//                十一、源碼解析之獲取最先的消息存儲時間 =》
            case RequestCode.GET_EARLIEST_MSG_STORETIME:
                return this.getEarliestMsgStoretime(ctx, request);
//                十二、源碼解析獲取broker的運行時信息 =》
            case RequestCode.GET_BROKER_RUNTIME_INFO:
                return this.getBrokerRuntimeInfo(ctx, request);
//                1三、源碼解析批量鎖定消息隊列=》
            case RequestCode.LOCK_BATCH_MQ:
                return this.lockBatchMQ(ctx, request);
//                1四、源碼解析之批量解鎖消息隊列=》
            case RequestCode.UNLOCK_BATCH_MQ:
                return this.unlockBatchMQ(ctx, request);
//                1五、源碼解析之更新和建立訂閱組=》
            case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:
                return this.updateAndCreateSubscriptionGroup(ctx, request);
//                1六、源碼解析之獲取全部的訂閱組配置信息=》
            case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG:
                return this.getAllSubscriptionGroup(ctx, request);
//                1七、源碼解析之刪除訂閱組=》
            case RequestCode.DELETE_SUBSCRIPTIONGROUP:
                return this.deleteSubscriptionGroup(ctx, request);
//                1八、源碼解析之獲取topic的狀態信息=》
            case RequestCode.GET_TOPIC_STATS_INFO:
                return this.getTopicStatsInfo(ctx, request);
//                1九、TODO
            case RequestCode.GET_CONSUMER_CONNECTION_LIST:
                return this.getConsumerConnectionList(ctx, request);
//                20、源碼解析之獲取生產者鏈接列表=》
            case RequestCode.GET_PRODUCER_CONNECTION_LIST:
                return this.getProducerConnectionList(ctx, request);
//                2一、源碼解析之獲取消費者的狀態=》
            case RequestCode.GET_CONSUME_STATS:
                return this.getConsumeStats(ctx, request);
//                2二、源碼解析之獲取全部消費者的offset=》
            case RequestCode.GET_ALL_CONSUMER_OFFSET:
                return this.getAllConsumerOffset(ctx, request);
//                2三、獲取全部delay的offset=》
            case RequestCode.GET_ALL_DELAY_OFFSET:
                return this.getAllDelayOffset(ctx, request);
//                2四、源碼解析之重置offset=》
            case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
                return this.resetOffset(ctx, request);
//                2五、源碼解析之獲取消費者狀態=》
            case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS:
                return this.getConsumerStatus(ctx, request);
            case RequestCode.QUERY_TOPIC_CONSUME_BY_WHO:
//                2六、源碼解析之查詢topic被哪些消費者消費=》
                return this.queryTopicConsumeByWho(ctx, request);
//                2七、源碼解析之註冊過濾的server=》
            case RequestCode.REGISTER_FILTER_SERVER:
                return this.registerFilterServer(ctx, request);
//                2八、源碼解析之查詢消費者時間=》
            case RequestCode.QUERY_CONSUME_TIME_SPAN:
                return this.queryConsumeTimeSpan(ctx, request);
//                2九、源碼解析之從broker中獲取系統topic列表=》
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER:
                return this.getSystemTopicListFromBroker(ctx, request);
//                30、源碼解析之清除過時的消費隊列=》
            case RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE:
                return this.cleanExpiredConsumeQueue();
//                3一、源碼解析之清楚不用的topic=》
            case RequestCode.CLEAN_UNUSED_TOPIC:
                return this.cleanUnusedTopic();
//                3二、源碼解析之獲取消費者運行時信息=》
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);
//                3三、源碼解析之查詢修改後的offset=》
            case RequestCode.QUERY_CORRECTION_OFFSET:
                return this.queryCorrectionOffset(ctx, request);
//                3四、源碼解析之直接消費消息=》
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
//                3五、源碼解析之clone組的offset=》
            case RequestCode.CLONE_GROUP_OFFSET:
                return this.cloneGroupOffset(ctx, request);
//                3六、查詢broker狀態數據=》
            case RequestCode.VIEW_BROKER_STATS_DATA:
                return ViewBrokerStatsData(ctx, request);
//                3七、獲取broker消費組的狀態=》
            case RequestCode.GET_BROKER_CONSUME_STATS:
                return fetchAllConsumeStatsInBroker(ctx, request);
//                3八、源碼解析之查詢消費隊列=》
            case RequestCode.QUERY_CONSUME_QUEUE:
                return queryConsumeQueue(ctx, request);
            default:
                break;
        }

        return null;
    }

進入到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateAndCreateTopicide

private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final CreateTopicRequestHeader requestHeader =
            (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
        log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
//        若是topic名和broker集羣名字同樣了報錯
        if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
            log.warn(errorMsg);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorMsg);
            return response;
        }

        try {
            response.setCode(ResponseCode.SUCCESS);
            response.setOpaque(request.getOpaque());
            response.markResponseType();
            response.setRemark(null);
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            log.error("Failed to produce a proper response", e);
        }

//      建立topic配置信息
        TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
        topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
        topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
        topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
        topicConfig.setPerm(requestHeader.getPerm());
        topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
//        更新topic配置=》
        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
//        按版本號註冊broker數據 =》
        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
        return null;
    }

進入這個方法,更新topic配置org.apache.rocketmq.broker.topic.TopicConfigManager#updateTopicConfigfetch

public void updateTopicConfig(final TopicConfig topicConfig) {
//        從緩存中取出以前的topic配置信息
        TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        if (old != null) {
            log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
        } else {
            log.info("create new topic [{}]", topicConfig);
        }

//       這裏用版本號來標記數據改變過了
        this.dataVersion.nextVersion();
//        =》持久化
        this.persist();
    }

進入這個方法,持久化topic配置信息org.apache.rocketmq.common.ConfigManager#persistui

public synchronized void persist() {
//        持久化的是json存儲,序列化的時候按版本號維護的數據 =》
        String jsonString = this.encode(true);
        if (jsonString != null) {
//             config/topics.json 文件存儲 =》
            String fileName = this.configFilePath();
            try {
//                保存文件 =》
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }

進入這個方法保存文件org.apache.rocketmq.common.MixAll#string2File

public static void string2File(final String str, final String fileName) throws IOException {

//        要保存的內容存儲在臨時文件中
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);
//        把原來的數據進行備份
        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }

//      刪掉源文件
        File file = new File(fileName);
        file.delete();
//       臨時文件重命名
        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }

往上返回到這個方法,同步註冊broker數據org.apache.rocketmq.broker.BrokerController#registerIncrementBrokerData

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
        TopicConfig registerTopicConfig = topicConfig;
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            registerTopicConfig =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
        }

//      組裝topic序列化信息
        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
        topicConfigSerializeWrapper.setDataVersion(dataVersion);
        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
//        註冊全部的broker =》
        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
    }

 

進入這個方法org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll向全部的broker註冊topic配置信息

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
//        向全部的broker進行註冊=》
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
//            過濾的服務
            this.filterServerManager.buildNewFilterServerList(),
//            單途
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
//                    更新master地址本地緩存
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

//                同步設置slave的master地址
                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
                if (checkOrderConfig) {
//                   更新訂閱的topic配置 =》
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
//        獲取namesrv地址集合
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
//            對請求體進行編碼=》
            final byte[] body = requestBody.encode(compressed);
//            壓縮處理
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
//            併發向namesrv集羣註冊broker
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
//                            註冊broker服務任務分發=》
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker to name server {} OK", namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

進入這個方法對請求提進行編碼org.apache.rocketmq.common.protocol.body.RegisterBrokerBody#encode

public byte[] encode(boolean compress) {

        if (!compress) {
//            json編碼
            return super.encode();
        }
        long start = System.currentTimeMillis();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION));
        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
        ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
        assert topicConfigTable != null;
        try {
//            版本json編碼
            byte[] buffer = dataVersion.encode();
            // write data version
            outputStream.write(convertIntToByteArray(buffer.length));
            outputStream.write(buffer);
            int topicNumber = topicConfigTable.size();
            // write number of topic configs
            outputStream.write(convertIntToByteArray(topicNumber));
            // write topic config entry one by one.
            for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
                buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
                outputStream.write(convertIntToByteArray(buffer.length));
                outputStream.write(buffer);
            }

            buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
            // write filter server list json length
            outputStream.write(convertIntToByteArray(buffer.length));
            // write filter server list json
            outputStream.write(buffer);
            outputStream.finish();
            long interval = System.currentTimeMillis() - start;
            if (interval > 50) {
                LOGGER.info("Compressing takes {}ms", interval);
            }
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
        }

        return null;
    }

進入這個方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker註冊broker

private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        if (oneway) {
            try {
//                單線請求,不關心結果 =》
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

//        broker同步向namesrv註冊broker=》
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway單途註冊

@Override
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
//        獲取channel=》
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
//                    執行請求執行前的鉤子方法
                    this.rpcHook.doBeforeRequest(addr, request);
                }
//               執行單線請求 =》
                this.invokeOnewayImpl(channel, request, timeoutMillis);
            } catch (RemotingSendRequestException e) {
                log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
//                異常關閉channel=》
                this.closeChannel(addr, channel);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

未完待續。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索