netty實現客戶端服務端心跳重連

                       

前言:java

          公司的加密機調度系統一直使用的是http請求調度的方式去調度,可是會出現網絡故障致使某個客戶端或者服務端斷線的狀況,致使不少請求信息以及回執信息丟失的狀況,接着咱們拋棄了http的方式,改成Tcp的方式去創建客戶端和服務器之間的鏈接,而且要去實現斷線重連的功能,通過討論後決定使用java中成熟的nio框架 – netty去解決這一系列的問題。spring

 

1.       netty簡單介紹:bootstrap

在百度中對netty的解釋是:數組

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

Netty框架並不僅是封裝了多路複用的IO模型,也包括提供了傳統的阻塞式/非阻塞式 同步IO的模型封,Netty 是一個利用 Java 的高級網絡的能力,隱藏其背後的複雜性而提供一個易於使用的 API 的客戶端/服務器框架。其併發高、傳輸快、封裝好的特性受到了許多大公司的青睞,在這裏咱們就不過多的分析netty的原理和特性了,以後我會寫一篇文章專門寫一下從io到nio,再到netty的整個過程。重點講一下netty的魅力所在,今天咱們已代碼實現爲主,講解一下在springboot架構中,用netty實現服務端和客戶端之間的通訊以及斷線重連等機制。springboot

 

2.       服務端代碼:服務器

首先,引入netty的pom依賴網絡

 

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
</dependency>

 

而後咱們在配置文件中寫入服務端的ip和端口號,用於鏈接多線程

在springboot的application啓動類中寫入服務端的啓動start方法,用於在啓動項目時自動啓動服務端架構

 1 @SpringBootApplication  2 public class Application implements CommandLineRunner {  3 
 4     @Value("${netty.server.port}")  5     private  int port;  6 
 7     @Value("${netty.server.host}")  8     private String host;  9 
10  @Autowired 11  NettyServer server; 12 
13     public static void main(String[] args) { 14         SpringApplication.run(Application.class, args); 15  } 16 
17 
18  @Override 19     public void run(String... strings) throws Exception { 20         this.startServer(); 21 
22  } 23 
24     //啓動service
25     public void startServer(){
       //這個類實現一個IP套接字地址(IP地址+端口號)  
26 InetSocketAddress address = new InetSocketAddress(host, port); 27 ChannelFuture future = server.start(address); 28 29 Runtime.getRuntime().addShutdownHook(new Thread(){ 30 @Override 31 public void run() { 32 server.destroy(); 33 } 34 }); 35 36 future.channel().closeFuture().syncUninterruptibly(); 37 } 38 } 39 40 41 }

ChannelFuture: 併發

  Future最先出現於JDK的java.util.concurrent.Future,它用於表示異步操做的結果.因爲Netty的Future都是與異步I/O操做相關的,所以命名爲ChannelFuture,表明它與Channel操做相關.因爲Netty中的全部I / O操做都是異步的,所以Netty爲了解決調用者如何獲取異步操做結果的問題而專門設計了ChannelFuture接口. 
  所以,Channel與ChannelFuture能夠說如影隨行的.

而後咱們要去重點看server.start()

public class NettyServer { private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class); private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workGroup = new NioEventLoopGroup(); private Channel channel; /** * 開啓及服務線程 */
    public ChannelFuture start(InetSocketAddress address) { //服務端引導類
        ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup)//經過ServerBootstrap的group方法,設置(1)中初始化的主從"線程池"
                .channel(NioServerSocketChannel.class)//指定通道channel的類型,因爲是服務端,故而是NioServerSocketChannel
                .childHandler(new NettyServerInitializer())//設置ServerSocketChannel的處理器
                .option(ChannelOption.SO_BACKLOG, 100)// 設置tcp協議的請求等待隊列
                .childOption(ChannelOption.SO_KEEPALIVE, true);//配置子通道也就是SocketChannel的選項
        ChannelFuture future = bootstrap.bind(address).syncUninterruptibly(); logger.info("準備接收——————"); channel = future.channel(); return future; } public void destroy() { if(channel != null) { channel.close(); } channelGroup.close(); workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }

 在這裏的設置中,.childHandler(new NettyServerInitializer()) 用於設置了服務器管道 NioServerSocketChannel 的處理器handler,

