ChannelHandler是一個接口族的父接口,它的實現負責接受並響應事件通知,在Netty應用程序中,全部的數據處理邏輯都包含在這些核心抽象的實現中。
Echo服務器會響應傳入的消息,所以須要實現ChannelInboundHandler接口,用來定義響應入站事件的方法。因爲Echo服務器的應用程序只須要用到少許的方法,因此只須要繼承ChannelInboundHandlerAdapter類,它提供了ChannelInboundHandler的默認實現。
在ChannelInboundHandler中,咱們感興趣的方法有:java
package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 標示一個ChannelHandler能夠被多個Channel安全地共享 */ @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; //將接受到的消息輸出到客戶端 System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); //將接收到的消息寫給發送者,而不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //將消息沖刷到客戶端,而且關閉該Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印異常堆棧跟蹤 cause.printStackTrace(); //關閉該Channel ctx.close(); } }
備註git
主要涉及的內容github
Echo服務引導示例代碼bootstrap
package cn.sh.demo.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void startServer() throws InterruptedException { EchoServerHandler serverHandler = new EchoServerHandler(); //建立EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); //建立ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) //指定所使用的NIO傳輸Channel .channel(NioServerSocketChannel.class) //使用指定的端口套接字 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler到子Channel的ChannelPipeline .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //此處因爲EchoServerHandler被註解標註爲@Shareble,因此咱們老是使用相同的實例 channel.pipeline().addLast(serverHandler); } }); try { //異步的綁定服務器,調用sync()方法阻塞等待直到綁定完成 ChannelFuture channelFuture = bootstrap.bind().sync(); //獲取Channel的CloseFuture,而且阻塞當前線程直到它完成 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉EventLoopGroup,釋放全部的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 1) { System.err.println("參數類型或者個數不正確"); return; } //設置端口值 int port = Integer.parseInt(args[0]); //啓動Echo服務器 new EchoServer(port).startServer(); } }
備註數組
客戶端主要包括的操做:安全
編寫客戶端主要包括業務邏輯和引導服務器
在該示例中,咱們使用SimpleChannelInboundHandler類來處理全部的事件,主要的方法有:異步
示例代碼以下:socket
package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 標記該類的示例能夠被多個Channel共享 */ @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { //當一個鏈接被服務器接受並創建後,發送一條消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { //記錄客戶端接收到服務器的消息 System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 在發生異常時,記錄錯誤並關閉Channel * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
備註
每次在接受數據時,都會調用channelRead0()方法。須要注意的是,由服務器發送的消息可能會被分塊接受。也就是說,若是服務器發送了5字節,那麼不能保證這5字節會被一次性接受。即便是對於這麼少許的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有3字節的ByteBuf(Netty的字節容器),第二次使用一個持有2字節的ByteBuf。做爲一個面向流的協議,TCP保證了字節數組會按照服務器發送它們的順序被接受。ide
主要和業務邏輯如何處理消息以及Netty如何管理資源有關
客戶端中,當channelRead0()方法完成時,已經接受了消息而且處理完畢,當該方法返回時,SimpleChannelInboundHandler負責釋放指向保存該消息的ByteBuf的內存引用。
可是在服務器端,你須要將消息返回給客戶端,write()操做是異步的,直到channelRead()方法返回後有可能仍然沒有完成,ChannelInboundHandlerAdapter在這個時間點上不會釋放消息。
服務端的消息是在channelComplete()方法中,經過writeAndFlush()方法調用時被釋放。
客戶端使用主機和端口參數來鏈接遠程地址,也就是Echo服務器的地址,而不是綁定到一個一直被監聽的端口。
示例代碼以下:
package cn.sh.demo.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); //建立客戶端引導器 Bootstrap bootstrap = new Bootstrap(); //指定使用NioEventLoopGroup來處理客戶端事件 bootstrap.group(group) //指定使用NIO傳輸的Channel類型 .channel(NioSocketChannel.class) //設置服務器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) //在建立Channel時,向ChannelPipeline中添加一個EchoHandler實例 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); try { //鏈接到遠程節點,阻塞等待直到鏈接完成 ChannelFuture future = bootstrap.connect().sync(); //阻塞直到Channel關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉線程池而且釋放全部的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 2) { System.err.println("參數個數不正確"); return; } int port = Integer.parseInt(args[1]); new EchoClient(args[0], port).start(); } }
備註
服務器和客戶端均使用了NIO傳輸,可是,客戶端和服務端能夠使用不一樣的傳輸,例如,在服務器使用NIO傳輸,客戶端能夠使用OIO傳輸
服務端的輸出以下:
客戶端的輸出以下:
該文章的示例代碼位於cn.sh.demo.echo包下。