Netty 如何實現心跳機制與斷線重連?

做者:sprinkle_liz
www.jianshu.com/p/1a28e48edd92

心跳機制java

何爲心跳面試

所謂心跳, 即在 TCP 長鏈接中, 客戶端和服務器之間按期發送的一種特殊的數據包, 通知對方本身還在線, 以確保 TCP 鏈接的有效性.bootstrap

注:心跳包還有另外一個做用,常常被忽略,即: 一個鏈接若是長時間不用,防火牆或者路由器就會斷開該鏈接。

如何實現後端

核心Handler —— IdleStateHandler服務器

在 Netty 中, 實現心跳機制的關鍵是 IdleStateHandler, 那麼這個 Handler 如何使用呢? 先看下它的構造器:多線程

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

這裏解釋下三個參數的含義:架構

  • readerIdleTimeSeconds: 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到數據時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds: 寫超時. 即當在指定的時間間隔內沒有數據寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds: 讀/寫超時. 即當在指定的時間間隔內沒有讀或寫操做時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
注:這三個參數默認的時間單位是秒。若須要指定其餘時間單位,可使用另外一個構造方法:IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

在看下面的實現以前,建議先了解一下IdleStateHandler的實現原理。dom

下面直接上代碼,須要注意的地方,會在代碼中經過註釋進行說明。tcp

使用IdleStateHandler實現心跳ide

下面將使用IdleStateHandler來實現心跳,Client端鏈接到Server端後,會循環執行一個任務:隨機等待幾秒而後ping一下Server即發送一個心跳包。當等待的時間超過規定時間,將會發送失敗,覺得Server端在此以前已經主動斷開鏈接了。代碼以下:

Client端

ClientIdleStateTrigger —— 心跳觸發器

類ClientIdleStateTrigger也是一個Handler,只是重寫了userEventTriggered方法,用於捕獲IdleState.WRITER_IDLE事件(未在指定時間內向服務器發送數據),而後向Server端發送一個心跳包。

/**
 * <p>
 *  用於捕獲{@link IdleState#WRITER_IDLE}事件(未在指定時間內向服務器發送數據),而後向<code>Server</code>端發送一個心跳包。
 * </p>
 */
public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter {

    public static final String HEART_BEAT = "heart beat!";

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEART_BEAT);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

}

Pinger —— 心跳發射器

/**
 * <p>客戶端鏈接到服務器端後,會循環執行一個任務:隨機等待幾秒,而後ping一下Server端,即發送一個心跳包。</p>
 */
public class Pinger extends ChannelInboundHandlerAdapter {

    private Random random = new Random();
    private int baseRandom = 8;

    private Channel channel;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel = ctx.channel();

        ping(ctx.channel());
    }

    private void ping(Channel channel) {
        int second = Math.max(1, random.nextInt(baseRandom));
        System.out.println("next heart beat will send after " + second + "s.");
        ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                if (channel.isActive()) {
                    System.out.println("sending heart beat to the server...");
                    channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
                } else {
                    System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                    channel.closeFuture();
                    throw new RuntimeException();
                }
            }
        }, second, TimeUnit.SECONDS);

        future.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (future.isSuccess()) {
                    ping(channel);
                }
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 當Channel已經斷開的狀況下, 仍然發送數據, 會拋異常, 該方法會被調用.
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandlersInitializer —— 客戶端處理器集合的初始化類

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}
注: 上面的Handler集合,除了Pinger,其餘都是編解碼器和解決粘包,能夠忽略。

TcpClient —— TCP鏈接的客戶端

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** 將<code>Channel</code>保存起來, 可用於在其餘非handler的地方發送數據 */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        init();
    }

    /**
     * 向遠程TCP服務器請求鏈接
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            this.channel = future.channel();
        }
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // bootstrap 可重用, 只需在TcpClient實例化的時候初始化便可.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }

}

Server端

ServerIdleStateTrigger —— 斷連觸發器

/**
 * <p>在規定時間內未收到客戶端的任何數據包, 將主動斷開該鏈接</p>
 */
public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // 在規定時間內沒有收到客戶端的上行數據, 主動斷開鏈接
                ctx.disconnect();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

ServerBizHandler —— 服務器端的業務處理器

