【Netty】利用Netty實現心跳檢測和重連機制

1、前言

  心跳機制是定時發送一個自定義的結構體(心跳包),讓對方知道本身還活着,以確保鏈接的有效性的機制。
  咱們用到的不少框架都用到了心跳檢測,好比服務註冊到 Eureka Server 以後會維護一個心跳鏈接,告訴 Eureka Server 本身還活着。本文就是利用 Netty 來實現心跳檢測,以及客戶端重連。html

2、設計思路

  1. 分爲客戶端和服務端
  2. 創建鏈接後,客戶端先發送一個消息詢問服務端是否能夠進行通訊了。
  3. 客戶端收到服務端 Yes 的應答後,主動發送心跳消息,服務端接收到心跳消息後,返回心跳應答,周而復始。
  4. 心跳超時利用 Netty 的 ReadTimeOutHandler 機制,當必定週期內(默認值50s)沒有讀取到對方任何消息時,須要主動關閉鏈路。若是是客戶端,從新發起鏈接。
  5. 爲了不出現粘/拆包問題,使用 DelimiterBasedFrameDecoderStringDecoder 來處理消息。

3、編碼

  1. 先編寫客戶端 NettyClient
  1. public class NettyClient
  2.  
  3. private static final String HOST = "127.0.0.1"
  4.  
  5. private static final int PORT = 9911
  6.  
  7. private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); 
  8.  
  9. EventLoopGroup group = new NioEventLoopGroup(); 
  10.  
  11.  
  12. private void connect(String host,int port)
  13. try
  14. Bootstrap b = new Bootstrap(); 
  15. b.group(group) 
  16. .channel(NioSocketChannel.class) 
  17. .option(ChannelOption.TCP_NODELAY,true
  18. .remoteAddress(new InetSocketAddress(host,port)) 
  19. .handler(new ChannelInitializer<SocketChannel>() { 
  20. @Override 
  21. protected void initChannel(SocketChannel ch) throws Exception
  22. ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8); 
  23. ch.pipeline() 
  24. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
  25. .addLast(new StringDecoder()) 
  26. // 當必定週期內(默認50s)沒有收到對方任何消息時,須要主動關閉連接 
  27. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
  28. .addLast("heartBeatHandler",new HeartBeatReqHandler()); 

  29. }); 
  30. // 發起異步鏈接操做 
  31. ChannelFuture future = b.connect().sync(); 
  32. future.channel().closeFuture().sync(); 
  33. }catch (Exception e){ 
  34. e.printStackTrace(); 
  35. }finally
  36. // 全部資源釋放完以後,清空資源,再次發起重連操做 
  37. executor.execute(()->{ 
  38. try
  39. TimeUnit.SECONDS.sleep(5); 
  40. //發起重連操做 
  41. connect(NettyClient.HOST,NettyClient.PORT); 
  42. } catch (InterruptedException e) { 
  43. e.printStackTrace(); 

  44. }); 


  45.  
  46. public static void main(String[] args)
  47. new NettyClient().connect(NettyClient.HOST,NettyClient.PORT); 

  48.  

