Netty實現心跳機制

netty心跳機制示例,使用Netty實現心跳機制,使用netty4,IdleStateHandler 實現。Netty心跳機制,netty心跳檢測,netty,心跳html

本文假設你已經瞭解了Netty的使用,或者至少寫過netty的helloworld,知道了netty的基本使用。咱們知道使用netty的時候,大多數的東西都與Handler有關,咱們的業務邏輯基本都是在Handler中實現的。Netty中自帶了一個IdleStateHandler 能夠用來實現心跳檢測。java

心跳檢測的邏輯

本文中咱們將要實現的心跳檢測邏輯是這樣的:服務端啓動後,等待客戶端鏈接,客戶端鏈接以後,向服務端發送消息。若是客戶端在「幹活」那麼服務端一定會收到數據,若是客戶端「閒下來了」那麼服務端就接收不到這個客戶端的消息,既然客戶端閒下來了,不幹事,那麼何須浪費鏈接資源呢?因此服務端檢測到必定時間內客戶端不活躍的時候,將客戶端鏈接關閉。本文要實現的邏輯步驟爲:git

  1. 啓動服務端,啓動客戶端
  2. 客戶端向服務端發送"I am alive",並sleep隨機時間,用來模擬空閒。
  3. 服務端接收客戶端消息,並返回"copy that",客戶端空閒時 計數+1.
  4. 服務端客戶端繼續通訊
  5. 服務端檢測客戶端空閒太多,關閉鏈接。客戶端發現鏈接關閉了,就退出了。

有了這個思路,咱們先來編寫服務端。redis

心跳檢測服務端代碼

public class HeartBeatServer {

    int port ;
    public HeartBeatServer(int port){
        this.port = port;
    }

    public void start(){
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try{
            bootstrap.group(boss,worker)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HeartBeatInitializer());

            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        HeartBeatServer server = new HeartBeatServer(8090);
        server.start();
    }
}

熟悉netty的同志,對於上面的模板同樣的代碼必定是在熟悉不過了。啥都不用看,只須要看childHandler(new HeartBeatInitializer()) 這一句。HeartBeatInitializer就是一個ChannelInitializer顧名思義,他就是在初始化channel的時作一些事情。咱們所須要開發的業務邏輯Handler就是在這裏添加的。其代碼以下:spring

public class HeartBeatInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS));
        pipeline.addLast(new HeartBeatHandler());
    }
}

代碼很簡單,咱們先添加了StringDecoder,和StringEncoder。這兩個其實就是編解碼用的,下面的IdleStateHandler纔是本次心跳的核心組件。咱們能夠看到IdleStateHandler的構造函數中接收了4個參數,其定義以下:bootstrap

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);

三個空閒時間參數,以及時間參數的格式。咱們的例子中設置的是2,2,2,意思就是客戶端2秒沒有讀/寫,這個超時時間就會被觸發。超時事件觸發就須要咱們來處理了,這就是上的HeartBeatInitializer中最後一行的HeartBeatHandler所作的事情。代碼以下:api

public class HeartBeatHandler extends SimpleChannelInboundHandler<String> {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
       if("I am alive".equals(s)){
            ctx.channel().writeAndFlush("copy that");
        }else {
           System.out.println(" 其餘信息處理 ... ");
       }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent)evt;

        String eventType = null;
        switch (event.state()){
            case READER_IDLE:
                eventType = "讀空閒";
                readIdleTimes ++; // 讀空閒的計數加1
                break;
            case WRITER_IDLE:
                eventType = "寫空閒";
                // 不處理
                break;
            case ALL_IDLE:
                eventType ="讀寫空閒";
                // 不處理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "超時事件:" +eventType);
        if(readIdleTimes > 3){
            System.out.println(" [server]讀空閒超過3次,關閉鏈接");
            ctx.channel().writeAndFlush("you are out");
            ctx.channel().close();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }

}

至此,咱們的服務端寫好了。服務器

心跳檢測客戶端代碼

netty的api設計使得編碼的模式很是具備通用性,因此客戶端代碼和服務端的代碼幾乎同樣:啓動client端的代碼幾乎同樣,也須要一個ChannelInitializer,也須要Handler。改動的地方不多,所以本文不對客戶端代碼進行詳細解釋。下面給出client端的完整代碼:app

public class HeartBeatClient  {

    int port;
    Channel channel;
    Random random ;

    public HeartBeatClient(int port){
        this.port = port;
        random = new Random();
    }
    public static void main(String[] args) throws Exception{
        HeartBeatClient client = new HeartBeatClient(8090);
        client.start();
    }

    public void start() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new HeartBeatClientInitializer());

            connect(bootstrap,port);
            String  text = "I am alive";
            while (channel.isActive()){
                sendMsg(text);
            }
        }catch(Exception e){
            // do something
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public void connect(Bootstrap bootstrap,int port) throws Exception{
        channel = bootstrap.connect("localhost",8090).sync().channel();
    }

    public void sendMsg(String text) throws Exception{
        int num = random.nextInt(10);
        Thread.sleep(num * 1000);
        channel.writeAndFlush(text);
    }

    static class HeartBeatClientInitializer extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast(new HeartBeatClientHandler());
        }
    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" +msg);
            if(msg!= null && msg.equals("you are out")) {
                System.out.println(" server closed connection , so client will close too");
                ctx.channel().closeFuture();
            }
        }
    }
}

運行代碼

在上面的代碼寫好以後,咱們先啓動服務端,而後在啓動客戶端。運行日誌以下:dom

server端:

=== /127.0.0.1:57700 is active ===
 ====== > [server] message received : I am alive
 ====== > [server] message received : I am alive
/127.0.0.1:57700超時事件:寫空閒
/127.0.0.1:57700超時事件:讀空閒
/127.0.0.1:57700超時事件:讀寫空閒
/127.0.0.1:57700超時事件:寫空閒
/127.0.0.1:57700超時事件:讀空閒
/127.0.0.1:57700超時事件:讀寫空閒
/127.0.0.1:57700超時事件:寫空閒
 ====== > [server] message received : I am alive
/127.0.0.1:57700超時事件:寫空閒
/127.0.0.1:57700超時事件:讀寫空閒
/127.0.0.1:57700超時事件:讀空閒
/127.0.0.1:57700超時事件:寫空閒
/127.0.0.1:57700超時事件:讀寫空閒
/127.0.0.1:57700超時事件:讀空閒
 [server]讀空閒超過3次,關閉鏈接

client端:

client sent msg and sleep 2
 client received :copy that
 client received :copy that
 client sent msg and sleep 6
 client sent msg and sleep 6
 client received :copy that
 client received :you are out
 server closed connection , so client will close too

Process finished with exit code 0

經過上面的運行日誌,咱們能夠看到:

1.客戶端在與服務器成功創建以後,發送了3次'I am alive',服務端也迴應了3次:'copy that'

2.因爲客戶端消極怠工,超時了屢次,服務端關閉了連接。

3.客戶端知道服務端拋棄本身以後,也關閉了鏈接,程序退出。

以上簡單了演示了一下,netty的心跳機制,其實主要就是使用了IdleStateHandler源碼下載


使用Netty實現HTTP服務器
Netty實現心跳機制
Netty開發redis客戶端,Netty發送redis命令,netty解析redis消息
Netty系列

spring如何啓動的?這裏結合spring源碼描述了啓動過程
SpringMVC是怎麼工做的,SpringMVC的工做原理
spring 異常處理。結合spring源碼分析400異常處理流程及解決方法
Mybatis Mapper接口是如何找到實現類的-源碼分析

相關文章
相關標籤/搜索