概述 java
保持客戶端與服務器端鏈接的方案經常使用的有3種bootstrap
1.長鏈接,也就是客戶端與服務器端一直保持鏈接,適用於客戶端比較少的狀況。數組
2.定時段鏈接,好比在某一天的凌晨創建鏈接,適用於對實時性要求不高的狀況。服務器
3.設置鏈接超時,好比超過1分鐘沒有傳輸數據就斷開鏈接,等下次須要的時候再創建鏈接,這種方案比較經常使用。socket
netty的ReadTimeOut實現方案3ide
服務端
工具
大部分代碼都保持不變,有變化的代碼在第30行,設置服務端的超時時間oop
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.logging.LogLevel; 10 import io.netty.handler.logging.LoggingHandler; 11 import io.netty.handler.timeout.ReadTimeoutHandler; 12 13 public class Server { 14 15 public static void main(String[] args) throws Exception{ 16 17 EventLoopGroup pGroup = new NioEventLoopGroup(); 18 EventLoopGroup cGroup = new NioEventLoopGroup(); 19 20 ServerBootstrap b = new ServerBootstrap(); 21 b.group(pGroup, cGroup) 22 .channel(NioServerSocketChannel.class) 23 .option(ChannelOption.SO_BACKLOG, 1024) 24 //設置日誌 25 .handler(new LoggingHandler(LogLevel.INFO)) 26 .childHandler(new ChannelInitializer<SocketChannel>() { 27 protected void initChannel(SocketChannel sc) throws Exception { 28 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); 29 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); 30 sc.pipeline().addLast(new ReadTimeoutHandler(5)); 31 sc.pipeline().addLast(new ServerHandler()); 32 } 33 }); 34 35 ChannelFuture cf = b.bind(8765).sync(); 36 37 cf.channel().closeFuture().sync(); 38 pGroup.shutdownGracefully(); 39 cGroup.shutdownGracefully(); 40 41 } 42 }
ServerHandler代碼也沒有什麼變化ui
1 import io.netty.channel.ChannelHandlerAdapter; 2 import io.netty.channel.ChannelHandlerContext; 3 4 public class ServerHandler extends ChannelHandlerAdapter{ 5 6 @Override 7 public void channelActive(ChannelHandlerContext ctx) throws Exception { 8 9 } 10 11 @Override 12 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 13 Request request = (Request)msg; 14 System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage()); 15 Response response = new Response(); 16 response.setId(request.getId()); 17 response.setName("response" + request.getId()); 18 response.setResponseMessage("響應內容" + request.getId()); 19 ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE); 20 } 21 22 @Override 23 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 24 25 } 26 27 @Override 28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 29 ctx.close(); 30 } 31 32 33 34 }
客戶端 this
客戶端的代碼也設置了超時時間(實際上只要服務器端設置也就能夠了,有人說客戶端不設置會出問題,如今尚未發現什麼問題)。主要看getChannelFuture這個方法,this.cf == null是第一次鏈接的時候用到的,!this.cf.channel().isActive() 是鏈接超時後從新發起鏈接用到的。再看main方法,能夠發現for(int i = 1; i <= 3; i++ ) 這個循環中,每一個循環停頓4秒,也就是每隔4秒發送一次請求,而服務器端的超時時間設置爲5秒,那麼在這個for循環期間鏈接是不會斷開的,等for循環結束 cf.channel().closeFuture().sync(); 斷開鏈接this.cf.channel().isActive() 變爲否,在new Thread()中再次發送請求,getChannelFuture會從新創建鏈接。
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.EventLoopGroup; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.SocketChannel; 7 import io.netty.channel.socket.nio.NioSocketChannel; 8 import io.netty.handler.logging.LogLevel; 9 import io.netty.handler.logging.LoggingHandler; 10 import io.netty.handler.timeout.ReadTimeoutHandler; 11 12 import java.util.concurrent.TimeUnit; 13 14 15 /** 16 * Best Do It 17 */ 18 public class Client { 19 20 private static class SingletonHolder { 21 static final Client instance = new Client(); 22 } 23 24 public static Client getInstance(){ 25 return SingletonHolder.instance; 26 } 27 28 private EventLoopGroup group; 29 private Bootstrap b; 30 private ChannelFuture cf ; 31 32 private Client(){ 33 group = new NioEventLoopGroup(); 34 b = new Bootstrap(); 35 b.group(group) 36 .channel(NioSocketChannel.class) 37 .handler(new LoggingHandler(LogLevel.INFO)) 38 .handler(new ChannelInitializer<SocketChannel>() { 39 @Override 40 protected void initChannel(SocketChannel sc) throws Exception { 41 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); 42 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); 43 //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉響應的通道,主要爲減少服務端資源佔用) 44 sc.pipeline().addLast(new ReadTimeoutHandler(5)); 45 sc.pipeline().addLast(new ClientHandler()); 46 } 47 }); 48 } 49 50 public void connect(){ 51 try { 52 this.cf = b.connect("127.0.0.1", 8765).sync(); 53 System.out.println("遠程服務器已經鏈接, 能夠進行數據交換.."); 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 59 public ChannelFuture getChannelFuture(){ 60 61 if(this.cf == null){ 62 this.connect(); 63 } 64 if(!this.cf.channel().isActive()){ 65 this.connect(); 66 } 67 68 return this.cf; 69 } 70 71 public static void main(String[] args) throws Exception{ 72 final Client c = Client.getInstance(); 73 //c.connect(); 74 75 ChannelFuture cf = c.getChannelFuture(); 76 for(int i = 1; i <= 3; i++ ){ 77 Request request = new Request(); 78 request.setId("" + i); 79 request.setName("pro" + i); 80 request.setRequestMessage("數據信息" + i); 81 cf.channel().writeAndFlush(request); 82 TimeUnit.SECONDS.sleep(4); 83 } 84 85 cf.channel().closeFuture().sync(); 86 87 88 new Thread(new Runnable() { 89 @Override 90 public void run() { 91 try { 92 System.out.println("進入子線程..."); 93 ChannelFuture cf = c.getChannelFuture(); 94 System.out.println(cf.channel().isActive()); 95 System.out.println(cf.channel().isOpen()); 96 97 //再次發送數據 98 Request request = new Request(); 99 request.setId("" + 4); 100 request.setName("pro" + 4); 101 request.setRequestMessage("數據信息" + 4); 102 cf.channel().writeAndFlush(request); 103 cf.channel().closeFuture().sync(); 104 System.out.println("子線程結束."); 105 } catch (InterruptedException e) { 106 e.printStackTrace(); 107 } 108 } 109 }).start(); 110 111 System.out.println("斷開鏈接,主線程結束.."); 112 113 } 114 115 116 117 }
clientHandler沒有什麼變化
1 import io.netty.channel.ChannelHandlerAdapter; 2 import io.netty.channel.ChannelHandlerContext; 3 import io.netty.util.ReferenceCountUtil; 4 5 public class ClientHandler extends ChannelHandlerAdapter{ 6 7 @Override 8 public void channelActive(ChannelHandlerContext ctx) throws Exception { 9 10 } 11 12 @Override 13 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 14 try { 15 Response resp = (Response)msg; 16 System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); 17 } finally { 18 ReferenceCountUtil.release(msg); 19 } 20 } 21 22 @Override 23 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 24 25 } 26 27 @Override 28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 29 ctx.close(); 30 } 31 32 }
工廠類不變
1 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; 2 import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; 3 import io.netty.handler.codec.marshalling.MarshallerProvider; 4 import io.netty.handler.codec.marshalling.MarshallingDecoder; 5 import io.netty.handler.codec.marshalling.MarshallingEncoder; 6 import io.netty.handler.codec.marshalling.UnmarshallerProvider; 7 8 import org.jboss.marshalling.MarshallerFactory; 9 import org.jboss.marshalling.Marshalling; 10 import org.jboss.marshalling.MarshallingConfiguration; 11 12 /** 13 * Marshalling工廠 14 * @author(alienware) 15 * @since 2014-12-16 16 */ 17 public final class MarshallingCodeCFactory { 18 19 /** 20 * 建立Jboss Marshalling解碼器MarshallingDecoder 21 * @return MarshallingDecoder 22 */ 23 public static MarshallingDecoder buildMarshallingDecoder() { 24 //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。 25 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); 26 //建立了MarshallingConfiguration對象,配置了版本號爲5 27 final MarshallingConfiguration configuration = new MarshallingConfiguration(); 28 configuration.setVersion(5); 29 //根據marshallerFactory和configuration建立provider 30 UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); 31 //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度 32 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); 33 return decoder; 34 } 35 36 /** 37 * 建立Jboss Marshalling編碼器MarshallingEncoder 38 * @return MarshallingEncoder 39 */ 40 public static MarshallingEncoder buildMarshallingEncoder() { 41 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); 42 final MarshallingConfiguration configuration = new MarshallingConfiguration(); 43 configuration.setVersion(5); 44 MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); 45 //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組 46 MarshallingEncoder encoder = new MarshallingEncoder(provider); 47 return encoder; 48 } 49 }
自定義的Request和Response對象沒有什麼變化,這裏再也不贅述