這裏稍微複雜點的就是38行開始的重連部分。
2. 心跳消息發送類 HeartBeatReqHandlerjava

  1. package cn.sp.heartbeat; 
  2.  
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandler; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7.  
  8. import java.util.concurrent.ScheduledFuture; 
  9. import java.util.concurrent.TimeUnit; 
  10.  
  11. /** 
  12. * Created by 2YSP on 2019/5/23. 
  13. */ 
  14. @ChannelHandler.Sharable 
  15. public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String>
  16.  
  17. private volatile ScheduledFuture<?> heartBeat; 
  18.  
  19. private static final String hello = "start notify with server$_"
  20.  
  21. @Override 
  22. public void channelActive(ChannelHandlerContext ctx) throws Exception
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())); 
  24. System.out.println("================"); 

  25.  
  26. @Override 
  27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
  28. if (heartBeat != null){ 
  29. heartBeat.cancel(true); 
  30. heartBeat = null

  31. ctx.fireExceptionCaught(cause); 

  32.  
  33. @Override 
  34. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
  35. if ("ok".equalsIgnoreCase(msg)){ 
  36. //服務端返回ok開始心跳 
  37. heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS); 
  38. }else
  39. System.out.println("Client receive server heart beat message : --->"+msg); 


  40.  
  41. private class HeartBeatTask implements Runnable
  42.  
  43. private final ChannelHandlerContext ctx; 
  44.  
  45. public HeartBeatTask(ChannelHandlerContext ctx)
  46. this.ctx = ctx; 

  47.  
  48.  
  49. @Override 
  50. public void run()
  51. String heartBeat = "I am ok"
  52. System.out.println("Client send heart beat message to server: ----->"+heartBeat); 
  53. ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes())); 



  54.  

channelActive()方法在首次創建鏈接後向服務端問好,若是服務端返回了 "ok" 就建立一個線程每隔5秒發送一次心跳消息。若是發生了異常,就取消定時任務並將其設置爲 null,等待 GC 回收。
3. 服務端 NettyServergit

  1. public class NettyServer
  2.  
  3. public static void main(String[] args)
  4. new NettyServer().bind(9911); 

  5.  
  6. private void bind(int port)
  7. EventLoopGroup group = new NioEventLoopGroup(); 
  8. try
  9. ServerBootstrap b = new ServerBootstrap(); 
  10. b.group(group) 
  11. .channel(NioServerSocketChannel.class) 
  12. .childHandler(new ChannelInitializer<SocketChannel>() { 
  13. @Override 
  14. protected void initChannel(SocketChannel ch) throws Exception
  15. ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 
  16.  
  17. ch.pipeline() 
  18. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
  19. .addLast(new StringDecoder()) 
  20. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
  21. .addLast("HeartBeatHandler",new HeartBeatRespHandler()); 

  22. }); 
  23. // 綁定端口,同步等待成功 
  24. b.bind(port).sync(); 
  25. System.out.println("Netty Server start ok ...."); 
  26. }catch (Exception e){ 
  27. e.printStackTrace(); 



  1. 心跳響應類 HeartBeatRespHandler
  1. package cn.sp.heartbeat; 
  2.  
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandler; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7.  
  8. /** 
  9. * Created by 2YSP on 2019/5/23. 
  10. */ 
  11. @ChannelHandler.Sharable 
  12. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String>
  13.  
  14. private static final String resp = "I have received successfully$_"
  15.  
  16. @Override 
  17. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
  18. if (msg.equals("start notify with server")){ 
  19. ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes())); 
  20. }else
  21. //返回心跳應答信息 
  22. System.out.println("Receive client heart beat message: ---->"+ msg); 
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); 


  24.  

  25.  

第一次告訴客戶端我已經準備好了,後面打印客戶端發過來的信息並告訴客戶端我已經收到你的消息了。github

4、測試

啓動服務端再啓動客戶端,能夠看到心跳檢測正常,以下圖。框架

 

服務端控制檯
服務端控制檯

 

 

客戶端控制檯
客戶端控制檯

如今讓服務端宕機一段時間,看客戶端可否重連並開始正常工做。

 

關閉服務端後,客戶端週期性的鏈接失敗,控制檯輸出如圖:異步

 

鏈接失敗
鏈接失敗

從新啓動服務端,過一下子發現重連成功了。

 

 

成功重連
成功重連

 

5、總結

總得來講,使用 Netty 實現心跳檢測仍是比較簡單的,這裏比較懶沒有使用其餘序列化協議(如 ProtoBuf 等),若是感興趣的話你們能夠本身試試。
代碼地址,點擊這裏
有篇SpringBoot 整合長鏈接心跳機制的文章寫的也很不錯,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/ide

相關文章
相關標籤/搜索