淺析 Netty 實現心跳機制與斷線重連

基礎

何爲心跳

顧名思義, 所謂 心跳, 即在 TCP 長鏈接中, 客戶端和服務器之間按期發送的一種特殊的數據包, 通知對方本身還在線, 以確保 TCP 鏈接的有效性.java

爲何須要心跳

由於網絡的不可靠性, 有可能在 TCP 保持長鏈接的過程當中, 因爲某些突發狀況, 例如網線被拔出, 忽然掉電等, 會形成服務器和客戶端的鏈接中斷. 在這些突發狀況下, 若是剛好服務器和客戶端之間沒有交互的話, 那麼它們是不能在短期內發現對方已經掉線的. 爲了解決這個問題, 咱們就須要引入 心跳 機制. 心跳機制的工做原理是: 在服務器和客戶端之間必定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文後, 也當即發送一個特殊的數據報文, 迴應發送方, 此即一個 PING-PONG 交互. 天然地, 當某一端收到心跳消息後, 就知道了對方仍然在線, 這就確保 TCP 鏈接的有效性.git

如何實現心跳

咱們能夠經過兩種方式實現心跳機制:github

  • 使用 TCP 協議層面的 keepalive 機制.shell

  • 在應用層上實現自定義的心跳機制.bootstrap

雖然在 TCP 協議層面上, 提供了 keepalive 保活機制, 可是使用它有幾個缺點:segmentfault

  1. 它不是 TCP 的標準協議, 而且是默認關閉的.服務器

  2. TCP keepalive 機制依賴於操做系統的實現, 默認的 keepalive 心跳時間是 兩個小時, 而且對 keepalive 的修改須要系統調用(或者修改系統配置), 靈活性不夠.網絡

  3. TCP keepalive 與 TCP 協議綁定, 所以若是須要更換爲 UDP 協議時, keepalive 機制就失效了.dom

雖然使用 TCP 層面的 keepalive 機制比自定義的應用層心跳機制節省流量, 可是基於上面的幾點缺點, 通常的實踐中, 人們大多數都是選擇在應用層上實現自定義的心跳.
既然如此, 那麼咱們就來大體看看在在 Netty 中是怎麼實現心跳的吧. 在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 它能夠對一個 Channel 的 讀/寫設置定時器, 當 Channel 在必定事件間隔內沒有數據交互時(即處於 idle 狀態), 就會觸發指定的事件.socket

使用 Netty 實現心跳

上面咱們提到了, 在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 那麼這個 Handler 如何使用呢? 咱們來看看它的構造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

實例化一個 IdleStateHandler 須要提供三個參數:

  • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 讀/寫超時. 即當在指定的時間間隔內沒有讀或寫操做時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.

爲了展現具體的 IdleStateHandler 實現的心跳機制, 下面咱們來構造一個具體的EchoServer 的例子, 這個例子的行爲以下:

  1. 在這個例子中, 客戶端和服務器經過 TCP 長鏈接進行通訊.

  2. TCP 通訊的報文格式是:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+
  1. 客戶端每隔一個隨機的時間後, 向服務器發送消息, 服務器收到消息後, 當即將收到的消息原封不動地回覆給客戶端.

  2. 若客戶端在指定的時間間隔內沒有讀/寫操做, 則客戶端會自動向服務器發送一個 PING 心跳, 服務器收到 PING 心跳消息時, 須要回覆一個 PONG 消息.

下面所使用的代碼例子能夠在個人 Github github.com/yongshun/some_java_code 上找到.

通用部分

根據上面定義的行爲, 咱們接下來實現心跳的通用部分 CustomHeartbeatHandler:

/**
 * @author xiongyongshun
 * @version 1.0
 * @email yongshun1228@gmail.com
 * @created 16/9/18 13:02
 */
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    protected String name;
    private int heartbeatCount = 0;

    public CustomHeartbeatHandler(String name) {
        this.name = name;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        if (byteBuf.getByte(4) == PING_MSG) {
            sendPongMsg(context);
        } else if (byteBuf.getByte(4) == PONG_MSG){
            System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
        } else {
            handleData(context, byteBuf);
        }
    }

    protected void sendPingMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PING_MSG);
        context.writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    private void sendPongMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PONG_MSG);
        context.channel().writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // IdleStateHandler 所產生的 IdleStateEvent 的處理邏輯.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
    }

    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        System.err.println("---READER_IDLE---");
    }

    protected void handleWriterIdle(ChannelHandlerContext ctx) {
        System.err.println("---WRITER_IDLE---");
    }

    protected void handleAllIdle(ChannelHandlerContext ctx) {
        System.err.println("---ALL_IDLE---");
    }
}

