1、前言
心跳機制是定時發送一個自定義的結構體(心跳包),讓對方知道本身還活着,以確保鏈接的有效性的機制。
咱們用到的不少框架都用到了心跳檢測,好比服務註冊到 Eureka Server 以後會維護一個心跳鏈接,告訴 Eureka Server 本身還活着。本文就是利用 Netty 來實現心跳檢測,以及客戶端重連。html
2、設計思路
- 分爲客戶端和服務端
- 創建鏈接後,客戶端先發送一個消息詢問服務端是否能夠進行通訊了。
- 客戶端收到服務端 Yes 的應答後,主動發送心跳消息,服務端接收到心跳消息後,返回心跳應答,周而復始。
- 心跳超時利用 Netty 的 ReadTimeOutHandler 機制,當必定週期內(默認值50s)沒有讀取到對方任何消息時,須要主動關閉鏈路。若是是客戶端,從新發起鏈接。
- 爲了不出現粘/拆包問題,使用 DelimiterBasedFrameDecoder 和 StringDecoder 來處理消息。
3、編碼
- 先編寫客戶端 NettyClient
- public class NettyClient {
-
- private static final String HOST = "127.0.0.1";
-
- private static final int PORT = 9911;
-
- private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-
- EventLoopGroup group = new NioEventLoopGroup();
-
-
- private void connect(String host,int port){
- try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY,true)
- .remoteAddress(new InetSocketAddress(host,port))
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8);
- ch.pipeline()
- .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
- .addLast(new StringDecoder())
- // 當必定週期內(默認50s)沒有收到對方任何消息時,須要主動關閉連接
- .addLast("readTimeOutHandler",new ReadTimeoutHandler(50))
- .addLast("heartBeatHandler",new HeartBeatReqHandler());
- }
- });
- // 發起異步鏈接操做
- ChannelFuture future = b.connect().sync();
- future.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- // 全部資源釋放完以後,清空資源,再次發起重連操做
- executor.execute(()->{
- try {
- TimeUnit.SECONDS.sleep(5);
- //發起重連操做
- connect(NettyClient.HOST,NettyClient.PORT);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- }
- }
-
- public static void main(String[] args) {
- new NettyClient().connect(NettyClient.HOST,NettyClient.PORT);
- }
-
- }
這裏稍微複雜點的就是38行開始的重連部分。
2. 心跳消息發送類 HeartBeatReqHandlerjava
- package cn.sp.heartbeat;
-
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
-
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
-
- /**
- * Created by 2YSP on 2019/5/23.
- */
- @ChannelHandler.Sharable
- public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String> {
-
- private volatile ScheduledFuture<?> heartBeat;
-
- private static final String hello = "start notify with server$_";
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes()));
- System.out.println("================");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (heartBeat != null){
- heartBeat.cancel(true);
- heartBeat = null;
- }
- ctx.fireExceptionCaught(cause);
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- if ("ok".equalsIgnoreCase(msg)){
- //服務端返回ok開始心跳
- heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS);
- }else {
- System.out.println("Client receive server heart beat message : --->"+msg);
- }
- }
-
- private class HeartBeatTask implements Runnable{
-
- private final ChannelHandlerContext ctx;
-
- public HeartBeatTask(ChannelHandlerContext ctx){
- this.ctx = ctx;
- }
-
-
- @Override
- public void run() {
- String heartBeat = "I am ok";
- System.out.println("Client send heart beat message to server: ----->"+heartBeat);
- ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes()));
- }
- }
- }
-
channelActive()方法在首次創建鏈接後向服務端問好,若是服務端返回了 "ok" 就建立一個線程每隔5秒發送一次心跳消息。若是發生了異常,就取消定時任務並將其設置爲 null,等待 GC 回收。
3. 服務端 NettyServergit
- public class NettyServer {
-
- public static void main(String[] args) {
- new NettyServer().bind(9911);
- }
-
- private void bind(int port){
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(group)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
-
- ch.pipeline()
- .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
- .addLast(new StringDecoder())
- .addLast("readTimeOutHandler",new ReadTimeoutHandler(50))
- .addLast("HeartBeatHandler",new HeartBeatRespHandler());
- }
- });
- // 綁定端口,同步等待成功
- b.bind(port).sync();
- System.out.println("Netty Server start ok ....");
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- 心跳響應類 HeartBeatRespHandler
- package cn.sp.heartbeat;
-
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
-
- /**
- * Created by 2YSP on 2019/5/23.
- */
- @ChannelHandler.Sharable
- public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String> {
-
- private static final String resp = "I have received successfully$_";
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- if (msg.equals("start notify with server")){
- ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes()));
- }else {
- //返回心跳應答信息
- System.out.println("Receive client heart beat message: ---->"+ msg);
- ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
- }
- }
-
- }
-
第一次告訴客戶端我已經準備好了,後面打印客戶端發過來的信息並告訴客戶端我已經收到你的消息了。github
4、測試
啓動服務端再啓動客戶端,能夠看到心跳檢測正常,以下圖。框架
服務端控制檯
客戶端控制檯
如今讓服務端宕機一段時間,看客戶端可否重連並開始正常工做。
關閉服務端後,客戶端週期性的鏈接失敗,控制檯輸出如圖:異步
鏈接失敗
從新啓動服務端,過一下子發現重連成功了。
成功重連
5、總結
總得來講,使用 Netty 實現心跳檢測仍是比較簡單的,這裏比較懶沒有使用其餘序列化協議(如 ProtoBuf 等),若是感興趣的話你們能夠本身試試。
代碼地址,點擊這裏。
有篇SpringBoot 整合長鏈接心跳機制的文章寫的也很不錯,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/ide