Becoming an Architect, Part 22: Keeping the Client and Server Connected with Netty's ReadTimeoutHandler

Overview

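Three approaches are commonly used to manage the connection between a client and a server:

1. Long-lived connection: the client and server stay connected the whole time. This suits cases with a small number of clients.

2. Scheduled connection: the connection is opened at a fixed time, for example in the early hours of a given day. This suits cases where real-time delivery is not important.

3. Connection timeout: the connection is closed when no data has been transferred for a set period, for example 1 minute, and is reopened the next time it is needed. This is the most commonly used approach.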
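To make approach 3 concrete before bringing in Netty, here is a minimal sketch using only JDK blocking sockets. It is not how Netty implements its timeout; it relies on `Socket.setSoTimeout`, and the class name `IdleTimeoutDemo`, its methods, and the 300 ms timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical demo: the server closes a connection that stays idle too long. */
class IdleTimeoutDemo {

    /**
     * Accepts one connection and reads from it until the peer has sent nothing
     * for timeoutMillis, then closes it. Returns the number of bytes read.
     */
    static int serveOnce(ServerSocket server, int timeoutMillis) throws IOException {
        int received = 0;
        try (Socket conn = server.accept()) {
            conn.setSoTimeout(timeoutMillis); // a blocking read() waits at most this long
            InputStream in = conn.getInputStream();
            try {
                while (in.read() != -1) {
                    received++;
                }
            } catch (SocketTimeoutException idle) {
                // No data arrived in time: fall through and let the socket close.
            }
        }
        return received;
    }

    /** Runs server and client in one process; returns {bytesReceived, clientSawClose ? 1 : 0}. */
    static int[] run(int timeoutMillis) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            int[] received = new int[1];
            Thread serverThread = new Thread(() -> {
                try {
                    received[0] = serveOnce(server, timeoutMillis);
                } catch (IOException e) {
                    received[0] = -1;
                }
            });
            serverThread.start();

            boolean sawClose;
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                client.setSoTimeout(5_000); // safety net so the demo cannot hang forever
                client.getOutputStream().write(new byte[] {1, 2, 3});
                client.getOutputStream().flush();
                // Stay idle. Once the server closes the connection, read() returns -1.
                sawClose = client.getInputStream().read() == -1;
            }
            serverThread.join();
            return new int[] {received[0], sawClose ? 1 : 0};
        }
    }

    public static void main(String[] args) throws Exception {
        int[] result = run(300);
        System.out.println("bytes received: " + result[0]
                + ", closed by server: " + (result[1] == 1));
    }
}
```

The client sends three bytes and then goes quiet; the server gives up after the idle period and closes the socket, which the client observes as end of stream. A client that needs the connection again would simply open a new one.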

Implementing approach 3 with Netty's ReadTimeoutHandler

Server

 

Most of the code is unchanged. The one change is the line that adds ReadTimeoutHandler to the pipeline, which sets the server's read timeout to 5 seconds.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

    public static void main(String[] args) throws Exception {

        // pGroup accepts connections, cGroup handles their I/O
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         // enable logging
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                // The one change: close the channel if nothing is read for 5 seconds
                sc.pipeline().addLast(new ReadTimeoutHandler(5));
                sc.pipeline().addLast(new ServerHandler());
            }
        });

        ChannelFuture cf = b.bind(8765).sync();

        // Block until the server channel is closed, then release the threads
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();

    }
}

 

The ServerHandler code is essentially unchanged as well

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// ChannelHandlerAdapter exposes channelRead only in the Netty 5.0 alpha API;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
        // Build a response that echoes the request id
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getId());
        response.setResponseMessage("response content " + request.getId());
        ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // A read timeout also arrives here as an exception
        ctx.close();
    }

}

 

 

Client

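The client code also sets a read timeout. (Strictly speaking, setting it on the server is enough. Some people say that leaving it off the client causes problems, but I have not run into any so far.) The method to focus on is getChannelFuture: the this.cf == null check handles the first connection, and the !this.cf.channel().isActive() check handles reconnecting after the connection has timed out and been closed. In main, the loop for(int i = 1; i <= 3; i++) pauses for 4 seconds on each iteration, so a request is sent every 4 seconds. Because the server's timeout is 5 seconds, the connection stays open for the whole loop. After the loop, cf.channel().closeFuture().sync() blocks until the channel is closed, which happens once the timeout expires with no further data. From then on this.cf.channel().isActive() returns false, so when the code inside new Thread() sends another request, getChannelFuture establishes a new connection.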

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;


/**
 * Best Do It
 */
public class Client {

    // Lazy, thread-safe singleton via the holder idiom
    private static class SingletonHolder {
        static final Client instance = new Client();
    }

    public static Client getInstance() {
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf;

    private Client() {
        group = new NioEventLoopGroup();
        b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    // Logging is added here because a second Bootstrap.handler(...) call
                    // replaces the first instead of adding to it
                    sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    // Timeout handler: if nothing is read for the given time, the channel
                    // is closed, mainly to reduce resource usage on the server
                    sc.pipeline().addLast(new ReadTimeoutHandler(5));
                    sc.pipeline().addLast(new ClientHandler());
                }
        });
    }

    public void connect() {
        try {
            this.cf = b.connect("127.0.0.1", 8765).sync();
            System.out.println("Connected to the remote server; ready to exchange data..");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public ChannelFuture getChannelFuture() {

        // cf == null: first connection; !isActive(): the previous connection timed out
        if (this.cf == null || !this.cf.channel().isActive()) {
            this.connect();
        }

        return this.cf;
    }

    public static void main(String[] args) throws Exception {
        final Client c = Client.getInstance();
        //c.connect();

        ChannelFuture cf = c.getChannelFuture();
        // Send a request every 4 seconds, inside the server's 5-second timeout
        for (int i = 1; i <= 3; i++) {
            Request request = new Request();
            request.setId("" + i);
            request.setName("pro" + i);
            request.setRequestMessage("data " + i);
            cf.channel().writeAndFlush(request);
            TimeUnit.SECONDS.sleep(4);
        }

        // Blocks until the channel is closed by the read timeout
        cf.channel().closeFuture().sync();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("Entering child thread...");
                    // The old channel is inactive, so this reconnects
                    ChannelFuture cf = c.getChannelFuture();
                    System.out.println(cf.channel().isActive());
                    System.out.println(cf.channel().isOpen());

                    // Send data again
                    Request request = new Request();
                    request.setId("" + 4);
                    request.setName("pro" + 4);
                    request.setRequestMessage("data " + 4);
                    cf.channel().writeAndFlush(request);
                    cf.channel().closeFuture().sync();
                    System.out.println("Child thread finished.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        System.out.println("Connection closed, main thread finished..");

    }

}
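The timing in main above (a request every 4 seconds against a 5-second read timeout) can be checked with a small model. This is only a sketch of the arithmetic, not Netty code: the class `ReadTimeoutTimeline` and its method are hypothetical, times are whole seconds measured from the moment the connection becomes active, and treating a gap exactly equal to the timeout as expired is a modeling assumption.

```java
import java.util.Arrays;

/** Hypothetical helper: models when an idle-read timeout would fire. */
class ReadTimeoutTimeline {

    /**
     * Returns the time at which a connection that became active at t = 0
     * would be closed by a read timeout, given the times at which data arrives.
     * The idle timer restarts on every arrival; the first idle gap that reaches
     * timeoutSeconds closes the connection, and later arrivals are ignored.
     */
    static long closeTime(long[] arrivalTimes, long timeoutSeconds) {
        long[] sorted = arrivalTimes.clone();
        Arrays.sort(sorted);
        long lastRead = 0; // the timer starts when the connection becomes active
        for (long t : sorted) {
            if (t - lastRead >= timeoutSeconds) {
                break; // the timeout fired before this message arrived
            }
            lastRead = t;
        }
        return lastRead + timeoutSeconds;
    }

    public static void main(String[] args) {
        long[] sends = {0, 4, 8};       // the three requests sent by the loop
        System.out.println(closeTime(sends, 5));
    }
}
```

With requests at 0, 4, and 8 seconds, no gap reaches 5 seconds, so the model puts the close at 8 + 5 = 13 seconds. The loop itself finishes at about 12 seconds, which means closeFuture().sync() should only have to wait roughly one more second before the child thread starts and reconnects.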

 

 

ClientHandler is essentially unchanged

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

// ChannelHandlerAdapter exposes channelRead only in the Netty 5.0 alpha API;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Response resp = (Response) msg;
            System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
        } finally {
            // Release the message once it has been handled
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}

 

 

 

The factory class is unchanged

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling factory
 * @author(alienware)
 * @since 2014-12-16
 */
public final class MarshallingCodeCFactory {

    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // Obtain a MarshallerFactory through the Marshalling utility class's static method;
        // the "serial" argument selects the Java serialization factory.
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // Create a MarshallingConfiguration and set its version to 5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // Build the provider from marshallerFactory and configuration
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingDecoder; the two arguments are the provider and
        // the maximum length of a single serialized message
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // Build Netty's MarshallingEncoder, which serializes POJOs that implement
        // Serializable into a binary byte array
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
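The decoder above is built with a maximum serialized message length of 1024 bytes, so it can be useful to get a rough idea of how large a message is. The sketch below measures an object with plain JDK serialization. That is an approximation only: JBoss Marshalling's "serial" output is not guaranteed to be byte-for-byte the same size. `SerializedSizeCheck` and `DemoRequest` are hypothetical names, and `DemoRequest` is a stand-in for the article's Request class, whose source is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper: estimates whether a message fits under a size limit. */
class SerializedSizeCheck {

    /** Stand-in for the article's Request class (assumed fields: id, name, requestMessage). */
    static class DemoRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;
        final String requestMessage;

        DemoRequest(String id, String name, String requestMessage) {
            this.id = id;
            this.name = name;
            this.requestMessage = requestMessage;
        }
    }

    /** Size in bytes of obj when written with plain JDK serialization. */
    static int serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.size();
    }

    /** True if the estimated size is within maxObjectSize bytes. */
    static boolean fits(Serializable obj, int maxObjectSize) throws IOException {
        return serializedSize(obj) <= maxObjectSize;
    }

    public static void main(String[] args) throws IOException {
        DemoRequest small = new DemoRequest("1", "pro1", "data 1");
        System.out.println("estimated size: " + serializedSize(small) + " bytes");
        System.out.println("fits in 1024: " + fits(small, 1024));
    }
}
```

If real messages come close to the limit, the second argument passed to MarshallingDecoder would need to be raised on both the client and the server.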

 

 

The custom Request and Response classes are unchanged, so they are not repeated here.