類 CustomHeartbeatHandler 負責心跳的發送和接收, 咱們接下來詳細地分析一下它的做用. 咱們在前面提到, IdleStateHandler 是實現心跳的關鍵, 它會根據不一樣的 IO idle 類型來產生不一樣的 IdleStateEvent 事件, 而這個事件的捕獲, 其實就是在 userEventTriggered 方法中實現的.
咱們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實現:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case READER_IDLE:
                handleReaderIdle(ctx);
                break;
            case WRITER_IDLE:
                handleWriterIdle(ctx);
                break;
            case ALL_IDLE:
                handleAllIdle(ctx);
                break;
            default:
                break;
        }
    }
}

在 userEventTriggered 中, 根據 IdleStateEvent 的 state() 的不一樣, 而進行不一樣的處理. 例如若是是讀取數據 idle, 則 e.state() == READER_IDLE, 所以就調用 handleReaderIdle 來處理它. CustomHeartbeatHandler 提供了三個 idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個方法目前只有默認的實現, 它須要在子類中進行重寫, 如今咱們暫時略過它們, 在具體的客戶端和服務器的實現部分時再來看它們.

知道了這一點後, 咱們接下來看看數據處理部分:

@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
    if (byteBuf.getByte(4) == PING_MSG) {
        sendPongMsg(context);
    } else if (byteBuf.getByte(4) == PONG_MSG){
        System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
    } else {
        handleData(context, byteBuf);
    }
}

在 CustomHeartbeatHandler.channelRead0 中, 咱們首先根據報文協議:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+

來判斷當前的報文類型, 若是是 PING_MSG 則表示是服務器收到客戶端的 PING 消息, 此時服務器須要回覆一個 PONG 消息, 其消息類型是 PONG_MSG.
扔報文類型是 PONG_MSG, 則表示是客戶端收到服務器發送的 PONG 消息, 此時打印一個 log 便可.

客戶端部分

客戶端初始化

public class Client {
    public static void main(String[] args) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        Random random = new Random(System.currentTimeMillis());
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler());
                        }
                    });

            Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
            for (int i = 0; i < 10; i++) {
                String content = "client msg " + i;
                ByteBuf buf = ch.alloc().buffer();
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                ch.writeAndFlush(buf);

                Thread.sleep(random.nextInt(20000));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

上面的代碼是 Netty 的客戶端端的初始化代碼, 使用過 Netty 的朋友對這個代碼應該不會陌生. 別的部分咱們就再也不贅述, 咱們來看看 ChannelInitializer.initChannel 部分便可:

.handler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new IdleStateHandler(0, 0, 5));
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
        p.addLast(new ClientHandler());
    }
});

咱們給 pipeline 添加了三個 Handler, IdleStateHandler 這個 handler 是心跳機制的核心, 咱們爲客戶端端設置了讀寫 idle 超時, 時間間隔是5s, 即若是客戶端在間隔 5s 後都沒有收到服務器的消息或向服務器發送消息, 則產生 ALL_IDLE 事件.
接下來咱們添加了 LengthFieldBasedFrameDecoder, 它是負責解析咱們的 TCP 報文, 由於和本文的目的無關, 所以這裏不詳細展開.
最後一個 Handler 是 ClientHandler, 它繼承於 CustomHeartbeatHandler, 是咱們處理業務邏輯部分.

客戶端 Handler

public class ClientHandler extends CustomHeartbeatHandler {
    public ClientHandler() {
        super("client");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }
}

ClientHandler 繼承於 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裏面實現 僅僅打印收到的消息.
第二個重寫的方法是 handleAllIdle. 咱們在前面提到, 客戶端負責發送心跳的 PING 消息, 當客戶端產生一個 ALL_IDLE 事件後, 會致使父類的 CustomHeartbeatHandler.userEventTriggered 調用, 而 userEventTriggered 中會根據 e.state() 來調用不一樣的方法, 所以最後調用的是 ClientHandler.handleAllIdle, 在這個方法中, 客戶端調用 sendPingMsg 向服務器發送一個 PING 消息.