/**
 * <p>收到來自客戶端的數據包後, 直接在控制檯打印出來.</p>
 */
@ChannelHandler.Sharable
public class ServerBizHandler extends SimpleChannelInboundHandler<String> {

    private final String REC_HEART_BEAT = "I had received the heart beat!";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String data) throws Exception {
        try {
            System.out.println("receive data: " + data);
//            ctx.writeAndFlush(REC_HEART_BEAT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Established connection with the remote client.");

        // do something

        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Disconnected with the remote client.");

        // do something

        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ServerHandlerInitializer —— 服務器端處理器集合的初始化類

/**
 * <p>用於初始化服務器端涉及到的全部<code>Handler</code></p>
 */
public class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
        ch.pipeline().addLast("idleStateTrigger", new ServerIdleStateTrigger());
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
        ch.pipeline().addLast("decoder", new StringDecoder());
        ch.pipeline().addLast("encoder", new StringEncoder());
        ch.pipeline().addLast("bizHandler", new ServerBizHandler());
    }

}
注:new IdleStateHandler(5, 0, 0)該handler表明若是在5秒內沒有收到來自客戶端的任何數據包(包括但不限於心跳包),將會主動斷開與該客戶端的鏈接。

TcpServer —— 服務器端

public class TcpServer {
    private int port;
    private ServerHandlerInitializer serverHandlerInitializer;

    public TcpServer(int port) {
        this.port = port;
        this.serverHandlerInitializer = new ServerHandlerInitializer();
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.serverHandlerInitializer);
            // 綁定端口,開始接收進來的鏈接
            ChannelFuture future = bootstrap.bind(port).sync();

            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 2222;
        new TcpServer(port).start();
    }
}

至此,全部代碼已經編寫完畢。

測試

首先啓動客戶端,再啓動服務器端。啓動完成後,在客戶端的控制檯上,能夠看到打印以下相似日誌:

客戶端控制檯輸出的日誌

在服務器端能夠看到控制檯輸出了相似以下的日誌:

服務器端控制檯輸出的日誌

能夠看到,客戶端在發送4個心跳包後,第5個包由於等待時間較長,等到真正發送的時候,發現鏈接已斷開了;而服務器端收到客戶端的4個心跳數據包後,遲遲等不到下一個數據包,因此果斷斷開該鏈接。

異常狀況

在測試過程當中,有可能會出現以下狀況:

異常狀況

出現這種狀況的緣由是:在鏈接已斷開的狀況下,仍然向服務器端發送心跳包。雖然在發送心跳包以前會使用channel.isActive()判斷鏈接是否可用,但也有可能上一刻判斷結果爲可用,但下一刻發送數據包以前,鏈接就斷了。

目前還沒有找到優雅處理這種狀況的方案,各位看官若是有好的解決方案,還望不吝賜教。拜謝!!!

斷線重連

斷線重連這裏就不過多介紹,相信各位都知道是怎麼回事。這裏只說大體思路,而後直接上代碼。

實現思路

客戶端在監測到與服務器端的鏈接斷開後,或者一開始就沒法鏈接的狀況下,使用指定的重連策略進行重連操做,直到從新創建鏈接或重試次數耗盡。

對於如何監測鏈接是否斷開,則是經過重寫ChannelInboundHandler#channelInactive來實現,但鏈接不可用,該方法會被觸發,因此只須要在該方法作好重連工做便可。

代碼實現

注:如下代碼都是在上面心跳機制的基礎上修改/添加的。

由於斷線重連是客戶端的工做,因此只需對客戶端代碼進行修改。

重試策略

RetryPolicy —— 重試策略接口

public interface RetryPolicy {

    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @return true/false
     */
    boolean allowRetry(int retryCount);

    /**
     * get sleep time in ms of current retry count.
     *
     * @param retryCount current retry count
     * @return the time to sleep
     */
    long getSleepTimeMs(int retryCount);
}

ExponentialBackOffRetry —— 重連策略的默認實現

/**
 * <p>Retry policy that retries a set number of times with increasing sleep time between retries</p>
 */
