Netty學習篇④-心跳機制及斷線重連

心跳檢測

  1. 前言
    客戶端和服務端的鏈接屬於socket鏈接,也屬於長鏈接,每每會存在客戶端在鏈接了服務端以後就沒有任何操做了,但仍是佔用了一個鏈接;當愈來愈多相似的客戶端出現就會浪費不少鏈接,netty中能夠經過心跳檢測來找出必定程度(自定義規則判斷哪些鏈接是無效連接)的無效連接並斷開鏈接,保存真正活躍的鏈接。
  2. 什麼叫心跳檢測
    我理解的心跳檢測應該是客戶端/服務端定時發送一個數據包給服務端/客戶端,檢測對方是否有響應;
    若是是存活的鏈接,在必定的時間內應該會收到響應回來的數據包;
    若是在必定時間內仍是收不到接收方的響應的話,就能夠當作是掛機,能夠斷開此鏈接;
    若是檢測到了掉線以後還能夠進行重連;
  3. 心跳檢測的實現
    • TCP自帶心跳檢測,協議層採用Keeplive機制默認2小時頻率觸發一次檢測,可是它存在缺陷:檢測不出網線拔出、防火牆、使用起來不靈活、依賴操做系統等
    • Netty能夠經過IdleStateHandler來實現心跳檢測,使用起來也很是方便清晰
  4. IdleStateHandler原理
    idleStateHandler在通道註冊以後會開啓一個定時任務,定時去檢測通道中後續是否還有進行數據傳輸,若是在規定的時間內沒有進行數據傳輸則會觸發對應的超時事件,使用者能夠根據對應的事件自定義規則來判別當前鏈接是不是活躍,是否須要關閉鏈接等來進行操做。
    通常idleStateHandler觸發的事件IdleStateEvent會在心跳handler中的userEventTriggered方法中捕獲到對應的超時事件。

    IdleStateHandler的繼承關係:經過ChannelDuplexHandler類繼承ChannelInboundHandler和實現ChannelOutboundHandler來實現對入站和出站的重寫和監控java

    avatar

  5. 源碼分析
    • IdleStateHandler初始化:爲0表明不監控git

      /**
      * @observeOutput 觀察輸出
      * @readerIdleTime 讀超時時間 自定義時間內檢測Channel通道有沒有讀取到數據,爲0表明不監控
      * @writerIdleTime 寫超時時間 自定義時間內檢測Channel通道有沒有write數據,爲0表明不監控
      * @allIdleTime 總超時時間 自定義時間內檢測Channel通道有沒有讀/寫數據,爲0表明不監控
      * @unit 時間單位
      */
      public IdleStateHandler(boolean observeOutput,
                  long readerIdleTime, long writerIdleTime, long allIdleTime,
                  TimeUnit unit) {
              if (unit == null) {
                  throw new NullPointerException("unit");
              }
      
              this.observeOutput = observeOutput;
      
             // 初始化讀取空閒時間,最小值爲0
              if (readerIdleTime <= 0) {
                  readerIdleTimeNanos = 0;
              } else {
                  // 定義讀取超時時間爲自定義設置時間
                  readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
              }
              if (writerIdleTime <= 0) {
                  writerIdleTimeNanos = 0;
              } else {
                  // 設置寫超時時間
                  writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
              }
              if (allIdleTime <= 0) {
                  allIdleTimeNanos = 0;
              } else {
                  // 設置總超時時間
                  allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
              }
          }
    • IdleStateHandler和channel通道的關聯:經過類圖能夠得知,idleStateHandler能夠重寫入站和出站的方法,不過只是經過channelRead和write方法來記錄閱讀的時間等不作其餘操做github

      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          // 初始化檢測器,開啓定時任務
          initialize(ctx);
          super.channelActive(ctx);
      }
      
      @Override
      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          // 在通道非活躍狀態的時候銷燬定時任務
          destroy();
          super.channelInactive(ctx);
      }
      
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          // 若是設置的讀超時時間大於0則設置是否讀操做爲true
          if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
              reading = true;
              firstReaderIdleEvent = firstAllIdleEvent = true;
          }
          // 記錄時間和標識標誌以後就直接fire當前read到下一個ChannelHandler處理類中
          ctx.fireChannelRead(msg);
      }
      
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
          // 當讀取完畢時,設置是否正在讀取爲false,設置最後讀取時間爲系統當前時間
          if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
              lastReadTime = ticksInNanos();
              reading = false;
          }
          // 一樣直接fire掉,跳入到下一個handler中
          ctx.fireChannelReadComplete();
      }
      
      // idleStateHandler重寫的write方法
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
          // Allow writing with void promise if handler is only configured for read timeout events.
          if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
              ctx.write(msg, promise.unvoid()).addListener(writeListener);
          } else {
              ctx.write(msg, promise);
          }
      }
      
      // ChannelOutboundHandler提供的write接口方法
      /**
          * Called once a write operation is made. The write operation will write the messages through the
           * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
           * {@link Channel#flush()} is called
           * 執行一次寫操做;寫操做經過ChannelPipeline來傳輸信息;最後經過channel的flush()方法來刷新
           *
           * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made 實際寫操做者
           * @param msg               the message to write 寫的消息
           * @param promise           the {@link ChannelPromise} to notify once the operation completes 在操做完成時當即通知(相似future異步通知)
           * @throws Exception        thrown if an error occurs
           */
          void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    • 定時器初始化/銷燬:將使用者自定義的超時時間設置爲延遲定時任務bootstrap

      初始化:
      
      private void initialize(ChannelHandlerContext ctx) {
              // Avoid the case where destroy() is called before scheduling timeouts.
              // See: https://github.com/netty/netty/issues/143
             // state; // 0 - none, 1 - 初始化, 2 - 銷燬
              switch (state) {
              case 1:
              case 2:
                  return;
              }
      
              state = 1;
              initOutputChanged(ctx);
             // 最後讀取時間
              lastReadTime = lastWriteTime = ticksInNanos();
             // 若是設置的空閒時間大於0則開啓定時任務進行監控
              if (readerIdleTimeNanos > 0) {
                  readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                          readerIdleTimeNanos, TimeUnit.NANOSECONDS);
              }
              // 寫超時時間
              if (writerIdleTimeNanos > 0) {
                  writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                          writerIdleTimeNanos, TimeUnit.NANOSECONDS);
              }
              // 總超時時間
              if (allIdleTimeNanos > 0) {
                  allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                          allIdleTimeNanos, TimeUnit.NANOSECONDS);
              }
          }
      銷燬:
      
      /**
      * @readerIdleTimeout ==> ScheduledFuture類
      * @writerIdleTimeout ==> ScheduledFuture類
      */
      private void destroy() {
             // 設置銷燬狀態
              state = 2;
             // 銷燬線程
             // ScheduledFuture
              if (readerIdleTimeout != null) {
                  readerIdleTimeout.cancel(false);
                  readerIdleTimeout = null;
              }
              if (writerIdleTimeout != null) {
                  writerIdleTimeout.cancel(false);
                  writerIdleTimeout = null;
              }
              if (allIdleTimeout != null) {
                  allIdleTimeout.cancel(false);
                  allIdleTimeout = null;
              }
          }
    • 開啓定時任務promise

      ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
              return ctx.executor().schedule(task, delay, unit);
          }
    • 讀/寫定時任務:定時任務啓動的時候,經過設置的超時時間和上一次觸發channelRead的時間進行相減比較來判斷是否超時了網絡

      // 僅分析讀取超時時間定時任務,寫超時差很少就是觸發的時間不同,比對的變量換成了設置的寫超時時間
      private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
      
              ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
                  super(ctx);
              }
      
              @Override
              protected void run(ChannelHandlerContext ctx) {
                  // readerIdleTimeNanos:初始化IdleStateHandler設置的讀取超時時間
                  long nextDelay = readerIdleTimeNanos;
                  // 若是沒有任何讀取操做
                  if (!reading) {
                      // 判斷是否有超時
                      // nextDelay = nextDelay-(ticksInNanos() - lastReadTime)
                      // 即設置的超時時間減去距離上一次讀取的時間
                      nextDelay -= ticksInNanos() - lastReadTime;
                  }
                 // 若是小於等於0 則觸發讀取超時事件,設置新的延遲時間
                  if (nextDelay <= 0) {
                      // Reader is idle - set a new timeout and notify the callback.
                      readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                     // 標記爲第一次
                      boolean first = firstReaderIdleEvent;
                      // 設置成非第一次
                      firstReaderIdleEvent = false;
      
                      try {
                          IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                          channelIdle(ctx, event);
                      } catch (Throwable t) {
                          ctx.fireExceptionCaught(t);
                      }
                  } else {
                      // Read occurred before the timeout - set a new timeout with shorter delay.
                      // 超時的時候發生的讀取事件,則從新延遲執行
                      readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                  }
              }
          }
  6. 項目實戰(主要代碼,項目以服務端做爲心跳監控,也能夠在客戶端進行心跳監控)
    • 項目結構異步

      ├─src
      │  ├─main
      │  │  ├─java
      │  │  │  └─com
      │  │  │      └─hetangyuese
      │  │  │          └─netty
      │  │  │              ├─client
      │  │  │              │      MyChannelFutureListener.java
      │  │  │              │      MyClient05.java
      │  │  │              │      MyClientChannelHandler.java
      │  │  │              │      MyClientChannelInitializer.java
      │  │  │              │
      │  │  │              └─server
      │  │  │                  │  MyServer05.java
      │  │  │                  │  MyServerChannelInitializer.java
      │  │  │                  │  MyServerHandler.java
      │  │  │                  │
      │  │  │                  └─decoder

    • 在ChannelPipeline中註冊IdleStateHandlersocket

      public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
      
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
              ch.pipeline().
                      addLast(new StringEncoder(Charset.forName("GBK")))
                      .addLast(new StringDecoder(Charset.forName("GBK")))
                      .addLast(new LoggingHandler(LogLevel.INFO))
                     // 設置讀取超時時間爲5秒,寫超時和總超時爲0即不作監控
                      .addLast(new IdleStateHandler(5, 0, 0))
                      .addLast(new MyServerHandler());
          }
      }
    • 服務端處理handler(其中userEventTriggered爲接收心跳任務觸發的事件,此次作了計數三次觸發讀空閒超時則斷開鏈接)ide

      public class MyServerHandler extends ChannelInboundHandlerAdapter {
      
          private AtomicInteger count = new AtomicInteger(1);
      
          /**
           *  心跳檢測機制會進入
           * @param ctx
           * @param evt
           * @throws Exception
           */
          @Override
          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
              System.out.println("心跳檢測觸發了事件, object: , time: " + evt + new Date().toLocaleString());
              super.userEventTriggered(ctx, evt);
              if (evt instanceof IdleStateEvent) {
                  IdleStateEvent e = (IdleStateEvent) evt;
                  // 客戶端鏈接應該是請求 write
                  if (e.state() == IdleState.READER_IDLE) {
                      System.out.println("服務端監測到了讀取超時");
                      count.incrementAndGet();
                      if (count.get() > 3) {
                          System.out.println("客戶端還在?? 已經3次檢測沒有訪問了,我要斷開了哦!!!");
                          ctx.channel().close();
                      }
                  } else if (e.state() == IdleState.WRITER_IDLE) {
                      // 若是一直有交互則會發送writer_idle
                      System.out.println("服務端收到了寫入超時");
                  } else {
                      System.out.println("服務端收到了All_idle");
                  }
              } else {
                  super.userEventTriggered(ctx,evt);
              }
          }
      
          @Override
          public void channelActive(ChannelHandlerContext ctx) throws Exception {
              System.out.println("myServerHandler is active, time: " + new Date().toLocaleString());
              ctx.writeAndFlush("成功鏈接服務端, 當前時間:" + new Date().toLocaleString());
          }
      
          @Override
          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
              System.out.println("服務端與客戶端斷開了鏈接, time: " + new Date().toLocaleString());
          }
      
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println("myServerHandler 收到了客戶端的信息 msg:" + msg + ", time: " + new Date().toLocaleString());
              ctx.writeAndFlush("您好,客戶端,我是服務端");
          }
      
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              cause.printStackTrace();
              ctx.close();
          }
      }
    • 運行結果oop

      myServer is start time: 2019-11-8 14:47:16
      myServerHandler is active, time: 2019-11-8 14:47:18
      十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] REGISTERED
      十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler channelActive
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] ACTIVE
      十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler write
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] WRITE: 成功鏈接服務端, 當前時間:2019-11-8 14:47:18
      十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler flush
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] FLUSH
      心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@a3a0212019-11-8 14:47:23
      服務端監測到了讀取超時
      心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@18692702019-11-8 14:47:28
      服務端監測到了讀取超時
      心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@18692702019-11-8 14:47:33
      服務端監測到了讀取超時
      客戶端還在?? 已經3次檢測沒有訪問了,我要斷開了哦!!!
      十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler close
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] CLOSE
      十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler channelInactive
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 ! R:/127.0.0.1:62195] INACTIVE
      服務端與客戶端斷開了鏈接, time: 2019-11-8 14:47:33
      十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler channelUnregistered
      信息: [id: 0x8d924d06, L:/127.0.0.1:9001 ! R:/127.0.0.1:62195] UNREGISTERED
