netty學習之五 netty在rocketmq中的使用

Rocketmq 的通訊層是基於通訊框架 netty,下面來看看rocketmq底層繼承圖。git

輸入圖片說明

NettyRemotingAbstract是NettyRemotingClient和NettyRemotingServer的抽象父類,對發送和接收的公共部分進行了處理github

一 . 首先在數據結構方面使用了responseFuture模式緩存

1.保存了RPC處理器 ,Broker 接收請求將 opaque 直接把這個值設置迴響應對象,客戶端接收到這個響應,經過 opaque 從緩存查找對應的 ResponseFuture 對象數據結構

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
          new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

當服務端接受客戶端響應的時候,會調用NettyServerHandler處理客戶端的請求。框架

ServerBootstrap childHandler = //
                this.serverBootstrap.group(this.eventLoopGroupBoss,     
this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                       
                        .option(ChannelOption.SO_REUSEADDR, true)
                       
                        .option(ChannelOption.SO_KEEPALIVE, false)
                       
                        .childOption(ChannelOption.TCP_NODELAY, true)
                       
                        .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                        //
                        .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        //
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(
                                        defaultEventExecutorGroup, //
                                        new NettyEncoder(), //
                                        new NettyDecoder(), //
                                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //
                                        new NettyConnetManageHandler(), //
                                        new NettyServerHandler());
                            }
                        });

NettyServerHandler處理channelRead的時候調用processMessageRecevived異步

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

processMesssageReceived方法就會判斷當前是做爲Server端,接收的消息是請求,那麼調用processTable對應的事件進行處理,若是做爲Client端,接收的消息是回覆,即接收到Server端的回覆,那麼從responseTable中,首先獲取opaque對應的ResponseFuture,若是這個response是異步回調,則有InvokeCallback,那麼調用invokeBack函數,而後將Response塞入ResponseFuture後返回;ide

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

2.其次還保存了對外請求函數

protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
            new ConcurrentHashMap<Integer, ResponseFuture>(256);

在服務端啓動 start() 方法的時候調用線程啓動掃描將超時的responseFuture直接刪除掉oop

public void start() {
    .............................
     this.timer.scheduleAtFixedRate(new TimerTask() {
            [@Override](https://my.oschina.net/u/1162528)
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Exception e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
}

掃描方法以下,判斷哪些future超時:this

public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();
            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                plog.warn("remove timeout request, " + rep);
            }
        }
        for (ResponseFuture rf : rfList) {
            try {
                rf.executeInvokeCallback();
            } catch (Throwable e) {
                plog.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

二. 上面看了基本的數據結構和數據處理方式,下面看下發送方信息的發送邏輯。

  1. invokeSyncImpl: 同步發送,發送時,生成ResponseFuture,放入responseTable中;而後發送後等待設置的timeout(3s)時間,若是對應的ResponseFuture爲空,則報錯;不然返回RemoteCommand進行業務邏輯處理;發送失敗設置 ResponseFuture 發送失敗,而且從緩存中移除 ResponseFuture(沒有響 應過來,就用不到緩存中的 ResponseFuturel)
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    plog.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                            responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

輸入圖片說明

2 invokeAsyncImpl:異步發送,發送時,生成ResponseFuture,設置 opaque, callback, once,超時時間等值,並放入緩存集合,放入responseTable中;若是超過scanResponseTable的timeout (30s),則報錯;不然調用註冊的invokeCallback進行回調處理;異步通常鏈路耗時比較長, 爲了防止本地緩存的 netty 請求過多, 使用信號量控制上,限默認 2048 個,發送成功 responseFuture.setSendRequestOK(true); 發送失敗 responseFuture.setSendRequestOK(false), 信號量經過 once 釋放, 刪除緩存 Netty 接收 server 端響應,根據 opaque 從緩存獲取 responseFuture,調用回調方法即接 口 InvokeCallback 實現

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
           throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
       final int opaque = request.getOpaque();
       try {
           final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
           this.responseTable.put(opaque, responseFuture);
           final SocketAddress addr = channel.remoteAddress();
           channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture f) throws Exception {
                   if (f.isSuccess()) {
                       responseFuture.setSendRequestOK(true);
                       return;
                   } else {
                       responseFuture.setSendRequestOK(false);
                   }
                   responseTable.remove(opaque);
                   responseFuture.setCause(f.cause());
                   responseFuture.putResponse(null);
                   plog.warn("send a request command to channel <" + addr + "> failed.");
               }
           });

           RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
           if (null == responseCommand) {
               if (responseFuture.isSendRequestOK()) {
                   throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                           responseFuture.getCause());
               } else {
                   throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
               }
           }
           return responseCommand;
       } finally {
           this.responseTable.remove(opaque);
       }
   }

輸入圖片說明

rocketmq大概的通訊流程基本如此,具體仍是要去github上下載源碼研讀,學下源代碼對自身的提升確實蠻大的。

相關文章
相關標籤/搜索