broker啓動apache
進入這個方法org.apache.rocketmq.broker.BrokerStartup#startapp
這一行socket
// broker控制器啓動 controller.start();
進入這個方法org.apache.rocketmq.broker.BrokerController#startide
public void start() throws Exception { if (this.messageStore != null) { // 消磁存儲服務啓動 this.messageStore.start(); } if (this.remotingServer != null) { this.remotingServer.start(); } if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); } if (this.fileWatchService != null) { this.fileWatchService.start(); } if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } if (this.clientHousekeepingService != null) { this.clientHousekeepingService.start(); } if (this.filterServerManager != null) { this.filterServerManager.start(); } // 註冊broker this.registerBrokerAll(true, false, true); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); } if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); } if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { if (this.transactionalMessageCheckService != null) { log.info("Start transaction service!"); this.transactionalMessageCheckService.start(); } } }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#run函數
public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue(); this.waitForRunning(interval); this.doFlush(1); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } this.doFlush(RETRY_TIMES_OVER); DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
進入這個方法org.apache.rocketmq.store.ConsumeQueue#flushoop
public boolean flush(final int flushLeastPages) { // 隊列映射文件刷新 boolean result = this.mappedFileQueue.flush(flushLeastPages); if (isExtReadEnable()) { result = result & this.consumeQueueExt.flush(flushLeastPages); } return result; }
這一行this
// 隊列映射文件刷新 boolean result = this.mappedFileQueue.flush(flushLeastPages);
進入這個方法spa
public boolean flush(final int flushLeastPages) { boolean result = true; // 根據offset找到映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#flush.net
public boolean flush(final int flushLeastPages) { boolean result = true; // 根據offset找到映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
返回到這個方法這一行線程
org.apache.rocketmq.store.DefaultMessageStore#start
// 提交日誌 this.commitLog.start();
進入這個方法org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 從配置文件中獲取配置文件 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }
返回到這個方法這一行
org.apache.rocketmq.store.DefaultMessageStore#start
// 存儲狀態服務啓動 this.storeStatsService.start();
進入這個方法org.apache.rocketmq.store.StoreStatsService#run
返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#start
這一行
// 存儲消息服務啓動 this.reputMessageService.start();
進入這個方法org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Override public void run() { DefaultMessageStore.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { Thread.sleep(1); this.doReput(); } catch (Exception e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } } DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { // 存儲服務轉發請求 DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += result.getSize() - readSize; } } } } } finally { result.release(); } } else { doNext = false; } } }
返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#start這一行
// ha服務啓動 this.haService.start();
public void start() throws Exception { lock = lockFile.getChannel().tryLock(0, 1, false); if (lock == null || lock.isShared() || !lock.isValid()) { throw new RuntimeException("Lock failed,MQ already started"); } lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().force(true); // 刷新消費隊列啓動 this.flushConsumeQueueService.start(); // 提交日誌 this.commitLog.start(); // 存儲狀態服務啓動 this.storeStatsService.start(); if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } // 存儲消息服務啓動 this.reputMessageService.start(); // ha服務啓動 this.haService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; }
進入這個方法org.apache.rocketmq.store.ha.HAService#start
進入這個方法org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set<SelectionKey> selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { HAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = new HAConnection(HAService.this, sc); // ha connection啓動 conn.start(); HAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } else { log.warn("Unexpected ops in select " + k.readyOps()); } } selected.clear(); } } catch (Exception e) { log.error(this.getServiceName() + " service has exception.", e); } } log.info(this.getServiceName() + " service end"); }
ha connection啓動 conn.start();
public void start() { this.readSocketService.start(); this.writeSocketService.start(); }
進入這個方法org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
@Override public void run() { HAConnection.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); // 處理讀事件 boolean ok = this.processReadEvent(); if (!ok) { HAConnection.log.error("processReadEvent error"); break; } long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp; if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval); break; } } catch (Exception e) { HAConnection.log.error(this.getServiceName() + " service has exception.", e); break; } } this.makeStop(); writeSocketService.makeStop(); haService.removeConnection(HAConnection.this); HAConnection.this.haService.getConnectionCount().decrementAndGet(); SelectionKey sk = this.socketChannel.keyFor(this.selector); if (sk != null) { sk.cancel(); } try { this.selector.close(); this.socketChannel.close(); } catch (IOException e) { HAConnection.log.error("", e); } HAConnection.log.info(this.getServiceName() + " service end"); }
這一行
處理讀事件 boolean ok = this.processReadEvent();
進入這個方法
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run
返回到這個方法org.apache.rocketmq.store.ha.HAService#start這一行
this.groupTransferService.start();
這一行
this.haClient.start();
進入這個方法org.apache.rocketmq.store.ha.HAService.HAClient#run
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 鏈接master if (this.connectMaster()) { if (this.isTimeToReportOffset()) { boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { // 關閉master this.closeMaster(); } } this.selector.select(1000); // 處理讀事件 boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); } if (!reportSlaveMaxOffsetPlus()) { continue; } long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() .getHaHousekeepingInterval()) { log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval); this.closeMaster(); log.warn("HAClient, master not response some time, so close connection"); } } else { this.waitForRunning(1000 * 5); } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); this.waitForRunning(1000 * 5); } } log.info(this.getServiceName() + " service end"); }
返回到這個方法org.apache.rocketmq.broker.BrokerController#start
這一行
if (this.remotingServer != null) { this.remotingServer.start(); }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), // 自定義線程池中的線程名字 new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } }); ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } }); if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); }
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler netty server解析
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: // 請求消息處理 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: // 響應消息處理 processResponseCommand(ctx, cmd); break; default: break; } } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 公平的處理請求 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { // 客戶自定義的鉤子實現類 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); if (rpcHook != null) { // 這裏mq提供了一些鉤子方法能夠擴展的地方,請求前處理邏輯能夠在這裏擴展 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); } final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); if (rpcHook != null) { // 執行rocketmq請求的後置處理鉤子方法 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); } // 若是不是單線請求 if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); // 從隊列中獲取responseFuture final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { // 若是響應對象有回調處理就處理回調函數 executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
返回到這個方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
// nett處理器啓動 this.nettyEventExecutor.start();
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.NettyEventExecutor#run
@Override public void run() { log.info(this.getServiceName() + " service started"); final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener(); while (!this.isStopped()) { try { // 事件保存在阻塞隊列中 NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS); if (event != null && listener != null) { switch (event.getType()) { case IDLE: listener.onChannelIdle(event.getRemoteAddr(), event.getChannel()); break; case CLOSE: listener.onChannelClose(event.getRemoteAddr(), event.getChannel()); break; case CONNECT: listener.onChannelConnect(event.getRemoteAddr(), event.getChannel()); break; case EXCEPTION: listener.onChannelException(event.getRemoteAddr(), event.getChannel()); break; default: break; } } } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info(this.getServiceName() + " service end"); }
說到最後
本次解析表明我的觀點,僅供參考。
關注本號
加羣討論