在rocketmq中,nameserver充當了一個配置管理者的角色,看起來好似不過重要。然而它是一個不或缺的角色,沒有了它的存在,各個broker就是人心渙散,各自爲戰。apache
因此,實際上,在rocketmq中,nameserver也是一個領導者的角色。它能夠決定哪一個消息存儲到哪裏,哪一個broker幹活或者上下線,在出現異常狀況時,它要可以及時處理。以便讓整個團隊發揮應有的做用。nameserver至關於一個分佈式系統的協調者。可是這個名字,是否是看起來很熟悉?請看後續!編程
如文章開頭所說,nameserver擔任的,差很少是一個系統協調者這麼個角色。那麼,咱們知道,在分佈式協調工做方面,有不少現成的組件可用。好比 zookeeper, 那麼爲何還要本身搞一套nameserver出來?是爲了刷存在感?json
對於爲何不選擇zk之類的組件實現協調者角色,初衷如何咱們不得而知。但至少有幾個可知答案能夠作下支撐:(以zk爲例)服務器
1. zk存在大量的集羣間通訊;
2. zk是一個比較重的組件,而自己就做爲消息中間的mq,則最好很差另外再依賴其餘組件;(我的感受)
3. zk對於數據的固化能力比較弱,配置每每受限於zk的數據格式;數據結構
整體來講,可能就是rocketmq想要作的功能在zk上不太好作,或者作起來也費勁,或者過重,索性就不要搞了。本身搞一個徹底定製化的好了。事實上,rocketmq的nameserver也實現得至關簡單輕量。這也是設計者的初衷吧。app
通常地,一個框架級別的服務啓動,仍是有些複雜的,那樣的話,咱們懶得去看其具體過程。但前面說了,nameserver實現得很是輕量級,因此,其啓動也就至關簡單。因此,咱們能夠快速一覽其過程。框架
整個nameserver的啓動類是 org.apache.rocketmq.namesrv.NamesrvStartup, 工做過程大體以下:dom
// 入口main public static void main(String[] args) { main0(args); } public static NamesrvController main0(String[] args) { try { // 建立本服務的核心控制器, 解析各類配置參數,默認值之類的 NamesrvController controller = createNamesrvController(args); // 開啓服務, 如打開 start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
因此整個啓動過程,基本就是一個 Controller 搞定了,你說不簡單嗎?額,也許不必定!整個建立 Controller 的過程就是解析參數的過程,有興趣能夠打開以下代碼看看:tcp
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); // -c xx.properties 用於指定配置文件,優先級較低 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // -p 僅爲打印查看啓動參數 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 將配置參數傳入controller構造實例 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); return controller; } // Controller 構造方法 // org.apache.rocketmq.namesrv.NamesrvController#NamesrvController public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); } // org.apache.rocketmq.common.Configuration#registerConfig /** * register config properties * * @return the current Configuration object */ public Configuration registerConfig(Properties extProperties) { if (extProperties == null) { return this; } try { readWriteLock.writeLock().lockInterruptibly(); try { merge(extProperties, this.allConfigs); } finally { readWriteLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("register lock error. {}" + extProperties); } return this; }
接下來,咱們主要來看看這start()過程到底如何,複雜性必然都在這裏了。分佈式
// org.apache.rocketmq.namesrv.NamesrvStartup#start public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } // 初始化controller各環境,若是失敗,則退出啓動 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 註冊一個關閉鉤子 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); // 核心start()方法 controller.start(); return controller; } // org.apache.rocketmq.namesrv.NamesrvController#initialize public boolean initialize() { this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 註冊處理器 this.registerProcessor(); // 啓動後臺掃描線程,掃描掉線的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 打印日誌定時任務 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } // no false return true; } private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { // 只會有一個處理器處理業務 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } } // 初始化完成後,接下來是 start() 方法 // org.apache.rocketmq.namesrv.NamesrvController#start public void start() throws Exception { // 開啓後臺端口服務,nameserver可鏈接 this.remotingServer.start(); // 文件檢測線程 if (this.fileWatchService != null) { this.fileWatchService.start(); } }
可見,controller的啓動過程也很是簡單,就是設置好各初始實例,註冊處理器,而後將tcp端口打開,便可。其中端口服務是使用netty做爲通訊組件,其操做徹底聽從netty編程範式。可自行查閱。
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); prepareSharableHandlers(); ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
至此,nameserver的啓動流程就完成了,果真是輕量級。至於其提供什麼樣的服務,咱們下一節再講。
因nameserver和broker同樣,都共用了remoting模塊的代碼,即都依賴於netty的handler處理機制。因此其處理器入口都是同樣的。反正最終都是找到對應的processor, 而後處理業務便可。此處,nameserver只會提供一個默認的處理器,即DefaultRequestProcessor。因此,只需瞭解其processRequest()便可知nameserver的總體能力了。
// org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); // 註冊broker信息,這種操做通常是在broker啓動的時候進行請求 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } // 下線broker case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); // 獲取路由信息,即哪一個topic存在於哪些broker上,哪些messageQueue在哪裏等 case RequestCode.GET_ROUTEINFO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }
以上就是整個nameserver提供的服務列表了,也沒啥註釋,見字如悟吧,咱們也不想過多糾纏。但整體上,其處理的業務類型並很少,主要有三類:
1. 配置信息kv的操做;
2. broker上下線管理操做;
3. topic路由信息管理服務;
各自實現固然是按照業務處理,本無需多說,但爲了解概要,咱們仍是挑一個重點來講說吧:broker的上線處理註冊:
// 爲保持前沿起見,我們以高版本服務展開思路(即版本大於3.0.11) public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); if (!checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match"); return response; } RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); if (request.getBody() != null) { try { registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed()); } catch (Exception e) { throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e); } } else { registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); } // 重點實現: registerBroker RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), registerBrokerBody.getTopicConfigSerializeWrapper(), registerBrokerBody.getFilterServerList(), ctx.channel()); responseHeader.setHaServerAddr(result.getHaServerAddr()); responseHeader.setMasterAddr(result.getMasterAddr()); byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); response.setBody(jsonValue); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } // org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { // 上鎖更新各表數據 this.lock.writeLock().lockInterruptibly(); // 集羣名錶 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; // broker詳細信息表 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { // 首次註冊或者topic變動,則更新topic信息 ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 存活的broker信息表 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } // slave節點註冊需綁定masterAddr 返回 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
好吧,是否是很抽象。不要緊,能知道大概意思就好了。大致上就是broker上線了,nameserver須要知道這些事,要把這信息加入到各表項中,以備未來使用。具體理解咱們應該要從業務性質出發才能透徹。反正就和我們平時寫業務代碼並沒有二致。
nameserver除了有註冊broker的核心做用外,還有一個很是核心的做用就是,爲各消費者或生產者提供各topic信息所在位置。這個位置決定了數據如何存儲以及如何訪問問題,只要這個決策出問題,則整個集羣的可靠性就沒法保證了。因此,這個點須要咱們深刻理解下。
在kafka中,其存儲策略是和shard強相關的,一個topic分配了多少shard就決定了它能夠存儲到幾個機器節點上,即kafka是以shard做爲粒度分配存儲的。
但rocketmq中則不太同樣,相似的概念有:topic是最外層的存儲,而messageQueue則是內一層的存儲,它是不是按照topic存儲或者按照msgQueue存在呢?實際上,在官方文檔中,已經描述清楚了: Broker 在實際部署過程當中對應一臺服務器,每一個 Broker 能夠存儲多個Topic的消息,每一個Topic的消息也能夠分片存儲於不一樣的 Broker。Message Queue 用於存儲消息的物理地址,每一個Topic中的消息地址存儲於多個 Message Queue 中。
即rocketmq中是以message queue做爲最細粒度的存儲的,實際上這基本無懸念,由於分佈式存儲須要。(試想以topic爲存儲粒度會帶來多少問題就知道了)
那麼,它又是如何劃分哪一個message queue存儲在哪裏的呢?
// RequestCode.GET_ROUTEINFO_BY_TOPIC public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); // 獲取topic路由信息 TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { // 順序消費配置 if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } // org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; Set<String> brokerNameSet = new HashSet<String>(); List<BrokerData> brokerDataList = new LinkedList<BrokerData>(); topicRouteData.setBrokerDatas(brokerDataList); HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>(); topicRouteData.setFilterServerTable(filterServerMap); try { try { this.lock.readLock().lockInterruptibly(); // 獲取全部topic的messageQueue信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); brokerNameSet.add(qd.getBrokerName()); } // 根據brokerName, 查找broker信息,若是沒找到說明該broker可能已經下線,不能算在路由信息內 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); // 只要找到一個broker就能夠進行路由處理 foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); // 只有隊列信息和broker信息都找到時,整個路由信息纔可返回 if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; } // QueueData 做爲路由信息的重要組成部分,其數據結構以下 public class QueueData implements Comparable<QueueData> { private String brokerName; private int readQueueNums; private int writeQueueNums; private int perm; private int topicSynFlag; ... } // brokerData 數據結構以下 public class BrokerData implements Comparable<BrokerData> { private String cluster; private String brokerName; private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; ... }
ok, 從上面的實現中,咱們能夠看到,查找路由信息,是根據topic進行查找的。而topic信息保存在 topicQueueTable 中。這裏有個重要點是,整個路由查找過程,竟然的queueId是無關的,那麼它又是如何定位queueId所在的位置呢?另外,這個topicQueTable裏的數據又是什麼時候維護的呢?
首先,對於topicQueueTable的維護,是在broker註冊和解註冊時維護的,這很好理解。
// 也就前面看到的broker爲master節點時的 createAndUpdateQueueData() private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); // topic的首個broker if (null == queueDataList) { queueDataList = new LinkedList<QueueData>(); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } else { boolean addNewOne = true; Iterator<QueueData> it = queueDataList.iterator(); // 添加一個broker while (it.hasNext()) { QueueData qd = it.next(); if (qd.getBrokerName().equals(brokerName)) { if (qd.equals(queueData)) { addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); } } } if (addNewOne) { queueDataList.add(queueData); } } }
但針對queueId又是什麼時候進行處理的呢?看起來nameserver不得而知。
事實上,數據發送到哪一個broker或從哪一個broker上進行數據消費,是由各客戶端根據策略決定的。好比在producer中是這樣處理的:
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 此處便是nameserver返回的路由信息,便可用的broker列表 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { // 首次進入時,只是選擇一個隊列發送 String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 向選擇出來的messageQueue 發送消息數據 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) ... } // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } // org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 容錯處理,不影響策略理解 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); } // org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue // 直接使用輪詢的方式選擇一個隊列 public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { // 任意選擇一個messageQueue做爲發送目標 return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); // 最大嘗試n次獲取不同的MQueue, 如仍然獲取不到,則隨便選擇一個便可 for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
好了,經過上面的描述,咱們大概知道了,一個消息要發送往消息server時,首先會根據topic找到全部可用的broker列表(nameserver提供),而後根據一個所謂策略選擇一個MessageQueue,最後向這個MessageQueue發送數據便可。因此,這個MessageQueue是很是重要的,咱們來看下其數據結構:
// org.apache.rocketmq.common.message.MessageQueue public class MessageQueue implements Comparable<MessageQueue>, Serializable { private static final long serialVersionUID = 6191200464116433425L; private String topic; private String brokerName; private int queueId; ... }
這是很是之簡潔啊,僅有主要的三個核心:topic(主題),brokerName(broker標識),queueId(隊列id)。
前面提到的客戶端策略,會選擇一個MessageQueue, 即會獲得一個broker標識,對應一個queueId。因此,數據存放在哪一個broker,是由客戶端決定的,且存放位置未知。也就是說,rocketmq中同一個topic的數據,是散亂存放在一堆broker中的。這是和咱們一般的認知有必定差距的。
這樣設計有什麼好處呢?好處天然是有的,好比假如其中有些broker掛掉了,那麼整個集羣無需通過什麼再均衡策略,一樣能夠工做得很好,由於客戶端能夠直接向正常的broker發送消息便可。其餘好處。。。
可是我我的以爲這樣的設計,也不見得很好,好比你不可以很肯定地定位到某條消息在哪一個broker上,徹底無規律可循。另外,若是想在單queueId上保持必定的規則,則是不可能的(也許有其餘曲線救國之法)。另外,對於queueId, 只是一個系統內部的概念,實際上用戶並不能指定該值。
按照上面說的,一個topic數據可能被存放在n個broker中,且以messageQueue的queueId做爲單獨存儲。那麼,到底數據存放在哪裏?所說的n個broker到底指哪幾個broker?每一個broker上到底存放了幾個queueId?這些問題若是沒有搞清楚,咱們就沒法說清楚這玩意。
咱們先來回答第1個問題,topic數據到底存放在幾個broker中?回顧下前面broker的註冊過程可知:
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { // 首次註冊或者topic變動,則更新topic信息 ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { // 遍歷全部topic, 將當前新進的broker 加入處處理機器中 for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } }
看完這段,咱們就明白了,原來所謂的n個broker可處理topic信息,實際上指的是全部broker啊!好吧,咱也不懂爲啥這麼幹同,反正就是這麼幹了,topic可能分佈在全部broker機器上。至於具體哪一臺,你猜啊!
接下來咱們看第二個問題,一個broker到底存儲了幾個queueId的數據?實際上,咱們稍微想一想前面的實現,broker是指全部的broker,若是全部broker都是同樣的配置,那麼是否是應該讓每一個broker都存儲全部queueId呢?(儘管沒啥依據,仍是能夠想一想的嘛)
rocketmq的各客戶端(生產者、消費者)每次向服務器發送生產或消費請求時,均可能向nameserver請求拉取路由信息,但這些信息從咱們前面調查的結果來看,並不包含queueId信息。那麼,後續又是如何轉換爲queueId的呢?實際上,就是在拉取了nameserver的路由信息以後,本地再作一次分配就能夠了:
// 更新topic路由信息 // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 從nameserver拉取路由數據 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } } // org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // Update Pub info { // 爲每一個broker分配queueId TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic); } } catch (MQClientException e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } catch (RemotingException e) { log.error("updateTopicRouteInfoFromNameServer Exception", e); throw new IllegalStateException(e); } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; }
生產者分配queueId的實現以下:
// org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { TopicPublishInfo info = new TopicPublishInfo(); info.setTopicRouteData(route); // 爲每一個broker指定queueId的分配狀況(最大queueId) // 這樣的配置不知道累不累 if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) { String[] brokers = route.getOrderTopicConf().split(";"); for (String broker : brokers) { String[] item = broker.split(":"); int nums = Integer.parseInt(item[1]); for (int i = 0; i < nums; i++) { MessageQueue mq = new MessageQueue(topic, item[0], i); info.getMessageQueueList().add(mq); } } info.setOrderTopic(true); } else { List<QueueData> qds = route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) { if (PermName.isWriteable(qd.getPerm())) { BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } // 仍是有broker沒法處理queue哦 if (null == brokerData) { continue; } // 非master節點不能接受寫請求 if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } // 根據 writeQueueNums 數量,要求該broker接受全部小於該值的queueId for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); } } } info.setOrderTopic(false); } return info; }
能夠看出,生產者對應的broker中,負責寫的broker只能是master節點,負責全部小於writeQueueNums的數據存儲。(若是全部broker配置同樣,則至關於全部broker都存儲全部queueId),因此,這存儲關係,多是理不清楚了。
咱們再來看看消費者是如何對應queueId的呢?
// org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicSubscribeInfo public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { Set<MessageQueue> mqList = new HashSet<MessageQueue>(); List<QueueData> qds = route.getQueueDatas(); for (QueueData qd : qds) { if (PermName.isReadable(qd.getPerm())) { // 可讀取broker上對應的全部小於readQueueNums 的隊列 for (int i = 0; i < qd.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); mqList.add(mq); } } } return mqList; }
原理和生產者差很少,就是經過一個 readQueueNums 來限定讀取的隊列數,基本上就是等於全部隊列了,緣由多是本來數據就存儲了全部queueId,若是消費者不讀取,又該誰來讀取呢。
好了,到此咱們總算釐清了整個rocketmq的消息存儲定位方式了。總結一句話就是:任何節點均可能有任意topic的任意queueId數據。這結果,不由又讓我有一種千頭萬緒的感受!
以上僅是一些正常的rocketmq數據存儲的實現,只能算是皮毛。事實上,分佈式系統中一個很是重要的能力是容錯,這須要咱們後續再聊。