客戶端發送消息並同步獲取結果,實際上是違背Netty的設計原則的,可是有時候不得不這麼作的話,那麼建議進行以下的設計:css
好比咱們的具體用法以下:html
NettyRequest request = new NettyRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameterValues(args); NettyMessage nettyMessage = new NettyMessage(); nettyMessage.setType(MessageType.SERVICE_REQ.value()); nettyMessage.setBody(request); if (serviceDiscovery != null) { serverAddress = serviceDiscovery.discover(); } String[] array = serverAddress.split(":"); String host = array[0]; int port = Integer.parseInt(array[1]); NettyClient client = new NettyClient(host, port); NettyMessage nettyResponse = client.send(nettyMessage); if (nettyResponse != null) { return JSON.toJSONString(nettyResponse.getBody()); } else { return null; }
先來看看NettyClient的寫法 和 send方法的寫法:bootstrap
public class NettyClient { /** * 日誌記錄 */ private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); /** * 客戶端業務處理handler */ private ClientHandler clientHandler = new ClientHandler(); /** * 事件池 */ private EventLoopGroup group = new NioEventLoopGroup(); /** * 啓動器 */ private Bootstrap bootstrap = new Bootstrap(); /** * 客戶端通道 */ private Channel clientChannel; /** * 客戶端鏈接 * @param host * @param port * @throws InterruptedException */ public NettyClient(String host, int port) throws InterruptedException { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12)); channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4)); channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder()); channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler()); channel.pipeline().addLast("clientHandler", clientHandler); channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler()); } }); //發起同步鏈接操做 ChannelFuture channelFuture = bootstrap.connect(host, port); //註冊鏈接事件 channelFuture.addListener((ChannelFutureListener)future -> { //若是鏈接成功 if (future.isSuccess()) { logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]已鏈接..."); clientChannel = channelFuture.channel(); } //若是鏈接失敗,嘗試從新鏈接 else{ logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]鏈接失敗,從新鏈接中..."); future.channel().close(); bootstrap.connect(host, port); } }); //註冊關閉事件 channelFuture.channel().closeFuture().addListener(cfl -> { close(); logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]已斷開..."); }); } /** * 客戶端關閉 */ private void close() { //關閉客戶端套接字 if(clientChannel!=null){ clientChannel.close(); } //關閉客戶端線程組 if (group != null) { group.shutdownGracefully(); } } /** * 客戶端發送消息 * @param message * @return * @throws InterruptedException * @throws ExecutionException */ public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException { ChannelPromise promise = clientHandler.sendMessage(message); promise.await(3, TimeUnit.SECONDS); return clientHandler.getResponse(); } }
能夠看出,咱們使用了clientHandler來進行消息發送行爲,經過promise阻塞來同步獲取返回結果,接下來看看sendMessage的寫法:promise
public class ClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); private ChannelHandlerContext ctx; private ChannelPromise promise; private NettyMessage response; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); this.ctx = ctx; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) { response = message; promise.setSuccess(); } else { ctx.fireChannelRead(msg); } } public synchronized ChannelPromise sendMessage(Object message) { while (ctx == null) { try { TimeUnit.MILLISECONDS.sleep(1); //logger.error("等待ChannelHandlerContext實例化"); } catch (InterruptedException e) { logger.error("等待ChannelHandlerContext實例化過程當中出錯",e); } } promise = ctx.newPromise(); ctx.writeAndFlush(message); return promise; } public NettyMessage getResponse(){ return response; } }
能夠看到,在利用ChannelHanderContext進行發送消息前,咱們先建立了一個promise並返回給send方法,那麼send方法此時就會阻塞等待;當咱們收到服務端消息後,promise.setSuccess就會解除send方法的等待行爲,這樣咱們就能獲取結果了。dom
此法針對真正須要同步等待獲取結果的場景,如非必要,仍是建議利用future來改造。ide
benchmark測試代表,此種同步獲取結果的行爲,表現挺穩定的,可是ops 在 150 左右, 真是性能太差了。高性能場合禁用此法。oop