rocketmq源碼解析之name啓動(一)

 

說在前面java

主要解析namrsrv啓動部分,namesrv配置加載、netty server建立、註冊出處理器。apache

 

正文緩存

源碼解析namesrv啓動app

入口找到這個方法org.apache.rocketmq.namesrv.NamesrvStartup#main0這行代碼oop

(controller)

進入到這個方法ui

org.apache.rocketmq.namesrv.NamesrvStartup#startspa

    NamesrvController (NamesrvController controller) Exception {

        (== controller) {
            IllegalArgumentException()}

initResult = controller.initialize()(!initResult) {
            controller.shutdown()System.(-)}

        Runtime.().addShutdownHook(ShutdownHookThread(Callable<Void>() {
            Void () Exception {
                .shutdown()}
        }))controller.start()controller}

先看這行代碼.net

initResult = controller.initialize()

進入這個方法netty

org.apache.rocketmq.namesrv.NamesrvController#initializeserver

..load()

進入這個方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load

    () {
        String content = {
content = MixAll.(..getNamesrvConfig().getKvConfigPath())} (IOException e) {
            .warn(e)}
        (content != ) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.(contentKVConfigSerializeWrapper.)(!= kvConfigSerializeWrapper) {
                ..putAll(kvConfigSerializeWrapper.getConfigTable()).info()}
        }
    }

進入這個方法

.= NettyRemotingServer(..)

進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(org.apache.rocketmq.remoting.netty.NettyServerConfig, org.apache.rocketmq.remoting.ChannelEventListener)建立netty server

    (NettyServerConfig nettyServerConfigChannelEventListener channelEventListener) {
(nettyServerConfig.getServerOnewaySemaphoreValue()nettyServerConfig.getServerAsyncSemaphoreValue()).= ServerBootstrap().= nettyServerConfig.= channelEventListenerpublicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads()(publicThreadNums <= ) {
            publicThreadNums = }

.= Executors.(publicThreadNumsThreadFactory() {
            AtomicInteger = AtomicInteger()Thread (Runnable r) {
                Thread(r+ ..incrementAndGet())}
        }).= NioEventLoopGroup(ThreadFactory() {
            AtomicInteger = AtomicInteger()Thread (Runnable r) {
                Thread(rString.(..incrementAndGet()))}
        })(useEpoll()) {
            .= EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads()ThreadFactory() {
                AtomicInteger = AtomicInteger()= .getServerSelectorThreads()Thread (Runnable r) {
                    Thread(rString.(..incrementAndGet()))}
            })} {
.= NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads()ThreadFactory() {
                AtomicInteger = AtomicInteger()= .getServerSelectorThreads()Thread (Runnable r) {
                    Thread(rString.(..incrementAndGet()))}
            })}

loadSslContext()}
.registerProcessor()

進入這個方法org.apache.rocketmq.namesrv.NamesrvController#registerProcessor

    () {
(.isClusterTest()) {

            ..registerDefaultProcessor(ClusterTestRequestProcessor(.getProductEnvName()).)} {

            ..registerDefaultProcessor(DefaultRequestProcessor().)}
    }

進入這個方法org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor#ClusterTestRequestProcessor建立集羣請求控制器

(NamesrvController namesrvControllerString productEnvName) {
    (namesrvController).= productEnvName= DefaultMQAdminExt().setInstanceName(+ productEnvName).setUnitName(productEnvName){
        .start()} (MQClientException e) {
        .error(e)}
}

進入這個方法org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start

() MQClientException {
    (.) {
        :.= ServiceState...changeInstanceNameToPID().= MQClientManager.().getAndCreateMQClientInstance(.)registerOK = .registerAdminExt(..getAdminExtGroup())(!registerOK) {
                .= ServiceState.MQClientException(+ ..getAdminExtGroup()
                    + + FAQUrl.(FAQUrl.))}

            .start().info(..getAdminExtGroup()).= ServiceState.:
        :
        :
            MQClientException(+ .+ FAQUrl.(FAQUrl.)):
            }
}
.= MQClientManager.().getAndCreateMQClientInstance(.)

進入這個方法org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

  MQClientInstance (ClientConfig clientConfigRPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId()MQClientInstance instance = ..get(clientId)(== instance) {
            instance =
                MQClientInstance(clientConfig.cloneClientConfig()..getAndIncrement()clientIdrpcHook)MQClientInstance prev = ..putIfAbsent(clientIdinstance)(prev != ) {
                instance = prev.warn(clientId)} {
                .info(clientId)}
        }

        instance}

從緩存中獲取,若是緩存中沒有就建立後在放到緩存中。

進入這個方法建立mqclient對象

org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance(org.apache.rocketmq.client.ClientConfig, int, java.lang.String, org.apache.rocketmq.remoting.RPCHook)

(ClientConfig clientConfiginstanceIndexString clientIdRPCHook rpcHook) {
    .= clientConfig.= instanceIndex.= NettyClientConfig()..setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads())..setUseTLS(clientConfig.isUseTLS()).= ClientRemotingProcessor().= MQClientAPIImpl(..rpcHookclientConfig)(..getNamesrvAddr() != ) {
        ..updateNameServerAddressList(..getNamesrvAddr()).info(..getNamesrvAddr())}

    .= clientId.= MQAdminImpl().= PullMessageService().= RebalanceService().= DefaultMQProducer(MixAll.)..resetClientConfig(clientConfig).= ConsumerStatsManager(.).info(...MQVersion.(MQVersion.)RemotingCommand.())}

 

說在最後

本篇介紹namesrv啓動源碼解析,僅表明我的觀點,歡迎一塊兒討論交流。

關注「天河聊技術」公衆號

 

 

加羣討論

 

相關文章
相關標籤/搜索