這個handler是咱們自定義封裝的一些對channel的public class NettyServerInitializer extends ChannelInitializer<Channel>{

 
  
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {
@Override
    protected void initChannel(Channel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        //處理日誌
        //pipeline.addLast(new LoggingHandler(LogLevel.INFO));

        //處理心跳
        pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
     //消息編碼 pipeline.addLast(new MessageEncoder());
   
     //粘包長度控制 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4));
     //消息解碼 pipeline.addLast(new MessageDecoder());
     //自定義hander pipeline.addLast(new TcpMsgHandler()); } }
 
 
ChannelPipeline :
Netty 的 Channel 過濾器實現原理與 Servlet Filter 機制一致,它將 Channel 的數據管道抽象爲 ChannelPipeline,消息在 ChannelPipeline 中流動和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來對 I/O 事件進行具體的攔截和處理,能夠方便地經過新增和刪除 ChannelHandler 來實現不一樣業務邏輯的定製,可以實現對修改封閉和對擴展到支持。


咱們看到咱們添加了idleStateHandler用來處理心跳,那麼心跳到底是什麼呢,咱們先來介紹一下心跳
  

心跳機制

  • 心跳是在TCP長鏈接中,客戶端和服務端定時向對方發送數據包通知對方本身還在線,保證鏈接的有效性的一種機制
  • 在服務器和客戶端之間必定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文後, 也當即發送一個特殊的數據報文, 迴應發送方, 此即一個 PING-PONG 交互. 天然地, 當某一端收到心跳消息後, 就知道了對方仍然在線, 這就確保 TCP 鏈接的有效性

在咱們的服務端中,不會主動發心跳給客戶端,只會對對應的心跳消息,進行迴應,告訴那些給我發心跳的客戶端說:我還活着!

  • 服務端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法做爲超時事件的邏輯處理;

  • 設定IdleStateHandler心跳檢測每五秒進行一次讀檢測,若是五秒內ChannelRead()方法未被調用則觸發一次userEventTrigger()方法

 

 

TcpMsgHandler.java
 
 
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {


    private final static Logger logger = LoggerFactory.getLogger("");

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {		}
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        TcpMsg msg = (TcpMsg) obj;
        try {
              //處理心跳
              ...
              ctx.writeAndFlush(msg);
            }
        }catch(Exception ex){
            logger.info(ex.getMessage());
        }
    }
}
在這裏,咱們的channelRead比較簡單,只是將客戶端發來的心跳直接發回去了,實現了響應客戶端心跳請求的目的,除了心跳,咱們還能夠去定義不一樣的消息類別,好比說是加密請求,仍是處理數據的請求,入庫的請求等等,
咱們能夠本身從channel中獲取到客戶端發過來的信息作處理,記得要即便響應,好比,心跳中,咱們將msg又返回給了channel:
ctx.writeAndFlush(msg);

在handler中,decoder用於解碼的做用,將客戶端發來的ByteBuf流的形式,轉爲咱們須要的格式,能夠轉爲咱們要的對象,或者是一個string字符串

MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder {
    private Logger logger = LoggerFactory.getLogger("");

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int len = in.readableBytes();
            byte[] bytes = new byte[len];
       //將ByteBuf轉爲byte數組 in.readBytes(bytes); try { TcpMsg msg = TcpMsg.ByteToObj(bytes); out.add(msg); } catch (Exception ex) { logger.error("MessageDecoder",ex); } } }
encoder負責在咱們發送數據的時候,將咱們的對象、或者是字符串轉爲byte數組,而後輸出
public class MessageEncoder extends MessageToByteEncoder<TcpMsg>{
	private Logger logger = LoggerFactory.getLogger("");
	@Override
	protected void encode(ChannelHandlerContext ctx, TcpMsg msg, ByteBuf out) throws Exception {
		try{
			if (msg.getType() != 0){
				//logger.info("send: " + msg.getType() + ":" + msg.getGuid() + ":" + msg.getBody());
			}
			byte[] src = msg.ToBytes();
			out.writeBytes(src);

		}catch (Exception e){
			logger.error("MessageEncoder",e);
		}
	}
}

 

3.       客戶端代碼:

在application配置文件中加入服務端的主機名和端口號

 

netty.server.host = 127.0.0.1
netty.server.port = 9090

啓動類Application

@SpringBootApplication
public class Application{
  @Autowired
	private NettyClient client;

	@Value("${netty.server.port}")
	private int port;

	@Value("${netty.server.host}")
	private String host;

	public static void main(String[] args) throws Exception {
		SpringApplication.run(NettyClientApplication.class, args);
	}

	@Bean
	public NettyClient nettyClient() {
		return new NettyClient();
	}

	@Override
	public void run(String... arg0) throws Exception {
		client.start(host, port);
	}

}

NettyClient.java: 客戶端啓動類

 

@Component
public class NettyClient {

    //日誌輸出
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    //主要鏈接地址
    private static String nettyHost = "";
    //備用鏈接地址
    private static String nettyHostRe = "";
    private static Integer nettyPort = 0;

    public boolean start(String host1,String host2,int port){
        //主要鏈接地址
        nettyHost = host1;
        //備用鏈接地址
        nettyHostRe = host2;
        nettyPort = port;
        //EventLoopGroup能夠理解爲是一個線程池,這個線程池用來處理鏈接、接受數據、發送數據
        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        //NioEventLoop
        //客戶端引導類
        Bootstrap bootstrap = new Bootstrap();
        //多線程處理
        bootstrap.group(nioEventLoopGroup);
        //指定通道類型爲NioServerSocketChannel,一種異步模式
        bootstrap.channel(NioSocketChannel.class);
        //指定請求地址
        bootstrap.remoteAddress(new InetSocketAddress(nettyHost,port));
        bootstrap.option(ChannelOption.TCP_NODELAY,true);
        final ConnectionWatchdog watchDog = new ConnectionWatchdog(bootstrap, new HashedWheelTimer(), nettyHost,nettyHostRe, port) {
            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[]{
                    new MessageEncoder(),
                    new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4),
                    new MessageDecoder(),
                    this,
                    // 每隔5s的時間觸發一次userEventTriggered的方法,而且指定IdleState的狀態位是WRITER_IDLE
                    new IdleStateHandler(0, 1, 0, TimeUnit.SECONDS),
                    // 實現userEventTriggered方法,並在state是WRITER_IDLE的時候發送一個心跳包到sever端,告訴server端我還活着
                    new ClientHeartBeatHandler(),
                };
            }
        };
        final ChannelFuture future;
        try {
            synchronized (bootstrap) {
                bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(watchDog.handlers());
                    }
                });
                future = bootstrap.connect().sync();// 連接服務器.調用sync()方法會同步阻塞
                //服務端鏈接ip:
                logger.info("目前服務端鏈接ip爲" + nettyHost);
            }

            if (!future.isSuccess()) {
                logger.info("---- 鏈接服務器失敗,2秒後重試 ---------port=" + port);
                future.channel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        start(nettyHost,nettyHostRe,nettyPort);
                    }

                }, 2L, TimeUnit.SECONDS);
            }

        } catch (Exception e) {
            logger.info("exception happends e {}", e);
            return false;
        }
        return true;
    }

}

 

  

 

 

