package time.server.impl; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * TODO * * @description * @author mjorcen * @time 2015年5月25日 下午2:50:57 */ public class NTimeServerImpl { public void bind(int port) { // 建立兩個NioEventLoopGroup 實例,NioEventLoopGroup // 是一個線程組,它包含一組NIO線程,專門用於處理網絡事件的處理,實際上他們就是Reactor 線程組 // 這裏建立兩個的緣由是一個用於服務端接收用戶的連接,另外一個用於進行SocketChannel的網絡讀寫 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立一個 ServerBootstrap ,它是netty用於NIO服務端的輔助啓動類,目的是下降服務端的開發複雜度. ServerBootstrap bootstrap = new ServerBootstrap(); // 設定 服務端接收用戶請求的線程組和用於進行SocketChannel網絡讀寫的線程組 bootstrap.group(bossGroup, workerGroup); // 設置建立的 channel 類型 bootstrap.channel(NioServerSocketChannel.class); // 配置 NioServerSocketChannel 的 tcp 參數, BACKLOG 的大小 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 綁定io處理類(childChannelHandler).他的做用相似於 reactor 模式中的 handler // 類,主要用於處理網絡 I/O 事件,例如對記錄日誌,對消息進行解碼等. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 加入行處理器 ch.pipeline().addLast(new StringDecoder()); // 加入字符串解碼器 ch.pipeline().addLast(new TimeServerHandler()); } }); // 綁定端口,隨後調用它的同步阻塞方法 sync 等等綁定操做成功,完成以後 Netty 會返回一個 ChannelFuture // 它的功能相似於的 Future,主要用於異步操做的通知回調. ChannelFuture channelFuture = bootstrap.bind(port).sync(); // 等待服務端監聽端口關閉,調用 sync 方法進行阻塞,等待服務端鏈路關閉以後 main 函數才退出. 
channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 優雅的退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { NTimeServerImpl server = new NTimeServerImpl(); server.bind(9091); } }
ServerHandler
package time.server.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.Date; import time.TimeConfig; /** * TODO * * @description * @author ez * @time 2015年5月25日 下午3:06:09 */ public class TimeServerHandler extends ChannelHandlerAdapter implements TimeConfig { /* * (non-Javadoc) * * @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel. * ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body); String currentTime = QUERY.equalsIgnoreCase(body) ? new Date() .toString() : "BAD ORDER"; currentTime += System.getProperty("line.separator"); System.out.println("currentTime : " + currentTime); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8")); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 當出現異常時,釋放資源. ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
Client
package time.client.impl; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * TODO * * @description * @author ez * @time 2015年5月25日 下午3:17:29 */ public class NTimeClient { public void connect(int port, String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); // 發起異步連接操做 ChannelFuture future = bootstrap.connect(host, port).sync(); // 等待客戶端鏈路關閉 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { NTimeClient client = new NTimeClient(); client.connect(9091, "localhost"); } }
ClientHandler
package time.server.impl; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.Date; import time.TimeConfig; /** * TODO * * @description * @author ez * @time 2015年5月25日 下午3:06:09 */ public class TimeServerHandler extends ChannelHandlerAdapter implements TimeConfig { /* * (non-Javadoc) * * @see io.netty.channel.ChannelHandlerAdapter#channelRead(io.netty.channel. * ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body); String currentTime = QUERY.equalsIgnoreCase(body) ? new Date() .toString() : "BAD ORDER"; currentTime += System.getProperty("line.separator"); System.out.println("currentTime : " + currentTime); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes("utf-8")); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 當出現異常時,釋放資源. ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
DelimiterBasedFrameDecoder 跟 LineBasedFrameDecoder 很類似,只是增加了自定義的分隔符。
// Frame inbound data on the custom delimiter "$" (UTF-8 encoded); 1024 is the
// maximum frame length passed to the decoder.
ByteBuf delimiter = Unpooled.copiedBuffer("$".getBytes("utf-8"));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// Alternative framing: add a fixed-length frame decoder constructed with a
// frame length of 1024.
ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));