服務器部分

服務器初始化

public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(10, 0, 0));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ServerHandler());
                        }
                    });

            Channel ch = bootstrap.bind(12345).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服務器的初始化部分也沒有什麼好說的, 它也和客戶端的初始化同樣, 爲 pipeline 添加了三個 Handler.

服務器 Handler

public class ServerHandler extends CustomHeartbeatHandler {
    public ServerHandler() {
        super("server");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
        byte[] data = new byte[buf.readableBytes() - 5];
        ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
        buf.skipBytes(5);
        buf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
        channelHandlerContext.write(responseBuf);
    }

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        super.handleReaderIdle(ctx);
        System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
        ctx.close();
    }
}

ServerHandler 繼承於 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裏面實現 EchoServer 的功能: 即收到客戶端的消息後, 當即原封不動地將消息回覆給客戶端.
第二個重寫的方法是 handleReaderIdle, 由於服務器僅僅對客戶端的讀 idle 感興趣, 所以只從新了這個方法. 若服務器在指定時間後沒有收到客戶端的消息, 則會觸發 READER_IDLE 消息, 進而會調用 handleReaderIdle 這個方法. 咱們在前面提到, 客戶端負責發送心跳的 PING 消息, 而且服務器的 READER_IDLE 的超時時間是客戶端發送 PING 消息的間隔的兩倍, 所以當服務器 READER_IDLE 觸發時, 就能夠肯定是客戶端已經掉線了, 所以服務器直接關閉客戶端鏈接便可.

總結

  1. 使用 Netty 實現心跳機制的關鍵就是利用 IdleStateHandler 來產生對應的 idle 事件.

  2. 通常是客戶端負責發送心跳的 PING 消息, 所以客戶端注意關注 ALL_IDLE 事件, 在這個事件觸發後, 客戶端須要向服務器發送 PING 消息, 告訴服務器"我還存活着".

  3. 服務器是接收客戶端的 PING 消息的, 所以服務器關注的是 READER_IDLE 事件, 而且服務器的 READER_IDLE 間隔須要比客戶端的 ALL_IDLE 事件間隔大(例如客戶端ALL_IDLE 是5s 沒有讀寫時觸發, 所以服務器的 READER_IDLE 能夠設置爲10s)

  4. 當服務器收到客戶端的 PING 消息時, 會發送一個 PONG 消息做爲回覆. 一個 PING-PONG 消息對就是一個心跳交互.

實現客戶端的斷線重連

public class Client {
    private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
    private Channel channel;
    private Bootstrap bootstrap;

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start();
        client.sendData();
    }

    public void sendData() throws Exception {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < 10000; i++) {
            if (channel != null && channel.isActive()) {
                String content = "client msg " + i;
                ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                channel.writeAndFlush(buf);
            }

            Thread.sleep(random.nextInt(20000));
        }
    }

    public void start() {
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler(Client.this));
                        }
                    });
            doConnect();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }

        ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);

        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (futureListener.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("Connect to server successfully!");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");

                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });
    }

}

上面的代碼中, 咱們抽象出 doConnect 方法, 它負責客戶端和服務器的 TCP 鏈接的創建, 而且當 TCP 鏈接失敗時, doConnect 會 經過 "channel().eventLoop().schedule" 來延時10s 後嘗試從新鏈接.

客戶端 Handler

public class ClientHandler extends CustomHeartbeatHandler {
    private Client client;
    public ClientHandler(Client client) {
        super("client");
        this.client = client;
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
    }
}

斷線重連的關鍵一點是檢測鏈接是否已經斷開. 所以咱們改寫了 ClientHandler, 重寫了 channelInactive 方法. 當 TCP 鏈接斷開時, 會回調 channelInactive 方法, 所以咱們在這個方法中調用 client.doConnect() 來進行重連.

完整代碼能夠在個人 Github github.com/yongshun/some_java_code 上找到.

本文由 yongshun 發表於我的博客, 採用署名-非商業性使用-相同方式共享 3.0 中國大陸許可協議.
非商業轉載請註明做者及出處. 商業轉載請聯繫做者本人
Email: yongshun1228@gmail.com
本文標題爲: 淺析 Netty 實現心跳機制與斷線重連
本文連接爲: http://www.javashuo.com/article/p-ebxrbkeh-dq.html

相關文章
相關標籤/搜索