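RocketMQ's communication layer is built on the Netty networking framework. Let's start with the class hierarchy of RocketMQ's remoting layer.
NettyRemotingAbstract is the abstract parent class of NettyRemotingClient and NettyRemotingServer; it handles the logic that sending and receiving have in common.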
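I. Data structures: the ResponseFuture pattern
1. It stores the RPC processors, keyed by request code. Requests and responses are matched by opaque: when the broker receives a request it copies the request's opaque value straight into the response object, and when the client receives that response it uses opaque to look up the corresponding ResponseFuture in its cache.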
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
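To make the lookup concrete, here is a minimal sketch of a table that maps a request code to a handler plus the executor it runs on. It is not RocketMQ's code: ProcessorRegistry, Entry, register and dispatch are hypothetical names, request and response bodies are plain strings, and a plain Executor stands in for the ExecutorService in the real table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical, simplified stand-in for processorTable: request code -> (processor, executor).
final class ProcessorRegistry {
    // Pairs a handler with the executor it should run on.
    private static final class Entry {
        final Function<String, String> processor;
        final Executor executor;

        Entry(Function<String, String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>(64);

    void register(int requestCode, Function<String, String> processor, Executor executor) {
        table.put(requestCode, new Entry(processor, executor));
    }

    // Looks up the handler by request code and runs it on that handler's executor.
    CompletableFuture<String> dispatch(int requestCode, String body) {
        Entry entry = table.get(requestCode);
        if (entry == null) {
            // Sketch-only choice: reject unknown codes with an exception.
            throw new IllegalArgumentException("unsupported request code: " + requestCode);
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(body), entry.executor);
    }

    public static void main(String[] args) {
        ProcessorRegistry registry = new ProcessorRegistry();
        // Runnable::run executes the task on the calling thread, which keeps the demo deterministic.
        registry.register(10, body -> "echo:" + body, Runnable::run);
        System.out.println(registry.dispatch(10, "hello").join()); // prints "echo:hello"
    }
}
```

Giving each request code its own executor means a slow handler for one kind of request does not have to block the handlers for the others.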
When the server receives a request from a client, NettyServerHandler is invoked to handle it.
    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        new NettyConnetManageHandler(),
                        new NettyServerHandler());
                }
            });
When NettyServerHandler reads a message (channelRead0), it calls processMessageReceived.
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
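processMessageReceived checks the type of the incoming message. On the server side the message is a request, so the processor registered in processorTable for that request code handles it. On the client side the message is a reply from the server, so the ResponseFuture for its opaque is first fetched from responseTable; if the request was asynchronous, the future carries an InvokeCallback and that callback is invoked; the response is then stored in the ResponseFuture and the method returns.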
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
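The response branch comes down to "find the pending request by opaque, then hand it the response". The sketch below shows only that idea and is not the body of processResponseCommand: ResponseRouter, Pending and onResponse are hypothetical names, and a Consumer<String> stands in for InvokeCallback.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical sketch: routes an incoming response to whoever is waiting on its opaque id.
final class ResponseRouter {
    // One pending request; callback is null for synchronous callers.
    static final class Pending {
        final Consumer<String> callback;
        volatile String response;

        Pending(Consumer<String> callback) {
            this.callback = callback;
        }
    }

    private final ConcurrentMap<Integer, Pending> responseTable = new ConcurrentHashMap<>(256);

    void register(int opaque, Pending pending) {
        responseTable.put(opaque, pending);
    }

    // Returns true if a pending request matched. False means nobody is waiting
    // (for example the entry already timed out), so the response is dropped.
    boolean onResponse(int opaque, String body) {
        Pending pending = responseTable.remove(opaque);
        if (pending == null) {
            return false;
        }
        pending.response = body;
        if (pending.callback != null) {
            pending.callback.accept(body);
        }
        return true;
    }

    public static void main(String[] args) {
        ResponseRouter router = new ResponseRouter();
        router.register(7, new Pending(body -> System.out.println("callback got " + body)));
        System.out.println(router.onResponse(7, "pong")); // callback runs, then prints true
        System.out.println(router.onResponse(7, "pong")); // prints false: entry already removed
    }
}
```

Removing the entry in the same step as the lookup means a duplicate or late response for the same opaque cannot trigger the callback twice.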
2. It also keeps a table of in-flight outgoing requests, keyed by opaque.
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256);
When the server's start() method runs, it schedules a timer task that scans this table and removes every ResponseFuture that has timed out.
    public void start() {
        // ...
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Exception e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
The scan method is shown below; it works out which futures have timed out:
    public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                plog.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                rf.executeInvokeCallback();
            } catch (Throwable e) {
                plog.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }
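The expiry rule in the scan (begin time + timeout + 1000 ms grace period <= now) is easier to check when the clock is passed in as a parameter. The following is a sketch under that assumption; TimeoutScanner and Pending are hypothetical names, and the method returns the expired opaque ids instead of running callbacks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the timeout scan with an injectable clock value.
final class TimeoutScanner {
    static final class Pending {
        final long beginTimestamp;
        final long timeoutMillis;

        Pending(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    final Map<Integer, Pending> responseTable = new ConcurrentHashMap<>();

    // Removes every entry whose deadline plus a 1000 ms grace period has passed
    // at time `now`, and returns the opaque ids that were removed.
    List<Integer> scan(long now) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            Integer opaque = entry.getKey();
            Pending pending = entry.getValue();
            if (pending.beginTimestamp + pending.timeoutMillis + 1000 <= now) {
                it.remove();
                expired.add(opaque);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        TimeoutScanner scanner = new TimeoutScanner();
        scanner.responseTable.put(1, new Pending(0L, 3000L));
        scanner.responseTable.put(2, new Pending(0L, 10000L));
        System.out.println(scanner.scan(4000L)); // prints [1]
    }
}
```

Collecting the expired entries first and acting on them afterwards, as the original method does with rfList, keeps potentially slow callback code out of the iteration over the table.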
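II. With the basic data structures and message handling covered, let's look at how the sender sends a message.
1 invokeSyncImpl: synchronous send. A ResponseFuture is created and put into responseTable, the request is written to the channel, and the calling thread blocks in waitResponse until the response arrives or timeoutMillis elapses. If no response comes back, a RemotingTimeoutException is thrown when the send itself succeeded, and a RemotingSendRequestException otherwise.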
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    plog.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
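The blocking part of the synchronous path is the putResponse / waitResponse pair. One common way to build such a pair is a one-shot CountDownLatch, sketched below. SimpleResponseFuture is a hypothetical class with a String payload, and unlike the method above it swallows InterruptedException and returns null, which is a simplification made for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a future that a caller blocks on until the I/O thread fills it in.
final class SimpleResponseFuture {
    private final int opaque;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    SimpleResponseFuture(int opaque) {
        this.opaque = opaque;
    }

    int getOpaque() {
        return opaque;
    }

    // Called by the receiving side; passing null wakes the waiter with "no response".
    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }

    // Blocks for at most timeoutMillis; returns null on timeout or interruption.
    String waitResponse(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return response;
    }

    public static void main(String[] args) {
        SimpleResponseFuture future = new SimpleResponseFuture(1);
        new Thread(() -> future.putResponse("pong")).start();
        System.out.println(future.waitResponse(2000)); // expected to print "pong" well within the timeout
    }
}
```

Because a failed send also calls putResponse(null), the waiting thread wakes up immediately instead of sitting out the full timeout, which matches what the listener in invokeSyncImpl does.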
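2 invokeAsyncImpl: asynchronous send. On sending, a ResponseFuture is created with the opaque, callback, once and timeout values set, and is put into the responseTable cache. If the request outlives the timeout checked by scanResponseTable (30s), an error is reported; otherwise the registered InvokeCallback is called to handle the response. Asynchronous calls usually have longer round trips, so to keep too many Netty requests from piling up in the local cache, a semaphore caps the number in flight (2048 by default). On a successful send responseFuture.setSendRequestOK(true) is called; on failure responseFuture.setSendRequestOK(false) is called, the semaphore is released through once, and the cache entry is removed. When Netty receives the server's response, the responseFuture is fetched from the cache by opaque and the callback, i.e. the InvokeCallback implementation, is invoked.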
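The semaphore-plus-once idea can be sketched as follows. This is an illustration only: AsyncLimiter and ReleaseOnce are hypothetical names, the limit is a constructor argument, and the non-blocking tryAcquire() is used here instead of a timed acquire to keep the example free of checked exceptions.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical handle that returns one permit to the semaphore at most once,
// no matter how many code paths (response, timeout, send failure) call release().
final class ReleaseOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}

// Hypothetical limiter for the number of asynchronous requests in flight.
final class AsyncLimiter {
    private final Semaphore semaphore;

    AsyncLimiter(int maxInFlight) {
        this.semaphore = new Semaphore(maxInFlight);
    }

    // Returns a release handle, or null when the in-flight limit is already reached.
    ReleaseOnce tryAcquire() {
        return semaphore.tryAcquire() ? new ReleaseOnce(semaphore) : null;
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        AsyncLimiter limiter = new AsyncLimiter(1);
        ReleaseOnce once = limiter.tryAcquire();
        System.out.println(limiter.tryAcquire() == null); // prints true: limit reached
        once.release();
        once.release(); // second call is a no-op
        System.out.println(limiter.availablePermits());   // prints 1
    }
}
```

Without the once guard, a request that both times out and later receives a response would release two permits and quietly raise the limit.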
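That is roughly how RocketMQ's communication flow works. For the details, download the source from GitHub and read it; studying source code really does a lot for your own skills.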