public class ExponentialBackOffRetry implements RetryPolicy {

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final long baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
        this.maxRetries = maxRetries;
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        if (retryCount < maxRetries) {
            return true;
        }
        return false;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retries count must greater than 0.");
        }
        if (retryCount > MAX_RETRIES_LIMIT) {
            System.out.println(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            retryCount = MAX_RETRIES_LIMIT;
        }
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
        if (sleepMs > maxSleepMs) {
            System.out.println(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }
}

ReconnectHandler—— 重連處理器

@ChannelHandler.Sharable
public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    private int retries = 0;
    private RetryPolicy retryPolicy;

    private TcpClient tcpClient;

    public ReconnectHandler(TcpClient tcpClient) {
        this.tcpClient = tcpClient;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Successfully established a connection to the server.");
        retries = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (retries == 0) {
            System.err.println("Lost the TCP connection with the server.");
            ctx.close();
        }

        boolean allowRetry = getRetryPolicy().allowRetry(retries);
        if (allowRetry) {

            long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);

            System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));

            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                System.out.println("Reconnecting ...");
                tcpClient.connect();
            }, sleepTimeMs, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

    private RetryPolicy getRetryPolicy() {
        if (this.retryPolicy == null) {
            this.retryPolicy = tcpClient.getRetryPolicy();
        }
        return this.retryPolicy;
    }
}

ClientHandlersInitializer

在以前的基礎上,添加了重連處理器ReconnectHandler。

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(this.reconnectHandler);
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}

TcpClient

在以前的基礎上添加劇連、重連策略的支持。

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** 重連策略 */
    private RetryPolicy retryPolicy;
    /** 將<code>Channel</code>保存起來, 可用於在其餘非handler的地方發送數據 */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        this.retryPolicy = retryPolicy;
        init();
    }

    /**
     * 向遠程TCP服務器請求鏈接
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener(getConnectionListener());
            this.channel = future.channel();
        }
    }

    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // bootstrap 可重用, 只需在TcpClient實例化的時候初始化便可.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    private ChannelFutureListener getConnectionListener() {
        return new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    future.channel().pipeline().fireChannelInactive();
                }
            }
        };
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }

}

測試

在測試以前,爲了避開 Connection reset by peer 異常,能夠稍微修改Pinger的ping()方法,添加if (second == 5)的條件判斷。以下:

private void ping(Channel channel) {
        int second = Math.max(1, random.nextInt(baseRandom));
        if (second == 5) {
            second = 6;
        }
        System.out.println("next heart beat will send after " + second + "s.");
        ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                if (channel.isActive()) {
                    System.out.println("sending heart beat to the server...");
                    channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
                } else {
                    System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                    channel.closeFuture();
                    throw new RuntimeException();
                }
            }
        }, second, TimeUnit.SECONDS);

        future.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (future.isSuccess()) {
                    ping(channel);
                }
            }
        });
    }

啓動客戶端

先只啓動客戶端,觀察控制檯輸出,能夠看到相似以下日誌:

斷線重連測試——客戶端控制檯輸出

能夠看到,當客戶端發現沒法鏈接到服務器端,因此一直嘗試重連。隨着重試次數增長,重試時間間隔越大,但又不想無限增大下去,因此須要定一個閾值,好比60s。如上圖所示,當下一次重試時間超過60s時,會打印Sleep extension too large(*). Pinning to 60000,單位爲ms。出現這句話的意思是,計算出來的時間超過閾值(60s),因此把真正睡眠的時間重置爲閾值(60s)。

啓動服務器端

接着啓動服務器端,而後繼續觀察客戶端控制檯輸出。

斷線重連測試——服務器端啓動後客戶端控制檯輸出

能夠看到,在第9次重試失敗後,第10次重試以前,啓動的服務器,因此第10次重連的結果爲Successfully established a connection to the server.,即成功鏈接到服務器。接下來由於仍是不定時ping服務器,因此出現斷線重連、斷線重連的循環。

擴展

在不一樣環境,可能會有不一樣的重連需求。有不一樣的重連需求的,只需本身實現RetryPolicy接口,而後在建立TcpClient的時候覆蓋默認的重連策略便可。

完!!!

推薦去個人博客閱讀更多:

1.Java JVM、集合、多線程、新特性系列教程

2.Spring MVC、Spring Boot、Spring Cloud 系列教程

3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程

4.Java、後端、架構、阿里巴巴等大廠最新面試題

以爲不錯,別忘了點贊+轉發哦!

相關文章
相關標籤/搜索