說在前面java
本次開始進行rocketmq源碼解析,比較喜歡rocketmq的架構設計,rocketmq內嵌了namesrv註冊中心保存了元數據,進行負載均衡、容錯的一些處理,4.3以上支持消息事務,有管理控制檯、命令行工具,底層namesrv與broker、client與server交互netty實現。更多精彩文章請關注「天河聊架構」微信公衆號。apache
源碼解析json
建立NamesrvController,進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#main,再進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#main0緩存
public static NamesrvController main0(String[] args) { try { // 源碼解析之建立namesrv控制器 =》 NamesrvController controller = createNamesrvController(args); // 源碼解析之啓動namesrv控制器 =》 start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController微信
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // 從系統文件中查詢rocketmq版本,默認4.3.0 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); // PackageConflictDetect.detectFastjson(); // 構建命令行操做的指令 =》 Options options = ServerUtil.buildCommandlineOptions(new Options()); // mqnamesrv 啓動namesrv命令 =》 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { // 系統非正常退出,流程結束 System.exit(-1); return null; } // 解析配置文件 =》 final NamesrvConfig namesrvConfig = new NamesrvConfig(); // =》 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 設置 namesrv的服務端口 nettyServerConfig.setListenPort(9876); // c 指定啓動的時候加載配置文件 if (commandLine.hasOption('c')) { // 命令行啓動指定配置文件,前面用c開頭 String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); // 設置命令行啓動namesrv指定的配置文件路徑 namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // 打印namesrv的配置信息,命令行上面加p if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); // 正常程序退出 System.exit(0); } // 把命令行屬性解析成properties MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); // 解析 ROCKETMQ_HOME 環境變量 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); // 系統非正常退出 System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); // logback日誌文件路徑 configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 建立namesrv控制器 =》 final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard 把配置文件配置值的屬性值註冊到namesrv控制器 controller.getConfiguration().registerConfig(properties); return controller; }
進入這個方法org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions 這裏能夠看到啓動rocketmq的時候namesrv參數怎麼制定架構
public static Options buildCommandlineOptions(final Options options) { Option opt = new Option("h", "help", false, "Print help"); opt.setRequired(false); options.addOption(opt); // 這裏能夠看到啓動的時候有一個n參數指定namesrv的地址,能夠是單機能夠是集羣,啓動地址之間用;分開 opt = new Option("n", "namesrvAddr", true, "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); opt.setRequired(false); options.addOption(opt); return options; }
進入這個方法org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions 從這裏看出來啓動rocketmq的時候怎麼指定配置文件,是否自動建立topic、消費組這些參數均可以在配置文件中配置負載均衡
// 指定namesrv啓動的時候加載的配置文件 public static Options buildCommandlineOptions(final Options options) { Option opt = new Option("c", "configFile", true, "Name server config properties file"); opt.setRequired(false); options.addOption(opt); // 控制檯輸出配置項 opt = new Option("p", "printConfigItem", false, "Print all config item"); opt.setRequired(false); options.addOption(opt); return options; }
從這裏能夠看到一些配置的存儲地址異步
public class NamesrvConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // 解析ROCKETMQ_HOME private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // kvConfig存儲地址 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // namesrv存儲地址 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
nettyserver的一些配置socket
public class NettyServerConfig implements Cloneable { // netty server監聽端口 private int listenPort = 8888; // 服務端工做線程數 private int serverWorkerThreads = 8; // 服務端回調執行線程 private int serverCallbackExecutorThreads = 0; // 服務端選擇器線程 private int serverSelectorThreads = 3; // oneway方式信號量的值 private int serverOnewaySemaphoreValue = 256; // 服務端異步信號量值 private int serverAsyncSemaphoreValue = 64; // 服務端channel最大空閒時間 private int serverChannelMaxIdleTimeSeconds = 120; // 發送消息最大大小從系統屬性中獲取,默認值65535 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 接受消息最大大小從系統屬性中獲取,默認值65535 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;
nettyserver默認監聽端口9876工具
nettyServerConfig.setListenPort(9876);
從這裏能夠看到是解析啓動的時候指定的配置文件屬性
// c 指定啓動的時候加載配置文件 if (commandLine.hasOption('c')) { // 命令行啓動指定配置文件,前面用c開頭 String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig);
配置文件中能夠配置這個類org.apache.rocketmq.common.namesrv.NamesrvConfig namesrv的一些配置
// 解析ROCKETMQ_HOME private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // kvConfig存儲地址 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // namesrv存儲地址 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;
也能夠配置這個類org.apache.rocketmq.remoting.netty.NettyServerConfig nettyserver的一些配置
public class NettyServerConfig implements Cloneable { // netty server監聽端口 private int listenPort = 8888; // 服務端工做線程數 private int serverWorkerThreads = 8; // 服務端回調執行線程 private int serverCallbackExecutorThreads = 0; // 服務端選擇器線程 private int serverSelectorThreads = 3; // oneway方式信號量的值 private int serverOnewaySemaphoreValue = 256; // 服務端異步信號量值 private int serverAsyncSemaphoreValue = 64; // 服務端channel最大空閒時間 private int serverChannelMaxIdleTimeSeconds = 120; // 發送消息最大大小從系統屬性中獲取,默認值65535 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 接受消息最大大小從系統屬性中獲取,默認值65535 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;
namesrv配置的存儲地址就是啓動指定的配置文件
namesrvConfig.setConfigStorePath(file);
也能夠在命令行上指定namesrv屬性配置
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
logback的配置文件
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
根據namesrv配置和nettyserver配置建立NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
進入這個方法org.apache.rocketmq.namesrv.NamesrvController#NamesrvController
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); // =》 this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); // 指定保存namesrv配置的配置文件 this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }
進入這裏org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#RouteInfoManager
public RouteInfoManager() { // 指定長度,減小擴容提升性能 this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);//topicQueue緩存信息 this.brokerAddrTable = new HashMap<String, BrokerData>(128);//broker地址緩存 this.clusterAddrTable = new HashMap<String, Set<String>>(32);//集羣地址緩存 this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);//可用的broker緩存 this.filterServerTable = new HashMap<String, List<String>>(256);//過濾的server地址 }
設置namrsrv配置存儲地址System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
// remember all configs to prevent discard 把配置文件配置值的屬性值註冊到namesrv控制器 controller.getConfiguration().registerConfig(properties);
進入這個方法org.apache.rocketmq.common.Configuration#registerConfig(java.util.Properties)
public Configuration registerConfig(Object configObject) { try { readWriteLock.writeLock().lockInterruptibly(); try { Properties registerProps = MixAll.object2Properties(configObject); merge(registerProps, this.allConfigs); configObjectList.add(configObject); } finally { readWriteLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("registerConfig lock error"); } return this; }
/** * All properties include configs in object and extend properties. * 全部的啓動配置都在這裏 */ private Properties allConfigs = new Properties();
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