Netty就是這麼回事(七)

這一章,主要介紹下Netty的心跳處理,心跳處理在通訊開發中是最經常使用的,服務端經過心跳能夠監控客戶端的連接狀態,進行相應的處理。java

記得,以前用NIO作了一個客戶端和服務端通訊的項目,客戶端並非用java寫的,並且一個嵌入式的設備,走的lwapp協議棧,有時候嵌入式設備點擊復位或者直接掉電後,服務端尚未反應過來,還認爲連接是鏈接狀態,資源也就是一直沒有獲得釋放。早在BIO的時候經過檢測返回值是不是-1,異常捕獲,setSoTimeout(超時時間)來肯定客戶端是否鏈接有效。到了nio只能本身實現一個心跳檢測,很是的麻煩。好在Netty爲咱們提供了IdleStateHandler類來完成心跳檢測功能,它很是簡單,只有三個參數:public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) 讀超時時間,寫超時時間,讀寫超時時間,而後實現用戶事件觸發監聽userEventTriggered這個方法,在這個方法裏作相應的處理就能夠了,是否是很是的方便!bootstrap

下面來看一下服務端的代碼:服務器

package com.dlb.note.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 功能:心跳時間服務器
 * 版本:1.0
 * 日期:2016/12/13 10:51
 * 做者:馟蘇
 */
public class IdleTimerServer {
    /**
     * 主函數
     */
    public static void main(String []args) {
        // 配置服務端的NIO線程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // 當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            // 添加心跳處理器 5s讀,5s寫,10s讀寫
                            channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
                            channel.pipeline().addLast(new IdleTimerServerHandler());
                        }
                    });

            // 綁定端口,同步等待成功
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            System.out.println("服務器在8888端口監聽hello");

            // 等待服務端監聽端口關閉
            future.channel().closeFuture().sync();
            System.out.println("服務器關閉bye");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class IdleTimerServerHandler extends ChannelHandlerAdapter {
    // 可讀
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 讀數據
        ByteBuf buf = (ByteBuf) msg;

        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        String body = new String(req, "UTF-8");
        System.out.println("receive:" + body);

        // 寫數據
        ByteBuf res = Unpooled.copiedBuffer("hello,client!".getBytes());
        ctx.write(res);
        ctx.flush();
    }

    /**
     * 用戶事件觸發
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){ // 接受心跳事件
            IdleStateEvent event = (IdleStateEvent)evt;

            if(event.state() == IdleState.ALL_IDLE){ // 讀和寫狀態
                System.out.println("心跳結束");
                //清除超時會話
                ByteBuf res = Unpooled.copiedBuffer("you will close!".getBytes());
                ChannelFuture writeAndFlush = ctx.writeAndFlush(res);
                // 監聽結果
                writeAndFlush.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        ctx.channel().close();
                    }
                });
            }
        }
    }

    // 鏈接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client come,ip:" + ctx.channel().remoteAddress());
    }

    // 關閉
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    // 異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.toString());
        ctx.close();
}
}
相關文章
相關標籤/搜索