以上就是心跳監控的全部流程了,合理的利用Netty的心跳機制能夠有效的剔除一些無用的鏈接釋放些資源

Netty斷線重連:在長鏈接中有時候出現斷開的時候能夠從新鏈接

出現斷線重連的狀況:

  • 首次鏈接可是鏈接不上,經過ChannelFutureListener增長監控進行重連
  • 因爲網絡緣由等、自動斷開等 經過在channelInactive中進行重連便可
  1. ChannelFutureListener進行重連

    public class MyChannelFutureListener implements ChannelFutureListener {
    
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                System.out.println("當前已鏈接");
                return;
            }
            System.out.println("啓動鏈接客戶端失敗,開始重連");
            final EventLoop loop = future.channel().eventLoop();
            loop.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                       MyClient05.reConnection();
                       System.out.println("客戶端重連成功");
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }, 1L, TimeUnit.SECONDS);
        }
    }

    client啓動 不知道爲啥想要測試listener的效果時,connect的sync()不能帶,是因爲同步阻塞的緣由?

    public void start() {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                bootstrap = getBootstrap();
    
                bootstrap.group(group)
                        .option(ChannelOption.AUTO_READ, true)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .channel(NioSocketChannel.class)
                        .handler(new MyClientChannelInitializer());
    
                // ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port)).sync();
                 ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port));      
                // 增長監聽 
                future.addListener(new MyChannelFutureListener());
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }

    直接啓動客戶端,能夠看到listener輸出:啓動鏈接客戶端失敗,開始重連

  2. channelInactive進行重連(我是直接new了一個線程去重連)

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端斷開了鏈接, time: " + new Date().toLocaleString());
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new MyClient05("127.0.0.1", 9001).start();
                    System.out.println("客戶端從新鏈接了服務端 time:" + new Date().toLocaleString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    測試方法:

    1.能夠直接經過上面的心跳機制斷開鏈接後,客戶端的channelInactive檢測到斷開會自動執行重連

    測試結果:

    服務端:
    ------
    心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@ab81552019-11-8 16:43:12
    服務端監測到了讀取超時
    心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@15192622019-11-8 16:43:17
    服務端監測到了讀取超時
    心跳檢測觸發了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@15192622019-11-8 16:43:22
    服務端監測到了讀取超時
    客戶端還在?? 已經3次檢測沒有訪問了,我要斷開了哦!!!
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler close
    信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 - R:/192.168.0.118:51031] CLOSE
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelInactive
    信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 ! R:/192.168.0.118:51031] INACTIVE
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelUnregistered
    信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 ! R:/192.168.0.118:51031] UNREGISTERED
    服務端與客戶端斷開了鏈接, time: 2019-11-8 16:43:22
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelRegistered
    信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] REGISTERED
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelActive
    信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] ACTIVE
    myServerHandler is active, time: 2019-11-8 16:43:22
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler write
    信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] WRITE: 成功鏈接服務端, 當前時間:2019-11-8 16:43:22
    十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler flush
    信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] FLUSH
    十一月 08, 2019 4:43:26 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
    ------------------------------------------------------------------------------------- 
    
    客戶端:
    -------
    當前已鏈接
    客戶端與服務端創建了鏈接 time: 2019-11-8 16:43:07
    客戶端接收到了服務的響應的數據 msg: 成功鏈接服務端, 當前時間:2019-11-8 16:43:07, time: 2019-11-8 16:43:07
    客戶端斷開了鏈接, time: 2019-11-8 16:43:22
    當前已鏈接
    客戶端與服務端創建了鏈接 time: 2019-11-8 16:43:22
    客戶端接收到了服務的響應的數據 msg: 成功鏈接服務端, 當前時間:2019-11-8 16:43:22, time: 2019-11-8 16:43:22
相關文章
相關標籤/搜索