ConnectionWatchdog.java  :重連檢測狗,當發現當前的鏈路不穩定關閉以後,進行重連

 

@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask,ChannelHandlerHolder{

    //日誌輸出
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);
    //客戶端引導類
    private Bootstrap bootstrap;
    private Timer timer;
    private final String host;
    //備用服務端ip
    private final String host2;
    //使用ip
    private String useHost;
    private final int port;

    private volatile boolean reconnect = true;
    private int attempts;
    //刷新時間
    private volatile long refreshTime = 0L;
    //心跳鏈接標識
    private volatile boolean heartBeatCheck = false;
    //通道
    private volatile Channel channel;
    //失敗次數
    private static int failCount;

    public ConnectionWatchdog(Bootstrap boot, Timer timer, String host,String host2, int port) {
        this.bootstrap = boot;
        this.timer = timer;
        this.host = host;
        this.host2 = host2;
        this.port = port;
    }

    public boolean isReconnect() {
        return reconnect;
    }

    public void setReconnect(boolean reconnect) {
        this.reconnect = reconnect;
    }
  
  
   //鏈接成功時調用的方法 @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { channel = ctx.channel(); attempts = 0; reconnect =false; refreshTime = new Date().getTime(); if (!heartBeatCheck) { heartBeatCheck = true; channel.eventLoop().scheduleAtFixedRate(new Runnable() { @Override public void run() { long time = new Date().getTime() - refreshTime; logger.info(String.valueOf(time)); if (time > 5 * 1000L) { channel.close(); logger.info("心跳檢查失敗"); } else { logger.info("心跳檢查Successs"); } } }, 5L, 5L, TimeUnit.SECONDS); } logger.info("Connects with {}.", channel); ctx.fireChannelActive(); } /** * 由於鏈路斷掉以後,會觸發channelInActive方法,進行重連 2秒重連一次 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { reconnect = true; logger.warn("Disconnects with {}, doReconnect = {},attempts == {}", ctx.channel(), reconnect, attempts); if (reconnect) { /*if (attempts < 12) { attempts++; } else { reconnect = false; }*/ long timeout = 2; logger.info("再過 {} 秒客戶端將進行重連",timeout); timer.newTimeout(this, timeout, TimeUnit.SECONDS); } } /* * run啓動方法 * */ public void run(Timeout timeout) throws Exception { //Future表示異步操做的結果 final ChannelFuture future; if(failCount > 2){ //使用備用ip if(host.equals(useHost)){ useHost = host2; }else{ useHost = host; } }else { if(StrUtil.IsNullOrEmpty(useHost)) { //首次重連 useHost = host; } } synchronized (bootstrap) { future = bootstrap.connect(useHost, port); } //使用future監聽結果,執行異步操做結束後的回調. future.addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture f) throws Exception { boolean succeed = f.isSuccess(); logger.warn("鏈接經過 {}, {}.", useHost + ":" + port, succeed ? "成功" : "失敗"); if (!succeed) { logger.info("重連失敗"); failCount ++; f.channel().pipeline().fireChannelInactive(); }else{ failCount = 0; logger.info("重連成功"); } } }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof TcpMsg) { TcpMsg heartMsg = (TcpMsg) msg; if (heartMsg.getType()>=0) { refreshTime = new Date().getTime(); } logger.warn("獲得服務器響應,響應內容爲"+ ((TcpMsg) msg).getBody()); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); Channel channel = ctx.channel(); logger.info("客戶端:"+channel.remoteAddress()+"網絡異常"); cause.printStackTrace(); if(channel.isActive())ctx.close(); } } 

