說在前面
接上次
源碼解析
進入這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway,發送單向(oneway)請求。
@Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { // 獲取channel=》 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { // 執行請求執行前的鉤子方法 this.rpcHook.doBeforeRequest(addr, request); } // 執行單線請求 =》 this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); // 異常關閉channel=》 this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
進入這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateChannel,獲取 channel。
private Channel getAndCreateChannel(final String addr) throws InterruptedException { if (null == addr) { // =》 return getAndCreateNameserverChannel(); } ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } // =》 return this.createChannel(addr); }
進入這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateNameserverChannel,獲取與 namesrv 通訊的 channel。
private Channel getAndCreateNameserverChannel() throws InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } // 從namesrvAddrChoosed中查找namesrv,若是不存在同步輪詢的方式從namesrvAddrList中取 final List<String> addrList = this.namesrvAddrList.get(); if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } if (addrList != null && !addrList.isEmpty()) { for (int i = 0; i < addrList.size(); i++) { int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr); log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); // 同步建立渠道 Channel channelNew = this.createChannel(newAddr); if (channelNew != null) { return channelNew; } } } } catch (Exception e) { log.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.lockNamesrvChannel.unlock(); } } else { log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } return null; }
進入到這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#createChannel,同步建立 channel。
private Channel createChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); // 代碼走到這裏,這裏的邏輯正常狀況下是走不到的,爲了代碼嚴謹性 if (cw != null && cw.isOK()) { cw.getChannel().close(); channelTables.remove(addr); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { boolean createNewConnection; // 爲了代碼嚴謹性,這裏又作了一次判斷 cw = this.channelTables.get(addr); if (cw != null) { if (cw.isOK()) { cw.getChannel().close(); this.channelTables.remove(addr); createNewConnection = true; // 若是channel還在用,不讓建立 } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { this.channelTables.remove(addr); createNewConnection = true; } } else { createNewConnection = true; } if (createNewConnection) { // 從新創建鏈接 ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture); // 創建的channel也放到本次緩存中 this.channelTables.put(addr, cw); } } catch (Exception e) { log.error("createChannel: create channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } if (cw != null) { ChannelFuture channelFuture = cw.getChannelFuture(); if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { // 對channel再次判斷 if (cw.isOK()) { log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); return cw.getChannel(); } else { log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause()); } } else { log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString()); } } return null; }
往上返回到這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl,執行單向請求的實現。
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); // 獲取信號量的信號 boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
往上返回到這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#closeChannel(java.lang.String, io.netty.channel.Channel),出現異常時同步關閉 channel。
public void closeChannel(final String addr, final Channel channel) { if (null == channel) return; final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr; try { if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { boolean removeItemFromTable = true; final ChannelWrapper prevCW = this.channelTables.get(addrRemote); log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null); if (null == prevCW) { log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote); removeItemFromTable = false; } else if (prevCW.getChannel() != channel) { log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.", addrRemote); removeItemFromTable = false; } if (removeItemFromTable) { // 刪除緩存中的channel this.channelTables.remove(addrRemote); log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); } // 關閉channel RemotingUtil.closeChannel(channel); } catch (Exception e) { log.error("closeChannel: close the channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } } catch (InterruptedException e) { log.error("closeChannel exception", e); } }
往上返回到這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync,broker 同步向 namesrv 註冊。
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); // 獲取並建立channel =》 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { // 執行請求前鉤子方法 this.rpcHook.doBeforeRequest(addr, request); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // 執行同步請求 =》 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { // 執行響應後鉤子方法 =》 this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); // 出現異常,channel關閉 =》 this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#getAndCreateChannel建立channel,前面介紹過了。
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl同步處理實現
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 緩存正在進行的請求 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); // 響應解析完畢會解除countDownLatch的阻塞 =》 responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); // 這裏用countDownLatch實現的阻塞 =》 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#closeChannel(java.lang.String, io.netty.channel.Channel)異常關閉channel,前面介紹過了。
往上返回到這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfig更新topic配置
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { boolean isChange = false; Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); for (String topic : orderTopics) { TopicConfig topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null && !topicConfig.isOrder()) { topicConfig.setOrder(true); isChange = true; log.info("update order topic config, topic={}, order={}", topic, true); } } for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) { String topic = entry.getKey(); if (!orderTopics.contains(topic)) { TopicConfig topicConfig = entry.getValue(); if (topicConfig.isOrder()) { topicConfig.setOrder(false); isChange = true; log.info("update order topic config, topic={}, order={}", topic, false); } } } if (isChange) { // 更新數據版本號 this.dataVersion.nextVersion(); // 持久化=》 this.persist(); } } }
往上返回到這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateAndCreateTopic,建立或更新 topic,解析完畢。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