說在前面apache
broker啓動json
源碼解析微信
進入方法,org.apache.rocketmq.broker.BrokerStartup#mainapp
public static void main(String[] args) {// 建立brokerController並啓動=》start(createBrokerController(args)); }
進入方法,建立brokerController並啓動,org.apache.rocketmq.broker.BrokerStartup#createBrokerControllersocket
public static BrokerController createBrokerController(String[] args) {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));// 設置發送的緩衝區大小if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize = 131072;}// 設置接收到的緩衝區大小if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize = 131072;}try {//PackageConflictDetect.detectFastjson(); 構建命令行操做=》Options options = ServerUtil.buildCommandlineOptions(new Options());// mqbroker 啓動broker命令commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),new PosixParser());if (null == commandLine) {System.exit(-1);}// broker配置final BrokerConfig brokerConfig = new BrokerConfig();// server配置final NettyServerConfig nettyServerConfig = new NettyServerConfig();// client配置final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));// broker默認端口nettyServerConfig.setListenPort(10911);// 消息存儲配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();// 若是是從節點命中率下降if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}// 解析命令行c指定的配置文件屬性if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);properties2SystemEnv(properties);// 從配置文件中讀取的配置信息封裝配置對象MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);// 默認broker配置文件 System.getProperty("user.home") + File.separator + "store"// + File.separator + "config" + File.separator + "broker.properties"// 設置broker配置文件BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}// 命令行指定的屬性解析成properties對象並解析成對象MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 從系統變量rocketmq.namesrv.addr中獲取namesrv的地址String namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {// 多個namesrv地址是用;分開的String[] addrArray = namesrvAddr.split(";");for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:// 從節點在broker配置文件中配置的brokerId必須大於0if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}// ha broker監聽端口 10912messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();// broker logback配置文件configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);// 建立brokerControllerfinal BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard 註冊配置=》controller.getConfiguration().registerConfig(properties);// 初始化控制器=》boolean initResult = controller.initialize();if (!initResult) {// 初始化失敗,關閉控制器=》controller.shutdown();System.exit(-3);}// jdk提供的虛擬機關閉的鉤子方法Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown = false;private AtomicInteger shutdownTimes = new AtomicInteger(0);@Overridepublic void run() {synchronized (this) {log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown = true;long beginTime = System.currentTimeMillis();controller.shutdown();long consumingTimeTotal = System.currentTimeMillis() - beginTime;log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}}, "ShutdownHook"));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null; }
進入方法,構建命令行操做,org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions分佈式
public static Options buildCommandlineOptions(final Options options) {Option opt = new Option("h", "help", false, "Print help");opt.setRequired(false);options.addOption(opt);// 這裏能夠看到啓動的時候有一個n參數指定namesrv的地址,能夠是單機能夠是集羣,啓動地址之間用;分開opt =new Option("n", "namesrvAddr", true,"Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");opt.setRequired(false);options.addOption(opt);return options; }
進入方法,初始化控制器,org.apache.rocketmq.broker.BrokerController#initializeide
public boolean initialize() throws CloneNotSupportedException {// 加載topic config/topics.json=》boolean result = this.topicConfigManager.load();// 加載客戶消費的offset config/consumerOffset.jsonresult = result && this.consumerOffsetManager.load();// 加載訂閱組信息 config/subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();// 加載客戶filter config/consumerFilter.jsonresult = result && this.consumerFilterManager.load();if (result) {try {this.messageStore =new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}// 從消息存儲中加載=》result = result && this.messageStore.load();if (result) {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(// 發送消息線程數默認值是1this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(// 拉取消息默認線程數 2倍的可用線程數+16this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(// 查詢線程數的默認線程數 8+可用線程數this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));// 管理服務核心線程數默認值是16this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));// client管理服務默認核心線程數32this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));// broker心跳監測核心線程數默認值是可用線程數或32this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_",true));// 消費者管理服務核心線程數默認值是32this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));// 註冊處理器=》this.registerProcessor();// 初始化延遲時間final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 消費者offset持久化=》BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 消費者過濾信息持久化=》BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// broker保護機制 =》BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() != null) {// 更新namesrv地址this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 主從同步=》BrokerController.this.slaveSynchronize.syncAll();} catch (Throwable e) {log.error("ScheduledTask syncAll slave exception", e);}}// 延遲10s,60s同步一次}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 打印master和slave當前的offsetBrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}}// 初始化分佈式消息事務=》initialTransaction();}return result; }
進入方法,加載topic config/topics.json,org.apache.rocketmq.common.ConfigManager#loadfetch
public boolean load() {String fileName = null;try {// System.getProperty("user.home") + File.separator + "store" File.separator + "config" + File.separator + "topics.json" 獲取topic配置文件fileName = this.configFilePath();String jsonString = MixAll.file2String(fileName);if (null == jsonString || jsonString.length() == 0) {// 文件備份=》return this.loadBak();} else {// 配置反序列化=》this.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " failed, and try to load backup file", e);return this.loadBak();} }
進入方法,文件備份,org.apache.rocketmq.common.ConfigManager#loadBakui
private boolean loadBak() {String fileName = null;try {fileName = this.configFilePath();// 從備份配置文件中讀取配置String jsonString = MixAll.file2String(fileName + ".bak");if (jsonString != null && jsonString.length() > 0) {// 配置反序列化=》this.decode(jsonString);log.info("load " + fileName + " OK");return true;}} catch (Exception e) {log.error("load " + fileName + " Failed", e);return false;}return true; }
進入方法,從消息存儲中加載,org.apache.rocketmq.store.DefaultMessageStore#loadthis
public boolean load() {boolean result = true;try {boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");if (null != scheduleMessageService) {// 消息加載=》result = result && this.scheduleMessageService.load();}// load Commit Log 加載commitLog=》result = result && this.commitLog.load();// load Consume Queue 記載消費隊列=》result = result && this.loadConsumeQueue();// System.getProperty("user.home") + File.separator + "store" File.separator + "checkpoint" 建立存儲檢查對象if (result) {this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));this.indexService.load(lastExitOK);// 恢復=》this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {this.allocateMappedFileService.shutdown();}return result; }
進入方法,消息加載,org.apache.rocketmq.store.schedule.ScheduleMessageService#load
public boolean load() {// =》boolean result = super.load();// 解析延遲等級=》result = result && this.parseDelayLevel();return result; }
進入方法,解析延遲等級,org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
public boolean parseDelayLevel() {HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {String value = levelArray[i];String ch = value.substring(value.length() - 1);Long tu = timeUnitTable.get(ch);int level = i + 1;if (level > this.maxDelayLevel) {this.maxDelayLevel = level;}long num = Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis = tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}return true; }
返回方法,加載commitLog,org.apache.rocketmq.store.CommitLog#load
public boolean load() {// 映射文件隊列加載=》boolean result = this.mappedFileQueue.load();log.info("load commit log " + (result ? "OK" : "Failed"));return result; }
進入方法,映射文件隊列加載,org.apache.rocketmq.store.MappedFileQueue#load
public boolean load() {// System.getProperty("user.home") + File.separator + "store"// + File.separator + "commitlog"File dir = new File(this.storePath);File[] files = dir.listFiles();if (files != null) {// ascending orderArrays.sort(files);for (File file : files) {// 隊列映射文件的大小不等於1Gif (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, ignore it");return true;}try {// 建立映射文件=》MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}}return true; }
接下篇。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