這一章,主要介紹下Netty的心跳處理,心跳處理在通訊開發中是最經常使用的,服務端經過心跳能夠監控客戶端的連接狀態,進行相應的處理。java
記得,以前用NIO作了一個客戶端和服務端通訊的項目,客戶端並非用java寫的,並且一個嵌入式的設備,走的lwapp協議棧,有時候嵌入式設備點擊復位或者直接掉電後,服務端尚未反應過來,還認爲連接是鏈接狀態,資源也就是一直沒有獲得釋放。早在BIO的時候經過檢測返回值是不是-1,異常捕獲,setSoTimeout(超時時間)來肯定客戶端是否鏈接有效。到了nio只能本身實現一個心跳檢測,很是的麻煩。好在Netty爲咱們提供了IdleStateHandler類來完成心跳檢測功能,它很是簡單,只有三個參數:public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) 讀超時時間,寫超時時間,讀寫超時時間,而後實現用戶事件觸發監聽userEventTriggered這個方法,在這個方法裏作相應的處理就能夠了,是否是很是的方便!bootstrap
下面來看一下服務端的代碼:服務器
package com.dlb.note.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; /** * 功能:心跳時間服務器 * 版本:1.0 * 日期:2016/12/13 10:51 * 做者:馟蘇 */ public class IdleTimerServer { /** * 主函數 */ public static void main(String []args) { // 配置服務端的NIO線程池 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer() { protected void initChannel(Channel channel) throws Exception { // 添加心跳處理器 5s讀,5s寫,10s讀寫 channel.pipeline().addLast(new IdleStateHandler(5, 5, 10)); channel.pipeline().addLast(new IdleTimerServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture future = serverBootstrap.bind(8888).sync(); System.out.println("服務器在8888端口監聽hello"); // 等待服務端監聽端口關閉 future.channel().closeFuture().sync(); System.out.println("服務器關閉bye"); } catch (Exception e) { e.printStackTrace(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } class IdleTimerServerHandler extends ChannelHandlerAdapter { // 可讀 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 讀數據 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("receive:" + body); // 寫數據 ByteBuf res = Unpooled.copiedBuffer("hello,client!".getBytes()); ctx.write(res); ctx.flush(); } /** * 用戶事件觸發 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ // 接受心跳事件 IdleStateEvent event = (IdleStateEvent)evt; if(event.state() == IdleState.ALL_IDLE){ // 讀和寫狀態 System.out.println("心跳結束"); //清除超時會話 ByteBuf res = Unpooled.copiedBuffer("you will close!".getBytes()); ChannelFuture writeAndFlush = ctx.writeAndFlush(res); // 監聽結果 writeAndFlush.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { ctx.channel().close(); } }); } } } // 鏈接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client come,ip:" + ctx.channel().remoteAddress()); } // 關閉 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip:" + ctx.channel().remoteAddress()); ctx.close(); } // 異常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.toString()); ctx.close(); } }