說在前面java
主要解析namrsrv啓動部分,namesrv配置加載、netty server建立、註冊出處理器。apache
正文緩存
源碼解析namesrv啓動app
入口找到這個方法org.apache.rocketmq.namesrv.NamesrvStartup#main0這行代碼oop
(controller)
進入到這個方法ui
org.apache.rocketmq.namesrv.NamesrvStartup#startspa
NamesrvController (NamesrvController controller) Exception { (== controller) { IllegalArgumentException()} initResult = controller.initialize()(!initResult) { controller.shutdown()System.(-)} Runtime.().addShutdownHook(ShutdownHookThread(Callable<Void>() { Void () Exception { .shutdown()} }))controller.start()controller}
先看這行代碼.net
initResult = controller.initialize()
進入這個方法netty
org.apache.rocketmq.namesrv.NamesrvController#initializeserver
..load()
進入這個方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load
() { String content = { content = MixAll.(..getNamesrvConfig().getKvConfigPath())} (IOException e) { .warn(e)} (content != ) { KVConfigSerializeWrapper kvConfigSerializeWrapper = KVConfigSerializeWrapper.(contentKVConfigSerializeWrapper.)(!= kvConfigSerializeWrapper) { ..putAll(kvConfigSerializeWrapper.getConfigTable()).info()} } }
進入這個方法
.= NettyRemotingServer(..)
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(org.apache.rocketmq.remoting.netty.NettyServerConfig, org.apache.rocketmq.remoting.ChannelEventListener)建立netty server
(NettyServerConfig nettyServerConfigChannelEventListener channelEventListener) { (nettyServerConfig.getServerOnewaySemaphoreValue()nettyServerConfig.getServerAsyncSemaphoreValue()).= ServerBootstrap().= nettyServerConfig.= channelEventListenerpublicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads()(publicThreadNums <= ) { publicThreadNums = } .= Executors.(publicThreadNumsThreadFactory() { AtomicInteger = AtomicInteger()Thread (Runnable r) { Thread(r+ ..incrementAndGet())} }).= NioEventLoopGroup(ThreadFactory() { AtomicInteger = AtomicInteger()Thread (Runnable r) { Thread(rString.(..incrementAndGet()))} })(useEpoll()) { .= EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads()ThreadFactory() { AtomicInteger = AtomicInteger()= .getServerSelectorThreads()Thread (Runnable r) { Thread(rString.(..incrementAndGet()))} })} { .= NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads()ThreadFactory() { AtomicInteger = AtomicInteger()= .getServerSelectorThreads()Thread (Runnable r) { Thread(rString.(..incrementAndGet()))} })} loadSslContext()}
.registerProcessor()
進入這個方法org.apache.rocketmq.namesrv.NamesrvController#registerProcessor
() { (.isClusterTest()) { ..registerDefaultProcessor(ClusterTestRequestProcessor(.getProductEnvName()).)} { ..registerDefaultProcessor(DefaultRequestProcessor().)} }
進入這個方法org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor#ClusterTestRequestProcessor建立集羣請求控制器
(NamesrvController namesrvControllerString productEnvName) { (namesrvController).= productEnvName= DefaultMQAdminExt().setInstanceName(+ productEnvName).setUnitName(productEnvName){ .start()} (MQClientException e) { .error(e)} }
進入這個方法org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start
() MQClientException { (.) { :.= ServiceState...changeInstanceNameToPID().= MQClientManager.().getAndCreateMQClientInstance(.)registerOK = .registerAdminExt(..getAdminExtGroup())(!registerOK) { .= ServiceState.MQClientException(+ ..getAdminExtGroup() + + FAQUrl.(FAQUrl.))} .start().info(..getAdminExtGroup()).= ServiceState.: : : MQClientException(+ .+ FAQUrl.(FAQUrl.)): } }
.= MQClientManager.().getAndCreateMQClientInstance(.)
進入這個方法org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
MQClientInstance (ClientConfig clientConfigRPCHook rpcHook) { String clientId = clientConfig.buildMQClientId()MQClientInstance instance = ..get(clientId)(== instance) { instance = MQClientInstance(clientConfig.cloneClientConfig()..getAndIncrement()clientIdrpcHook)MQClientInstance prev = ..putIfAbsent(clientIdinstance)(prev != ) { instance = prev.warn(clientId)} { .info(clientId)} } instance}
從緩存中獲取,若是緩存中沒有就建立後在放到緩存中。
進入這個方法建立mqclient對象
org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance(org.apache.rocketmq.client.ClientConfig, int, java.lang.String, org.apache.rocketmq.remoting.RPCHook)
(ClientConfig clientConfiginstanceIndexString clientIdRPCHook rpcHook) { .= clientConfig.= instanceIndex.= NettyClientConfig()..setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads())..setUseTLS(clientConfig.isUseTLS()).= ClientRemotingProcessor().= MQClientAPIImpl(..rpcHookclientConfig)(..getNamesrvAddr() != ) { ..updateNameServerAddressList(..getNamesrvAddr()).info(..getNamesrvAddr())} .= clientId.= MQAdminImpl().= PullMessageService().= RebalanceService().= DefaultMQProducer(MixAll.)..resetClientConfig(clientConfig).= ConsumerStatsManager(.).info(...MQVersion.(MQVersion.)RemotingCommand.())}
說在最後
本篇介紹namesrv啓動源碼解析,僅表明我的觀點,歡迎一塊兒討論交流。
關注「天河聊技術」公衆號
加羣討論