Netty網編程實戰:四種解決粘包方式切換、兩種生產級雙向監聽模式並行、高效編解碼、多處理器協同做戰

前言
前端

        在前面的課題把Netty網絡編程的各類理論和主要組件、核心部件、重要機制都攤開說清楚後,今天給你們分享一下Netty在網絡編程中的一些典型應用場景和一些重要問題的解決方案。好比因爲TCP網絡傳輸底層的緣由,而產生不可預期的粘包和半包問題,導致收到對端的報文明顯缺斤少兩或先後報文之間相互粘連在一塊兒;又如在生產環境中,網絡通訊的任意一端因爲外界網絡或自身bug的緣由,致使網絡長時間阻塞或直接斷開等行爲,對端沒法明顯感知真正緣由;不能第一時間通知到對端或相關技術人員處理,最終致使更大的業務受損。
算法


網絡粘包半包數據庫

image.png

 假設客戶端分別發送了兩個數據包 D1 和 D2 給服務端,因爲服務端一次讀取到的字節 數是不肯定的,故可能存在如下 4 種狀況。 編程

(1)服務端分兩次讀取到了兩個獨立的數據包,分別是 D1 和 D2,沒有粘包和拆包; 緩存

(2)服務端一次接收到了兩個數據包,D1 和 D2 粘合在一塊兒,被稱爲 TCP 粘包; 服務器

(3)服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的 D1 包和 D2 包的部分 內容,第二次讀取到了 D2 包的剩餘內容,這被稱爲 TCP 拆包; 網絡

(4)服務端分兩次讀取到了兩個數據包,第一次讀取到了 D1 包的部份內容 D1_1,第 二次讀取到了 D1 包的剩餘內容 D1_2 和 D2 包的整包。 異步

 若是此時服務端 TCP 接收滑窗很是小,而數據包 D1 和 D2 比較大,頗有可能會發生第 五種可能,即服務端分屢次才能將 D1 和 D2 包接收徹底,期間發生屢次拆包。ide

        因爲 TCP 協議自己的機制(面向鏈接的可靠地協議-三次握手機制)客戶端與服務器會 維持一個鏈接,數據在鏈接不斷開的狀況下,能夠持續不斷地將多個數據包發 往服務器,可是若是發送的網絡數據包過小,那麼他自己會啓用 Nagle 算法(可配置是否啓 用)對較小的數據包進行合併而後再發送。那麼這樣的話,服務器在接收到消息(數據流)的時候就沒法區分哪 些數據包是客戶端本身分開發送的,這樣產生了粘包;服務器在接收到數據庫後,放到緩衝 區中,若是消息沒有被及時從緩存區取走,下次在取數據的時候可能就會出現一次取出多個 數據包的狀況,形成粘包現象。
工具

      解決辦法已在前面的文章介紹得很清楚了,這裏不在贅述。今天本文將綜合採用回車字符分割、任意特殊字符分割、固定字符分割和數據自己帶長度信息等四種方式,交替切換,在實戰層面給你們展現這種四種方式是如何構建和工做的。


生產級雙向心跳機制

image.png

        在凌晨等業務低谷時段,若是發生網絡閃斷、鏈接被 Hang 住等問題時,因爲沒有業務 消息,應用程序很難發現。到了白天業務高峯期時,會發生大量的網絡通訊失敗,嚴重的會 致使一段時間進程內沒法處理業務消息。爲了解決這個問題,在網絡空閒時採用心跳機制來檢測鏈路的互通性,一旦發現網絡故障,當即關閉鏈路,主動重連。

      本文將提供雙向端對端心跳檢測機制滑動檢測,隨時把雙向服務的運行動向級存活狀況,體如今前端控制檯。


生產級存活檢測和自動滑動重試與恢復機制

        自動監聽對端通道是否斷開或突變爲未激活狀態,導致網絡傳輸中斷或長時間阻塞等待。當這些問題發生時,對端會在第一時間感知到並啓動自動滑動重試(重試頻率在持續斷開時,將逐漸下降)機制,當對端網絡或服務恢復時,當前端也自動恢復正常的通信,不須要外接的任何干預。


高效編解碼器