這裏咱們定義了一個變量: refreshTime,當咱們從channel中read到了服務端發來的心跳響應消息的話,就刷新refreshTime爲當前時間

當鏈接成功時,會觸發channelActive 方法,在這裏咱們開啓了一個定時任務去判斷refreshTime和當前時間的時間差,超過5秒說明斷線了,要進行重連,我這裏因爲配置了兩個服務器,全部在個人邏輯中,嘗試鏈接2次以上連不上就去連另外一個服務器去了

 

下面的handler用於發送心跳消息,實現userEventTriggered方法,並在stateWRITER_IDLE的時候發送一個心跳包到sever端,告訴server端我還活着

 

@Component
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {

	private static final Logger logger = LoggerFactory.getLogger(ClientHeartBeatHandler.class);

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		clientname = ReadFileUtil.readFile("C:/CrawlProgram/wrapper_nettyClient/name.txt");
		if (evt instanceof IdleStateEvent) {
			IdleState state = ((IdleStateEvent) evt).state();
			if (state == IdleState.WRITER_IDLE) {
				//用於心跳的客戶端類型爲0
				int type = 0;
				//客戶端機器名
				String body = clientname;
				TcpMsg msg = new TcpMsg(type,body);
				try {
					ctx.writeAndFlush(msg).sync();
					logger.info("發送消息成功,消息類型爲:"+type+",請求id爲" + msg.getGuid() + ",客戶端機器號爲:" + msg.getBody());
				} catch (Exception ex) {
					ex.printStackTrace();
					logger.info("發送失敗");
				}
			}
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}

}

 

  而後就是和服務端同樣的decoder、encoder過程,不一樣的是,咱們在decoder的時候使用了線程池去將任務放入隊列中去,防止請求慢的時候丟失任務請求

MessageDecoder.java

public class MessageDecoder extends ByteToMessageDecoder {

    private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
    @Autowired
    private VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;

    //線程池常量
    public static VisiableThreadPoolTaskExecutor executor;

    private TcpMsg tcpMsg;
    List<Object> out;

    // 用@PostConstruct方法引導綁定
    @PostConstruct
    public void init() {
        executor = visiableThreadPoolTaskExecutor;
        encryptService =  encrypt;
        orderService = order;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        this.context = ctx;
        this.out = out;
        int len = in.readableBytes();
        if (len > 0) {
            logger.info("獲得返回數據,長度爲" + len);
            byte[] bytes = new byte[len];
            in.readBytes(bytes);
            TcpMsg msg = TcpMsg.ByteToObj(bytes);
            this.tcpMsg = msg;
            logger.info("start asyncServiceExecutor");
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    executeTask();
                }
            });
            logger.info("end executeAsync");
        }

    }

}

  這裏,咱們使用了netty來實現了服務端、客戶端通訊、心跳檢測的功能。體會到了netty的傳輸效率高、封裝好的特性,用起來簡單、實用。咱們不只能夠作斷線重連、還能夠作不少業務請求,能夠配置多臺客戶端去作不一樣的事情,來達到服務器調度的目的。

   歸根結底,netty仍是一個框架的東西,咱們仍是沒有過多的去看透nio的本質、咱們要作的不只僅是會用netty,並且還要了解nio、瞭解netty的實現原理,它的底層是如何封裝的,但願你們多去研究,咱們一塊兒去搞懂它

 

Netty 的 Channel 過濾器實現原理與 Servlet Filter 機制一致,它將 Channel 的數據管道抽象爲 ChannelPipeline,消息在 ChannelPipeline 中流動和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來對 I/O 事件進行具體的攔截和處理,能夠方便地經過新增和刪除 ChannelHandler 來實現不一樣業務邏輯的定製,可以實現對修改封閉和對擴展到支持。

相關文章
相關標籤/搜索