基於netty實現的長鏈接,心跳機制及重連機制

技術:maven3.0.5 + netty4.1.33 + jdk1.8
 

概述

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。 也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty至關於簡化和流線化了網絡應用的編程開發過程,例如:基於TCP和UDP的socket服務開發。 「快速」和「簡單」並不用產生維護性或性能上的問題。Netty 是一個吸取了多種協議(包括FTP、SMTP、HTTP等各類二進制文本協議)的實現經驗,並通過至關精心設計的項目。最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性。

詳細

 

詳細

本篇demo實現的功能是基於netty的心跳機制和長鏈接以及重連機制,最關鍵的就是經過netty中的 IdleStateHandler 的超時機制來實現心跳和重連 ,而後經過org.msgpack編碼器來實現跨平臺數據傳輸,html

實現的功能就是經過Scanner來輸入消息獲得服務端的迴應,超過設定的超時時間就觸發超時事件來進行心跳傳輸,若是服務端宕機客戶端就會一直髮起重連。java

1、運行效果編程

 

服務端:bootstrap

image.png

 

客戶端:服務器

image.png

2、實現過程

  1. 在maven pom文件添加依賴:網絡

     

     

     

     

  2.        <!-- 解碼and編碼器 -->
            <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
            <dependency>
                <groupId>org.msgpack</groupId>
                <artifactId>msgpack</artifactId>
                <version>0.6.12</version>
            </dependency>
            <!-- netty 核心依賴 -->
            <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
          <dependency>
    	    <groupId>io.netty</groupId>
    	    <artifactId>netty-all</artifactId>
    	    <version>4.1.33.Final</version>
    	  </dependency>
  3. 導入以上依賴 
    ↓ 
    建立配置模型model(模型類) , TypeData(參數配置類) 
    ↓ 
    建立解碼and編碼器MsgPckDecode(解碼器) ,MsgPckEncode(編碼器) 
    ↓ 
    建立各自的控制器 AbstractClientChannelInboundHandleAdapter,AbstractServerChannelInboundHandleAdapter
    ↓ 
    建立客戶端及客戶端控制器Client(客戶端啓動類) , ClientHandler(客戶端控制器) 
    ↓ 
    建立服務端以及控制器Server(客戶端啓動類) , ServerHandler(客戶端控制器)
    
    ps:本demo使用了msgpack , It’s like JSON. but fast and small.
  4. package com.zxh.demo.model;
    
    import java.io.Serializable;
    import org.msgpack.annotation.Message;
    /**
     * 消息類型分離器
     * @author Administrator
     *
     */
    @Message
    public class Model implements Serializable{
    
        private static final long serialVersionUID = 1L;
    
        //類型
        private int type;
    
        //內容
        private String body;
    
        public String getBody() {
            return body;
        }
    
        public void setBody(String body) {
            this.body = body;
        }
    
        public int getType() {
            return type;
        }
    
        public void setType(int type) {
            this.type = type;
        }
    
        @Override
        public String toString() {
            return "Model [type=" + type + ", body=" + body + "]";
        }
    }
  5. 編寫一個配置類接口,用於控制心跳包和應用消息的處理
  6. package com.zxh.demo.model;
    
    /**
     * 配置項
     * @author Administrator
     *
     */
    public interface TypeData {
    
        byte PING = 1;
    
        byte PONG = 2;  
        //顧客
        byte CUSTOMER = 3;
    }

    建立MsgPckDecode(解碼器)框架

  7. package com.zxh.demo.model;
    
    import java.util.List;
    import org.msgpack.MessagePack;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    
    /**
     * 解碼器
     * @author Administrator
     *
     */
    public class MsgPckDecode extends MessageToMessageDecoder<ByteBuf>{
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
                List<Object> out) throws Exception {
    
            final  byte[] array;
    
            final int length = msg.readableBytes();
    
            array = new byte[length];
    
            msg.getBytes(msg.readerIndex(), array, 0, length);
    
            MessagePack pack = new MessagePack();
    
            out.add(pack.read(array, Model.class));
    
        }
    }
  8. 建立MsgPckEncode(編碼器)
  9. package com.zxh.demo.model;
    
    import org.msgpack.MessagePack;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    
    /**
     * 編碼器
     * @author Administrator
     *
     */
    public class MsgPckEncode extends MessageToByteEncoder<Object>{
    
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf)
                throws Exception {
            // TODO Auto-generated method stub
            MessagePack pack = new MessagePack();
    
            byte[] write = pack.write(msg);
    
            buf.writeBytes(write);
    
        }
    }
  10. 建立client客戶端:
  11. package com.zxh.demo.client;
    
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import com.zxh.demo.model.Model;
    import com.zxh.demo.model.MsgPckDecode;
    import com.zxh.demo.model.MsgPckEncode;
    import com.zxh.demo.model.TypeData;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;
    
    public class Client {
    
        private NioEventLoopGroup worker = new NioEventLoopGroup();
    
        private Channel channel;
    
        private Bootstrap bootstrap;
    
        public static void main(String[] args) {
            Client  client = new Client();
    
            client.start();
    
            client.sendData();      
        }
    
        private void start() {
            bootstrap = new Bootstrap();        
            bootstrap.group(worker)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    // TODO Auto-generated method stub
                    ChannelPipeline pipeline = ch.pipeline();
    
                    pipeline.addLast(new IdleStateHandler(0,0,5));
    
                    pipeline.addLast(new MsgPckDecode());
    
                    pipeline.addLast(new MsgPckEncode());
    
                    pipeline.addLast(new ClientHandler(Client.this));              
                }           
            }); 
            doConnect();
        }
    
        /**
         * 鏈接服務端 and 重連
         */
        protected void doConnect() {
    
            if (channel != null && channel.isActive()){
                return;
            }       
            ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081);
            //實現監聽通道鏈接的方法
            connect.addListener(new ChannelFutureListener() {
    
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
    
                    if(channelFuture.isSuccess()){
                        channel = channelFuture.channel();
                        System.out.println("鏈接服務端成功");
                    }else{
                        System.out.println("每隔2s重連....");
                        channelFuture.channel().eventLoop().schedule(new Runnable() {
    
                            @Override
                            public void run() {
                                doConnect();
                            }
                        },2,TimeUnit.SECONDS);
                    }   
                }
            });     
        }   
        /**
         * 向服務端發送消息
         */
        private void sendData() {
            Scanner sc= new Scanner(System.in); 
            for (int i = 0; i < 1000; i++) {
    
                if(channel != null && channel.isActive()){              
                    //獲取一個鍵盤掃描器
                    String nextLine = sc.nextLine();
                    Model model = new Model();
    
                    model.setType(TypeData.CUSTOMER);
    
                    model.setBody(nextLine);
    
                    channel.writeAndFlush(model);
                }
            }
        }
    }
  12. 建立Server服務端:
  13. package com.zxh.demo.server;
    import com.zxh.demo.model.MsgPckDecode;
    import com.zxh.demo.model.MsgPckEncode;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.timeout.IdleStateHandler;
    
    public class Server {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    
            EventLoopGroup workerGroup = new NioEventLoopGroup(4);
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(8081)
                .childHandler(new ChannelInitializer<Channel>() {
    
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // TODO Auto-generated method stub
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new IdleStateHandler(10,0,0));
                        pipeline.addLast(new MsgPckDecode());
                        pipeline.addLast(new MsgPckEncode());
                        pipeline.addLast(new ServerHandler()); 
                    }
                });         
                System.out.println("start server by port 8081 --");
                ChannelFuture sync = serverBootstrap.bind().sync();
                sync.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally{
                //優雅的關閉資源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

 

先運行服務端,而後再啓動客戶端 會根據設置的端口鏈接服務端,在客戶端輸入消息就會獲得服務端的迴應,若是超過5秒沒有進行讀寫就會觸發IdleStateHandler類超時事件 來進行心跳包的傳輸 ,服務端未檢測到客戶端的讀寫或者心跳就會主動關閉channel通道異步

 

3、項目結構圖socket

 

image.png

 

4、補充

 

所謂的心跳, 即在 TCP 長鏈接中, 客戶端和服務器之間按期發送的一種特殊的數據包, 通知對方本身還在線, 以確保 TCP 鏈接的有效性.由於網絡的不可靠性, 有可能在 TCP 保持長鏈接的過程當中, 因爲某些突發狀況, 例如網線被拔出, 忽然掉電等, 會形成服務器和客戶端的鏈接中斷. 在這些突發狀況下, 若是剛好服務器和客戶端之間沒有交互的話, 那麼它們是不能在短期內發現對方已經掉線的. 爲了解決這個問題, 咱們就須要引入 心跳 機制. 心跳機制的工做原理是: 在服務器和客戶端之間必定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文後, 也當即發送一個特殊的數據報文, 迴應發送方, 此即一個 PING-PONG 交互. 天然地, 當某一端收到心跳消息後, 就知道了對方仍然在線, 這就確保 TCP 鏈接的有效性maven

 

 

 

注:本文著做權歸做者,由demo大師發表,拒絕轉載,轉載須要做者受權

相關文章
相關標籤/搜索