image.png

        以上是Facebook在高鏈接數狀況下,綜合對以上20種三方開源序列和反序列化組件,進行對比測試。你們都知道JDK內置的基於流的序列化和反序列化效率都很是低下,數據報文和原文相比也膨脹得至關龐大。本文將以google的原生protostuff組件爲例,展現Netty內嵌非內置組件的方式和高效輕快的序列化表現。


各類ChannelHandler協同

image.png

        在ChannelPipeline內部維護一個雙向鏈表來存儲進入管道的,各類ChannelHandler事件和處理器有序責任鏈式處理有業務執行前後順序的處理及事件觸發、傳遞等。假若咱們打亂Pipeline中處理器的順序,那麼執行最終會報錯終止或數據報文傳遞和處理結果錯亂。本文將容器化載入多種Handler實現多業務處理協同工做。


有了上面本文核心功能和業務集成介紹後,下面咱們就運用Netty強大的特性,完成咱們需集成的全部功能的代碼設計。


各部分代碼實戰

1、客戶端部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:Netty客戶端業務通道處理器類
* 負責處理和服務端的全部IO業務事件處理
*/
public class ClientChannelHandler extends SimpleChannelInboundHandler<ByteBuf> {
   /**
    * 統計當前通道第幾回讀取數據
    */
   private final AtomicInteger counter = new AtomicInteger(0);
   /**
    * 客戶端讀取網絡通道數據後處理
    * @param channelHandlerContext 通道上下文(很重要,網絡IO業務就靠它了)
    * @param byteBuf 網絡傳送過來的數據
    * @throws Exception
    */
   protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
       //打印接收到的服務端數據和當前統計計數
       System.out.println("Accept server data {"+byteBuf.toString(CharsetUtil.UTF_8)+"], the counter is:"+this.counter.incrementAndGet());
   }

   /**
    * 鏈接創建成功事件回調
    * @param ctx
    * @throws Exception
    */
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       //鏈接創建後,循環10次向服務端發送連續報文
       ByteBuf buf = null;
       String msg = null;
       for (int i=0;i<10;i++){
           if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){
               UserAddress address = new UserAddress("abc@163.com", "ChengDu.SiChuan");
               User user = new User(i,"AndyChen"+i,"WaveBeed"+i, address);
               ctx.write(user);
           }else{
               msg = "Client message"+i+" data ";
               if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED){
                   msg += "(line_based)"+System.getProperty("line.separator");
               }
               else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){
                   msg += "(custom_based)"+Constant.CUSTOM_SPLIT_MARK;
               }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){
                   msg = Constant.FIXED_LEN_CLIENT_TXT;
               }
               buf = Unpooled.buffer(msg.length());
               buf.writeBytes(msg.getBytes());
               ctx.writeAndFlush(buf);
           }
       }
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){
           ctx.flush();
       }
   }
   /**
    * 處理器異常集中處理
    * @param ctx 處理器上下文
    * @param cause 異常
    * @throws Exception
    */
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:Netty客戶端啓動器類
*/
public class NettyClientStarter {
   //採用netty內置定時器
   private static final HashedWheelTimer timer = new HashedWheelTimer();
   /**
    * 啓動客戶端
    * @param args
    */
   public static void main(String[] args){
       try {
           start();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }

   /**
    * 啓動客戶端
    */
   private static void start() throws InterruptedException {
       //線程組
       EventLoopGroup group = new NioEventLoopGroup();
       try {
           //Netty客戶端啓動類
           final Bootstrap boot = new Bootstrap();
           boot.group(group)//將線程組綁定到啓動器
                   .channel(NioSocketChannel.class)//使用NIO進行網絡傳輸
                   //綁定服務端鏈接地址
                   .remoteAddress(new InetSocketAddress(Constant.SERV_HOST, Constant.SERV_PORT));
           /**
            * 定義監聽器
            */
           final ChannelMonitor<Channel> monitor = new ClientMonitorHandler(Constant.SERV_HOST, Constant.SERV_PORT, timer, boot);
           boot.handler(new ChannelInitializer<Channel>(){
               protected void initChannel(Channel channel) throws Exception {
                   channel.pipeline().addLast(monitor.setMonitorHandlers());
               }
           });
           //這裏會阻塞,直到鏈接完成
           ChannelFuture future = boot.connect().sync();
           System.out.println("Has connected to server:"+Constant.SERV_PORT+" ...");
           //這裏也會阻塞,直到鏈接通道關閉
           future.channel().closeFuture().sync();
       }finally {
           //線程池組關閉
           //group.shutdownGracefully().sync();
       }
   }
}


2、服務端部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:Netty服務端業務通道處理器類
* 負責向對端響應業務應答和服務端相關IO業務處理
*/
public class ServerChannelHandler extends ChannelInboundHandlerAdapter {
   /**
    * 消息接收計數器
    */
   private final AtomicInteger counter = new AtomicInteger(0);
   /**
    * 接收客戶端發送的數據
    * @param ctx 處理器上下文
    * @param msg 消息
    * @throws Exception
    */
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       String clientData = null;
       User user = null;
       if(msg instanceof User){
           user = (User)msg;
           clientData = user.toString();
       }else{
           clientData = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);
       }
       System.out.println("Accept client data ["+clientData+"], the counter is:"+this.counter.incrementAndGet());
       //回饋消息給客戶端
       String toClientData = "Data has been accepted by server";
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED ||
          Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){
           toClientData += "(line_based)"+System.getProperty("line.separator");
       }
       else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){
           toClientData += "(custom_based)"+Constant.CUSTOM_SPLIT_MARK;
       }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){
           toClientData = Constant.FIXED_LEN_SERVER_TXT;
       }
       ctx.writeAndFlush(Unpooled.copiedBuffer(toClientData.getBytes()));
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN && null != user){
           ctx.fireChannelRead(user);
       }
   }
   /**
    *
    * @param ctx 處理器上下文
    * @throws Exception
    */
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
       ctx.fireChannelReadComplete();
       System.out.println("Client data recevied completed!");
   }

   /**
    * 管道關閉時觸發
    * @param ctx
    * @throws Exception
    */
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       System.out.println("Client: "+ctx.channel().remoteAddress()+" channel will close...");
   }
   /**
    * 異常處理器
    * @param ctx 處理器上下文
    * @param cause 異常
    * @throws Exception
    */
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       ctx.close();
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:Netty服務端啓動器類
*/
public class NettyServerStarter {
   /**
    * 啓動服務器
    * @param args
    */
   public static void main(String[] args) {
       try {
           start();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }

   /**
    * 啓動服務器端
    */
   private static void start() throws InterruptedException {
       //線程組
       EventLoopGroup mainGroup = new NioEventLoopGroup();
       EventLoopGroup workGroup = new NioEventLoopGroup();
       try {
           //服務端啓動器
           ServerBootstrap boot = new ServerBootstrap();
           boot.group(mainGroup, workGroup)//將線程組綁定到啓動器
                   .channel(NioServerSocketChannel.class)//使用NIO進行網絡通信
                   .localAddress(new InetSocketAddress(Constant.SERV_PORT))//綁定本地端口監聽
                   .childHandler(new ChannelInitializerExt());//爲Channel添加業務處理器
           //異步綁定服務器端口,sync()方法阻塞,直到綁定完成
           ChannelFuture future = boot.bind().sync();
           System.out.println("Server address:"+Constant.SERV_PORT+" has bind complete,waiting for data...");
           //這裏通道會阻塞,直到通道關閉
           future.channel().closeFuture().sync();
       }finally {
           //優雅地關閉線程池組
           mainGroup.shutdownGracefully().sync();
           workGroup.shutdownGracefully().sync();
       }
   }
   /**
    * 通道初始化器擴展
    * 負責定義初始化各類業務ChannelHandler
    */
   private static class ChannelInitializerExt extends ChannelInitializer<Channel>{
       /**
        * 初始化Channel
        * @param channel 當前通道
        * @throws Exception
        */
       protected void initChannel(Channel channel) throws Exception {
           ChannelPipeline p = channel.pipeline();
           //添加對客戶端心跳檢測
           p.addLast("heartbeat-state",new IdleStateHandler(Constant.HEART_BEAT_TIMEOUT,0,0, TimeUnit.SECONDS));
           p.addLast("heartbeat-check", new ServerHeartbeatHandler());
           /**
            * 解決粘包半包問題辦法一:字符分隔
            */
           //支持一:回車分隔
           if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED){
               p.addLast("linebase", new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN));
           }
           //支持二:自定義分隔符分隔
           if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED) {
               ByteBuf byteMark = Unpooled.copiedBuffer(Constant.CUSTOM_SPLIT_MARK.getBytes());
               p.addLast("custom", new DelimiterBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN, byteMark));
           }
           /**
            * 解決粘包半包問題辦法二:固定長度
            */
           if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN) {
               p.addLast("fixlength", new FixedLengthFrameDecoder(Constant.FIXED_LEN_CLIENT_TXT.length()));
           }
           /**
            * 解決粘包半包問題辦法三:帶長度
            */
           if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN) {
               p.addLast("sendlen", new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
               p.addLast(new ProtoStuffDecoder<User>(User.class));
               p.addLast("linebase", new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN));
           }
           //業務處理Handler--每每此Handler是註冊在管道的最後節點
           p.addLast(new ServerChannelHandler());
       }
   }
}


3、心跳機制部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:客戶端心跳檢測處理器
*/
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
   /**
    * 事件觸發器
    * @param ctx 處理器上下文
    * @param evt 觸發對象
    * @throws Exception
    */
   @Override
   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
       /**
        * 構建心跳字節序列
        */
       String seq = Constant.HEART_BEAT_CLIN_MSG;
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED ||
          Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){
           seq += System.getProperty("line.separator");
       }
       else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){
           seq += Constant.CUSTOM_SPLIT_MARK;
       }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){
           seq = Constant.FIXED_LEN_SERVER_TXT;
       }
       final ByteBuf HEARBEAT_SEQ = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(seq, CharsetUtil.UTF_8));//ISO_8859_1
       /**
        * 發送心跳消息,並在發送失敗時關閉鏈接
        */
       if(evt instanceof IdleStateEvent){
           IdleState state = ((IdleStateEvent)evt).state();
           /**
            * 服務端是數據接收端,那麼客戶端就是Write
            * 客戶端5秒鐘未收到應答,則認爲發送心跳包檢測
            */
           if(state == IdleState.WRITER_IDLE){
               ctx.writeAndFlush(HEARBEAT_SEQ).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           }
       }else{
           //非心跳事件,將它傳遞給下個處理器處理
           super.userEventTriggered(ctx, evt);
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:服務端心跳檢測處理器
*/
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
   /**
    * 事件觸發器
    * @param ctx 處理器上下文
    * @param evt 觸發對象
    * @throws Exception
    */
   @Override
   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
       /**
        * 構建心跳字節序列
        */
       String seq = Constant.HEART_BEAT_SERV_MSG;
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED ||
          Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){
           seq += System.getProperty("line.separator");
       }
       else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){
           seq += Constant.CUSTOM_SPLIT_MARK;
       }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){
           seq = Constant.FIXED_LEN_SERVER_TXT;
       }
       final ByteBuf HEARBEAT_SEQ = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(seq, CharsetUtil.UTF_8));//ISO_8859_1
       /**
        * 發送心跳消息,並在發送失敗時關閉鏈接
        */
       if(evt instanceof IdleStateEvent){
           IdleState state = ((IdleStateEvent)evt).state();
           /**
            * 客戶端是數據發送端,那麼服務器端就是Read
            * 服務器端5秒鐘未收到應答,則認爲發送心跳包檢測
            */
           if(state == IdleState.READER_IDLE){
               ctx.writeAndFlush(HEARBEAT_SEQ).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           }
       }else{
           //非心跳事件,將它傳遞給下個處理器處理
           super.userEventTriggered(ctx, evt);
       }
   }
}


4、生產級檢活和重試機制部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:通道監視器設計類
* 負責定時監聽與對端的鏈接,當鏈接斷開時,自動重試鏈接
*/
@ChannelHandler.Sharable
public abstract class ChannelMonitor<T extends Channel> extends ChannelInboundHandlerAdapter
                                    implements MonitoredChannelHandler, TimerTask {
   private final String host;//監視的主機
   private final int port;//監視的端口
   private final Timer timer;//定時器
   private final AbstractBootstrap boot;//啓動對象
   private final AtomicInteger counter = new AtomicInteger(0);//重試次數統計
   public ChannelMonitor(String host, int port, Timer timer, AbstractBootstrap boot) {
       this.host = host;
       this.port = port;
       this.timer = timer;
       this.boot = boot;
   }

   /**
    * 運行定時任務檢測通道是否斷開
    * @param timeout 定時時間
    * @throws Exception
    */
   public void run(Timeout timeout) throws Exception {
       ChannelFuture future = null;

       /**
        * 設置綁定
        */
       //synchronized (this.boot) {
           if (this.boot instanceof ServerBootstrap) {
               ((ServerBootstrap) this.boot).childHandler(new ChannelInitializerExt());
               future = this.boot.bind(new InetSocketAddress(this.host, this.port));
           } else {
               this.boot.handler(new ChannelInitializerExt());
               future = ((Bootstrap) this.boot).connect(this.host, this.port);
           }
       //}
       /**
        * 監聽通道鏈接
        */
       future.addListener(new ChannelFutureListener() {
           public void operationComplete(ChannelFuture channelFuture) throws Exception {
               boolean success = channelFuture.isSuccess();
               System.out.println("Retry channel connection "+(success ? "success" : "fail")+"("+counter.get()+"times).");
               if(!success){
                   channelFuture.channel().pipeline().fireChannelInactive();
               }
           }
       });
   }

   /**
    * 鏈接斷開事件處理
    * @param ctx
    * @throws Exception
    */
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       System.out.println("Monitored channel will disconnection...");
       System.out.println("Monitored channel disconnected and will auto reconnect...");
       int times = this.counter.incrementAndGet();
       if(Constant.CH_RETRY_TIMES >= times){
           //逐步延長重試時間
           int timeout = 4 << times;
           this.timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
       }
       //觸發對下個斷開事件的調用
       ctx.fireChannelInactive();
   }
   /**
    * 初始化器擴展
    */
   private class ChannelInitializerExt extends ChannelInitializer<T> {
       //加入handler
       protected void initChannel(T t) throws Exception {
           ChannelHandler[] handlers = setMonitorHandlers();
           t.pipeline().addLast(handlers);
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:客戶端監視器實現類
*/
@ChannelHandler.Sharable
public class ClientMonitorHandler extends ChannelMonitor {
   public ClientMonitorHandler(String host, int port, Timer timer, AbstractBootstrap boot) {
       super(host, port, timer, boot);
   }

   /**
    * 定義客戶端處理器
    * @return
    */
   public ChannelHandler[] setMonitorHandlers() {
       int aLen = (Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN ? 7 : 5);
       ChannelHandler[] handlers = new ChannelHandler[aLen];
       handlers[0] = this;//監聽器自己也是Handler,這裏也加入
       handlers[1] = new IdleStateHandler(0, Constant.HEART_BEAT_TIMEOUT+1,0, TimeUnit.SECONDS);
       handlers[2] = new ClientHeartbeatHandler();
       /**
        * 解決粘包半包問題辦法一:字符分隔
        */
       //支持一:回車分隔
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED){
           handlers[3] = new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN);
       }
       //支持二:自定義分隔符分隔
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED) {
           ByteBuf byteMark = Unpooled.copiedBuffer(Constant.CUSTOM_SPLIT_MARK.getBytes());
           handlers[3] = new DelimiterBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN, byteMark);
       }
       /**
        * 解決粘包半包問題辦法二:固定長度
        */
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN) {
           handlers[3] = new FixedLengthFrameDecoder(Constant.FIXED_LEN_SERVER_TXT.length());
       }
       /**
        * 解決粘包半包問題辦法三:帶長度
        */
       if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){
           handlers[3] = new LengthFieldPrepender(2);
           handlers[4] = new ProtoStuffEncoder<User>(User.class);
           handlers[5] = new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN);
       }
       handlers[aLen-1] = new ClientChannelHandler();

       return handlers;
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:定義被監視的ChannelHandler
*/
public interface MonitoredChannelHandler {
   /**
    * 設置被監視的ChannelHandler
    * @return
    */
   ChannelHandler[] setMonitorHandlers();
}


5、Protostuff編解碼器部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:ProtoStuff解碼器類
*/
public class ProtoStuffDecoder<T> extends MessageToMessageDecoder<ByteBuf> {
   private final RuntimeSchema<T> schema;
   public ProtoStuffDecoder(Class<T> clazz) {
       this.schema = (RuntimeSchema<T>) RuntimeSchema.createFrom(clazz);
   }

   /**
    * 自定義實現高效解碼方法
    * @param channelHandlerContext 處理器上下文
    * @param byteBuf 緩衝對象
    * @param list 對象集合
    * @throws Exception
    */
   protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
       int length = byteBuf.readableBytes();
       if(0 < length){
           byte[] bytes = new byte[length];
           byteBuf.getBytes(byteBuf.readerIndex(), bytes, 0, length);
           T t = this.schema.newMessage();
           ProtostuffIOUtil.mergeFrom(bytes, t, this.schema);
           list.add(t);
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:序列化編碼器
*/
public class ProtoStuffEncoder<T> extends MessageToByteEncoder<Object> {
   private final RuntimeSchema<T> schema;
   public ProtoStuffEncoder(Class<T> clazz) {
       this.schema = (RuntimeSchema<T>) RuntimeSchema.createFrom(clazz);
   }

   /**
    * 實現高效序列化方法
    * @param channelHandlerContext 處理器上下文
    * @param o 傳輸對象
    * @param byteBuf buffer緩衝
    * @throws Exception
    */
   protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
      byte[] bytes = ProtostuffIOUtil.toByteArray((T)o, this.schema,
              LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
      byteBuf.writeBytes(bytes);
   }
}


6、其它協同工具

/**
* @author andychen https://blog.51cto.com/14815984
* @description:系統相關常量配置
* 建議直接從全局應用配置文件中
*/
public class Constant {
   /**
    * 服務器端ip
    */
   public static final String SERV_HOST = "127.0.0.1";
   /**
    * 服務端監聽端口
    */
   public static final int SERV_PORT = 6888;
   /**
    * 解碼器處理字節最大長度
    */
   public static final int DECODER_BYTE_MAX_LEN = 1024;
   /**
    * 固定長度文本
    */
   public static final String FIXED_LEN_CLIENT_TXT = "Netty Client Fixed Len Text";
   /**
    * 固定長度文本
    */
   public static final String FIXED_LEN_SERVER_TXT = "Netty Server Fixed Len Text";
   /**
    * 自定義分隔標記
    */
   public static final String CUSTOM_SPLIT_MARK = "@#";
   /**
    * 粘包解決方法
    */
   public static final StickupSolutionType STICKUP_SOLUTION_TYPE = StickupSolutionType.LINE_BASED;
   /**
    * 心跳超時時間
    */
   public static final int HEART_BEAT_TIMEOUT = 4;
   /**
    * 服務端心跳
    */
   public static final String HEART_BEAT_SERV_MSG = "SERV_HEARTBEAT";
   /**
    * 客戶端心跳
    */
   public static final String HEART_BEAT_CLIN_MSG = "CLIN_HEARTBEAT";
   /**
    * 通道重試次數
    */
   public static final int CH_RETRY_TIMES = 30;
}

/**
 * @author andychen https://blog.51cto.com/14815984
 * @description:粘包解決類型定義
 */
public enum StickupSolutionType{
    LINE_BASED(1, "回車分隔"),
    DELIMITER_BASED(2,"自定義分隔"),
    FIXED_LEN(4, "固定長度"),
    SEND_LEN(8, "帶長度");


    public int getCode() {
        return code;
    }

    public String getMsg() {
        return msg;
    }

    private final int code;
    private final String msg;
    StickupSolutionType(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }
}

運行結果

image.png

image.png

image.png

image.png

image.png


image.png

總結

基於Netty的網絡編程高效性、穩定性、擴展性和易用性遠不止於此,後面咱們將陸續展現Netty在其它領域的應用和相關實戰,給你們分享不一樣的網絡編程場景體驗與借鑑。

相關文章
相關標籤/搜索