本博客 貓叔的博客,轉載請申明出處閱讀本文約 「4分鐘」html
適讀人羣:Java-Netty 初級java
版本:netty 4.1.*申明:本文旨在從新分享討論Netty官方相關案例,添加部分我的理解與要點解析。git
這個是InChat的案例地址,裏面補充了詳細的註釋,比起官方會容易看一點。github
官方案例地址:https://netty.io/4.1/xref/io/...api
一個對Channel還沒有執行讀、寫或兩次操做的觸發器服務器
屬性 | 含義 |
---|---|
readerIdleTime | 在IdleStateEvent其狀態IdleState.READER_IDLE 時的指定時間段沒有執行讀操做將被觸發。指定0禁用。 |
writerIdleTime | 在IdleStateEvent其狀態IdleState.WRITER_IDLE 時的指定時間段沒有執行寫操做將被觸發。指定0禁用。 |
allIdleTime | 一個IdleStateEvent其狀態IdleState.ALL_IDLE 時的時間在規定的時間進行讀取和寫入都將被觸發。指定0禁用。 |
以下一個在沒有信息時發送ping消息,且30秒沒有入站信息則關閉鏈接app
public class MyChannelInitializer extends ChannelInitializer<Channel> { @Override public void initChannel(Channel channel) { channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0)); channel.pipeline().addLast("myHandler", new MyHandler()); } } // Handler should handle the IdleStateEvent triggered by IdleStateHandler. public class MyHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { ctx.close(); } else if (e.state() == IdleState.WRITER_IDLE) { ctx.writeAndFlush(new PingMessage()); } } } }
/** * Created by MySelf on 2019/8/27. */ public final class UptimeClient { static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); // 從新鏈接前睡眠5秒 static final int RECONNECT_DELAY = Integer.parseInt(System.getProperty("reconnectDelay", "5")); // 當服務器在 10 秒內不發送任何內容時從新鏈接。 private static final int READ_TIMEOUT = Integer.parseInt(System.getProperty("readTimeout", "10")); private static final UptimeClientHandler handler = new UptimeClientHandler(); private static final Bootstrap bs = new Bootstrap(); public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); bs.group(group) .channel(NioSocketChannel.class) .remoteAddress(HOST,PORT) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT,0,0),handler); } }); bs.connect(); } static void connect(){ bs.connect().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.cause() != null){ handler.startTime = -1; handler.println("Failed to connect:" + future.cause()); } } }); } }
/** * Created by MySelf on 2019/8/27. */ @ChannelHandler.Sharable public class UptimeClientHandler extends SimpleChannelInboundHandler<Object> { long startTime = -1; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { //Discard received data } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (startTime < 0){ startTime = System.currentTimeMillis(); } println("Connected to:" + ctx.channel().remoteAddress()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { println("Disconnected from: " + ctx.channel().remoteAddress()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (!(evt instanceof IdleStateEvent)){ return; } IdleStateEvent e = (IdleStateEvent)evt; if (e.state() == IdleState.READER_IDLE){ // 鏈接正常,可是沒有讀信息,關閉鏈接 println("Disconnecting due to no inbound traffic"); ctx.close(); } } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { // 睡眠5秒 println("Sleeping for:" + UptimeClient.RECONNECT_DELAY + 's'); // 啓動線程從新鏈接 ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { println("Reconnecting to:" + UptimeClient.HOST + ":" + UptimeClient.PORT); UptimeClient.connect(); } },UptimeClient.RECONNECT_DELAY, TimeUnit.SECONDS); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } void println(String msg){ if (startTime < 0){ System.err.format("[SERVER IS DOWN] %s%n",msg); } else { System.err.format("[UPTIME: %5ds] %s%n",(System.currentTimeMillis() - startTime)/1000,msg); } } }
/** * Created by MySelf on 2019/8/27. */ public final class UptimeServer { private static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); private static final UptimeServerHandler handler = new UptimeServerHandler(); private UptimeServer(){} public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(handler); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
/** * Created by MySelf on 2019/8/27. */ @ChannelHandler.Sharable public class UptimeServerHandler extends SimpleChannelInboundHandler<Object> { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // discard } }