首先,整理NIO進行服務端開發的步驟:java
(1)建立ServerSocketChannel,配置它爲非阻塞模式。bootstrap
(2)綁定監聽,配置TCP參數,backlog的大小。數組
(3)建立一個獨立的I/O線程,用於輪詢多路複用器Selector。服務器
(4)建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,監聽SelectionKeyACCEPT。網絡
(5)啓動I/O線程,在循環體中執行Selector.select()方法,輪訓就緒的Channel。異步
(6)當輪詢到了處於就緒狀態的Channel時,須要對其進行判斷,若是是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端。socket
(7)設置新接入的客戶端鏈路SocketChannel爲非阻塞模式,配置其餘的一些TCP參數。ide
(8)將SocketChannel註冊到Selector,監聽OP_READ操做位。oop
(9)若是輪詢的Channel爲OP_READ,則說明SocketChannel中有新的就緒的數據包須要讀取,則構造ByteBuffer對象,讀取數據包。spa
(10)若是輪詢的Channel爲OP_WRITE,則說明還有數據沒有發送完成,須要繼續發送。
Netty時間服務器服務端 TimeServer:
1 package netty; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 13 public class TimeServer { 14 15 public void bind(int port) throws Exception{ 16 //配置服務端的NIO線程組 一個用於服務端接收客戶端的鏈接,另外一個用於進行SocketChannel的網絡讀寫 17 EventLoopGroup bossGroup = new NioEventLoopGroup(); 18 EventLoopGroup workerGroup = new NioEventLoopGroup(); 19 try{ 20 ServerBootstrap b = new ServerBootstrap(); 21 b.group(bossGroup,workerGroup) 22 .channel(NioServerSocketChannel.class) 23 .option(ChannelOption.SO_BACKLOG, 1024) 24 .childHandler(new ChildChannelHandler()); 25 //綁定端口,同步等待成功 26 ChannelFuture f = b.bind(port).sync(); 27 //等待服務器監聽端口關閉 28 f.channel().closeFuture().sync(); 29 }finally{ 30 //優雅退出,釋放線程池資源 31 bossGroup.shutdownGracefully(); 32 workerGroup.shutdownGracefully(); 33 } 34 } 35 36 private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ 37 protected void initChannel(SocketChannel arg0) throws Exception{ 38 arg0.pipeline().addLast(new TimeServerHandler()); 39 } 40 } 41 }
ServerBootstrap是Netty用於啓動NIO服務端的輔助類,目的是下降服務端的開發難度。
綁定childChannelHandler,其做用相似於Reactor模式中的handler類,主要用於處理網絡I/O事件,例如記錄日誌、對消息進行編解碼等。
使用bind綁定監聽端口,隨後,調用它的同步阻塞方法sync等待綁定操做完成,完成後Netty會返回一個ChannelFuture,主要用於異步操做的通知回調。
Netty時間服務器服務端 TimeServerHandler:
1 package netty; 2 3 import java.io.IOException; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelHandlerAdapter; 7 import io.netty.channel.ChannelHandlerContext; 8 9 public class TimeServerHandler extends ChannelHandlerAdapter{ 10 11 public void channelRead(ChannelHandlerContext ctx,Object msg) throws IOException{ 12 //將msg轉換成Netty的ByteBuf對象 13 ByteBuf buf = (ByteBuf)msg; 14 //將緩衝區中的字節數組複製到新建的byte數組中, 15 byte[] req = new byte[buf.readableBytes()]; 16 buf.readBytes(req); 17 //獲取請求消息 18 String body = new String(req,"UTF-8"); 19 System.out.println("The time server receive order:" + body); 20 //若是是"QUERY TIME ORDER"則建立應答消息 21 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( 22 System.currentTimeMillis()).toString() : "BAD ORDER"; 23 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 24 //異步發送應答消息給客戶端 25 ctx.write(resp); 26 } 27 28 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{ 29 ctx.flush(); 30 } 31 32 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 33 ctx.close(); 34 } 35 }
相比昨天原生的NIO服務端,代碼量大大減小。
Netty時間服務器客戶端 TimeClient:
1 package netty; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 12 public class TimeClient { 13 14 public void connect(int port,String host) throws Exception{ 15 //建立客戶端處理I/O讀寫的NioEventLoopGroup Group線程組 16 EventLoopGroup group = new NioEventLoopGroup(); 17 try{ 18 //建立客戶端輔助啓動類Bootstrap 19 Bootstrap b = new Bootstrap(); 20 b.group(group).channel(NioSocketChannel.class) 21 .option(ChannelOption.TCP_NODELAY, true) 22 .handler(new ChannelInitializer<SocketChannel>(){ 23 //將ChannelHandler設置到ChannelPipleline中,用於處理網絡I/O事件 24 @Override 25 protected void initChannel(SocketChannel ch) throws Exception { 26 ch.pipeline().addLast(new TimeClientHandler()); 27 } 28 }); 29 //發起異步鏈接操做,而後調用同步方法等待鏈接成功。 30 ChannelFuture f = b.connect(host,port).sync(); 31 32 //等待客戶端鏈路關閉 33 f.channel().closeFuture().sync(); 34 }finally{ 35 //優雅退出,釋放NIO線程組 36 group.shutdownGracefully(); 37 } 38 } 39 40 public static void main(String[] args) throws Exception{ 41 int port = 8080; 42 if(args != null && args.length > 0){ 43 try{ 44 port = Integer.valueOf(args[0]); 45 }catch(NumberFormatException e){ 46 //採用默認值 47 } 48 } 49 new TimeClient().connect(port, "127.0.0.1"); 50 } 51 52 }
Netty時間服務器客戶端 TimeClientHandler:
1 package netty; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 import java.util.logging.Logger; 9 10 public class TimeClientHandler extends ChannelHandlerAdapter{ 11 12 private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName()); 13 14 private final ByteBuf firstMessage; 15 16 public TimeClientHandler(){ 17 byte[] req = "QUERY TIME ORDER".getBytes(); 18 firstMessage = Unpooled.buffer(req.length); 19 firstMessage.writeBytes(req); 20 } 21 22 //當客戶端與服務端TCP鏈路簡歷成功後,Netty的NIO線程會調用該方法,發送查詢時間的指令給服務器 23 public void channelActive(ChannelHandlerContext ctx){ 24 //將請求消息發送給服務端 25 ctx.writeAndFlush(firstMessage); 26 } 27 28 //當服務器返回應答消息時,該方法被調用 29 public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{ 30 ByteBuf buf = (ByteBuf) msg; 31 byte[] req = new byte[buf.readableBytes()]; 32 buf.readBytes(req); 33 String body = new String(req,"UTF-8"); 34 System.out.println("Now is :" + body); 35 } 36 37 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 38 39 //釋放資源 40 logger.warning("Unexpected exception from downstream :" + cause.getMessage()); 41 ctx.close(); 42 } 43 }
運行結果:
Server:
Client: